您的位置:首页 > 其它

MapReduce之Reduce端Join实现

2015-12-20 00:20 337 查看
MapReduce之Helloword很简单,但是要想继续自己来写,还有很多坑要注意。这不,咱们这回来个Reduce端Join实现。网上一搜,有很多,我们要来个自己的吧。

废话少说,先上传一些数据到HDFS上。

input/pd.csv(假设这个是产品名称文件)

1,a,apple

2,b,orange

3,c,banana

input/sale.csv(假设这个是销售记录,数据和单价)

1,2,3

2,3,5

3,4,6

很明显,他们通过第一列ID来join.

基本思路:根据文件名称判断哪个是产品(左边),哪个是记录(右边),Mapper之后全部以ID->值的方式保存。

在Reduce阶段,相同key的数据全部在一起,这样通过名称是字符,销量是数字来简单判断左边和右边,输出结果。

下面直接上代码:

package com.abc.test;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.InputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.OutputFormat;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class TestJoin {

public static class MyMapper extends Mapper<Object, Text, Text, Text> {

@Override

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

// StringTokenizer itr = new StringTokenizer(value.toString());

String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

// System.out.println("File="+pathName);

String str = value.toString();

if (pathName.endsWith("pd.csv")) {

Text jkey = new Text();

String ss[] = str.split(",");

jkey.set(ss[0]);

System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text(ss[1]));

context.write(jkey, new Text(ss[2])); //(ID 名称) 为key - value

} else if (pathName.endsWith("sale.csv")) {

String ss[] = str.split(",");

Text jkey = new Text();

jkey.set(ss[0]);

double d = Double.parseDouble(ss[1]) * Double.parseDouble(ss[2]);

IntWritable v = new IntWritable((int) d);

System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text("" + (int) d));

context.write(jkey, new Text("" + (int) d)); //(ID 销量) 为key - value

}

}

}

public static class MyReducer extends Reducer<Text, Text, Text, Text> {

private Text result = new Text();

@Override

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

//int sum = 0;

String str = key.toString();

Text name = new Text();

Text money = new Text();

for (Text val : values) {

//System.out.println(str+"=" + val.toString());

try {

int i = Integer.parseInt(val.toString());

money = new Text(""+i);

} catch (Exception e) {

name = new Text(val.toString()); //Please notice here!!! 不要直接name = val,可能出错

}

}

result.set(name+","+money);

context.write(key, result);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

// conf.set("mapred.tasktracker.reduce.tasks.maximum", "3");

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length < 2) {

System.err.println("Usage: wordcount <in> [<in>...] <out>");

System.exit(2);

}

Job job = new Job(conf, "test join");

job.setJarByClass(TestJoin.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReducer.class);

//job.setNumReduceTasks(1);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

for (int i = 0; i < otherArgs.length - 1; ++i) {

FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

}

long t = System.currentTimeMillis();

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1] + "/" + t));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

运行的时候指定参数为

hdfs://192.168.6.128:9000/input/

hdfs://192.168.6.128:9000/result

这样得到的输出结果为:

1 apple,6

2 orange,15

3 banana,24

中间需要注意的地方,一不小心就中雷了:

1. 在map和reduce方法上加个@Override有好处,这样方便查看是否符合接口规范,参数是否有错。

2.出现recude好像结果没有合并,找不到左边或者右边,结果文件中显示部分值为空,可能需要注意中间的值形式

不要随便遍历Iterable<Text> values

不要随便设置Combiner //job.setCombinerClass(MyReducer.class);

不要随意static

赋值的时候要考虑是否指向地址还是copy。

3.自定义类型的时候可能遇到Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable

这个时候要检查map和reduce的输入输出类型是否正确。例如:

public static class MyMapper extends Mapper<Object, Text, Text, MyValues>

这里的前面两个是输入的类型,后面两个是map的输出类型

public static class MyReducer extends Reducer<Text, MyValues, Text, Text>

这里前面两个是Map的输出,也是Reduce的输入,后面两个是Reduce的输出类型

4.使用2.0版的API和以前旧的方式有所不同,很容易混淆,例如

import org.apache.hadoop.mapred.InputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

这里的包mapred和mapreduce有不同的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: