MapReduce之Reduce端Join实现
2015-12-20 00:20
337 查看
MapReduce之Helloword很简单,但是要想继续自己来写,还有很多坑要注意。这不,咱们这回来个Reduce端Join实现。网上一搜,有很多,我们要来个自己的吧。
废话少说,先上传一些数据到HDFS上。
input/pd.csv(假设这个是产品名称文件)
1,a,apple
2,b,orange
3,c,banana
input/sale.csv(假设这个是销售记录,数据和单价)
1,2,3
2,3,5
3,4,6
很明显,他们通过第一列ID来join.
基本思路:根据文件名称判断哪个是产品(左边),哪个是记录(右边),Mapper之后全部以ID->值的方式保存。
在Reduce阶段,相同key的数据全部在一起,这样通过名称是字符,销量是数字来简单判断左边和右边,输出结果。
下面直接上代码:
package com.abc.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestJoin {
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// StringTokenizer itr = new StringTokenizer(value.toString());
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
// System.out.println("File="+pathName);
String str = value.toString();
if (pathName.endsWith("pd.csv")) {
Text jkey = new Text();
String ss[] = str.split(",");
jkey.set(ss[0]);
System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text(ss[1]));
context.write(jkey, new Text(ss[2])); //(ID 名称) 为key - value
} else if (pathName.endsWith("sale.csv")) {
String ss[] = str.split(",");
Text jkey = new Text();
jkey.set(ss[0]);
double d = Double.parseDouble(ss[1]) * Double.parseDouble(ss[2]);
IntWritable v = new IntWritable((int) d);
System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text("" + (int) d));
context.write(jkey, new Text("" + (int) d)); //(ID 销量) 为key - value
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//int sum = 0;
String str = key.toString();
Text name = new Text();
Text money = new Text();
for (Text val : values) {
//System.out.println(str+"=" + val.toString());
try {
int i = Integer.parseInt(val.toString());
money = new Text(""+i);
} catch (Exception e) {
name = new Text(val.toString()); //Please notice here!!! 不要直接name = val,可能出错
}
}
result.set(name+","+money);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// conf.set("mapred.tasktracker.reduce.tasks.maximum", "3");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "test join");
job.setJarByClass(TestJoin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
long t = System.currentTimeMillis();
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1] + "/" + t));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行的时候指定参数为
hdfs://192.168.6.128:9000/input/
hdfs://192.168.6.128:9000/result
这样得到的输出结果为:
1 apple,6
2 orange,15
3 banana,24
中间需要注意的地方,一不小心就中雷了:
1. 在map和reduce方法上加个@Override有好处,这样方便查看是否符合接口规范,参数是否有错。
2.出现recude好像结果没有合并,找不到左边或者右边,结果文件中显示部分值为空,可能需要注意中间的值形式
不要随便遍历Iterable<Text> values
不要随便设置Combiner //job.setCombinerClass(MyReducer.class);
不要随意static
赋值的时候要考虑是否指向地址还是copy。
3.自定义类型的时候可能遇到Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
这个时候要检查map和reduce的输入输出类型是否正确。例如:
public static class MyMapper extends Mapper<Object, Text, Text, MyValues>
这里的前面两个是输入的类型,后面两个是map的输出类型
public static class MyReducer extends Reducer<Text, MyValues, Text, Text>
这里前面两个是Map的输出,也是Reduce的输入,后面两个是Reduce的输出类型
4.使用2.0版的API和以前旧的方式有所不同,很容易混淆,例如
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
这里的包mapred和mapreduce有不同的。
废话少说,先上传一些数据到HDFS上。
input/pd.csv(假设这个是产品名称文件)
1,a,apple
2,b,orange
3,c,banana
input/sale.csv(假设这个是销售记录,数据和单价)
1,2,3
2,3,5
3,4,6
很明显,他们通过第一列ID来join.
基本思路:根据文件名称判断哪个是产品(左边),哪个是记录(右边),Mapper之后全部以ID->值的方式保存。
在Reduce阶段,相同key的数据全部在一起,这样通过名称是字符,销量是数字来简单判断左边和右边,输出结果。
下面直接上代码:
package com.abc.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestJoin {
public static class MyMapper extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// StringTokenizer itr = new StringTokenizer(value.toString());
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
// System.out.println("File="+pathName);
String str = value.toString();
if (pathName.endsWith("pd.csv")) {
Text jkey = new Text();
String ss[] = str.split(",");
jkey.set(ss[0]);
System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text(ss[1]));
context.write(jkey, new Text(ss[2])); //(ID 名称) 为key - value
} else if (pathName.endsWith("sale.csv")) {
String ss[] = str.split(",");
Text jkey = new Text();
jkey.set(ss[0]);
double d = Double.parseDouble(ss[1]) * Double.parseDouble(ss[2]);
IntWritable v = new IntWritable((int) d);
System.out.println("File=" + pathName + "" + jkey.toString() + "," + new Text("" + (int) d));
context.write(jkey, new Text("" + (int) d)); //(ID 销量) 为key - value
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//int sum = 0;
String str = key.toString();
Text name = new Text();
Text money = new Text();
for (Text val : values) {
//System.out.println(str+"=" + val.toString());
try {
int i = Integer.parseInt(val.toString());
money = new Text(""+i);
} catch (Exception e) {
name = new Text(val.toString()); //Please notice here!!! 不要直接name = val,可能出错
}
}
result.set(name+","+money);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// conf.set("mapred.tasktracker.reduce.tasks.maximum", "3");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "test join");
job.setJarByClass(TestJoin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
long t = System.currentTimeMillis();
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1] + "/" + t));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行的时候指定参数为
hdfs://192.168.6.128:9000/input/
hdfs://192.168.6.128:9000/result
这样得到的输出结果为:
1 apple,6
2 orange,15
3 banana,24
中间需要注意的地方,一不小心就中雷了:
1. 在map和reduce方法上加个@Override有好处,这样方便查看是否符合接口规范,参数是否有错。
2.出现recude好像结果没有合并,找不到左边或者右边,结果文件中显示部分值为空,可能需要注意中间的值形式
不要随便遍历Iterable<Text> values
不要随便设置Combiner //job.setCombinerClass(MyReducer.class);
不要随意static
赋值的时候要考虑是否指向地址还是copy。
3.自定义类型的时候可能遇到Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
这个时候要检查map和reduce的输入输出类型是否正确。例如:
public static class MyMapper extends Mapper<Object, Text, Text, MyValues>
这里的前面两个是输入的类型,后面两个是map的输出类型
public static class MyReducer extends Reducer<Text, MyValues, Text, Text>
这里前面两个是Map的输出,也是Reduce的输入,后面两个是Reduce的输出类型
4.使用2.0版的API和以前旧的方式有所不同,很容易混淆,例如
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
这里的包mapred和mapreduce有不同的。
相关文章推荐
- 各种基本排序算法总结
- iOS Default-568@2x.png启动图片设置问题
- 【Swift】通过tag删除动态创建的UIButton
- HTML 排版与标签(一)
- 接口的作用(C#)
- 第五次Scrum meeting
- preg_match_all()匹配
- EF6:编写你自己的code first 数据迁移操作(睡前来一篇,翻译的)
- java递归方法
- ssh 10个选项
- 不用也要知道的几种算法(PHP版本)
- CocoaPod的安装及与Swift的桥接
- 第一次冲刺
- Towards Crazyswarms
- Palindrome Linked List
- 剑指offer系列之六十一:二叉树搜索树的第k个节点
- iOS新建项目架构规范
- Android Studio使用lambda
- 【转载】c#如何创建和使用socket链接池
- 类再生