您的位置:首页 > 运维架构

Hadoop学习笔记2--第一个Mapreduce程序

2011-09-08 14:42 691 查看
       今天继续,学习第一个mapreduce的程序。程序的输入数据为此:

       "CITING","CITED"

     3858241,956203

3858241,1324234

3858241,3398406

3858241,3557384

3858241,3634889

3858242,1515701

3858242,3319261

3858242,3668705

3858242,3707004

   输出数据应该为:

     1       3964859,4647229

10000   4539112

100000  5031388

1000006 4714284

1000007 4766693

1000011 5033339

1000017 3908629

1000026 4043055

1000033 4190903,4975983

1000043 4091523

1000044 4082383,4055371

1000045 4290571

1000046 5918892,5525001

1000049 5996916

即通过mapreduce计算出每一个专利所引用的所有专利。

 

/*
*作为mapreduce的编程风格,由于这个例子中的mapper和reducer都很小,所以写成内部类,并且,hadoop要求mapper和reducer都必须是静态类。大量的mapreduce程序都会从已有的程序中进行修改,所以
*以下程序可以作为我们刚开始的一个模板使用。
*/

public class MyJob extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
output.collect(value, key);//将输入的key value值输入到output中
 }
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String csv = "";
while (values.hasNext()) {
if (csv.length() > 0) csv += ",";
csv += values.next().toString();
将所有的values连在一起输出。            }
output.collect(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MyJob");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);//由于很多时候,我们在命令行运行作业的时候,需要添加额外的参数,所以hadoop提供了Tool
Runner这样的类来驱动作业。
 System.exit(res);
}

/*
*在
*/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息