
MapReduce Framework Code

2018-01-26 12:21
package com.bigdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class hadoop_MapReduce2 extends Configured implements Tool {

    // step 1: Mapper class
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        // every occurrence of a word is counted as 1
        private IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // read one line of the file, converting Text to String
            String lineValue = value.toString();

            // split the line into words on spaces
            String[] strs = lineValue.split(" ");

            // take each word out of the array and emit a <key, value> pair, e.g. <hadoop, 1>
            for (String str : strs) {
                // set the output key
                mapOutputKey.set(str);

                // map output
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: Reducer class
    public static class MapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // iterate over the values and accumulate the total
            for (IntWritable value : values) {
                // total
                sum += value.get();
            }
            // set output value
            outputValue.set(sum);

            // final output
            context.write(key, outputValue);
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {

        // get the relevant configuration for the cluster
        Configuration configuration = this.getConf();
        // create a job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

        // entry point of the whole MapReduce program: which class in the jar is actually run
        job.setJarByClass(this.getClass());

        // configure the job
        // input path
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // set the mapper
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set the reducer
        job.setReducerClass(MapReduceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
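
        // (optional sketch, not part of the original code) since the word counts
        // are plain sums, the same reducer could also be registered as a combiner
        // to pre-aggregate map output and shrink the shuffle:
        // job.setCombinerClass(MapReduceReducer.class);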

        // submit the job to YARN
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        // run job
        int status = ToolRunner.run(configuration, new hadoop_MapReduce2(), args);

        // exit program
        System.exit(status);

        /* alternative for a quick test: hard-code the input and output paths
        args = new String[] {
                "hdfs://hadoop3:9000/wc.input",
                "hdfs://hadoop3:9000/output"
        };

        int status = new hadoop_MapReduce2().run(args);

        // exit
        System.exit(status);
        */
    }
}
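
For reference, once the class is packaged into a jar (the jar name below is only an example), the job can be submitted with the same two paths used in the commented-out test arguments:

    hadoop jar wordcount.jar com.bigdata.hadoop_MapReduce2 hdfs://hadoop3:9000/wc.input hdfs://hadoop3:9000/output

Assuming wc.input contained, say, the two lines "hadoop spark hadoop" and "hive hadoop", the mapper would emit <hadoop,1> three times plus <spark,1> and <hive,1>, and the reducer would sum them, so the output part file would read roughly:

    hadoop	3
    hive	1
    spark	1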
Tags: MapReduce