MapReduce Input Formats---KeyValueTextInputFormat---Source Code Analysis

2017-08-21 21:03
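Often each line of a file is a key-value pair whose fields are separated by a delimiter such as a tab character. Files written by TextOutputFormat (Hadoop's default output format) have exactly this layout. KeyValueTextInputFormat is the appropriate input format for processing such files correctly.

1. MaxTempMapper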
```java
package hadoop.mr.input.keyvaluetext;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * MaxTempMapper: KeyValueTextInputFormat delivers <Text, Text> records,
 * which this mapper parses as integers and emits unchanged.
 */
public class MaxTempMapper extends Mapper<Text, Text, IntWritable, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = text before the first separator, value = text after it
        context.write(new IntWritable(Integer.parseInt(key.toString())),
                new IntWritable(Integer.parseInt(value.toString())));
    }
}
```
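MaxTempMapper passes the key and value straight to Integer.parseInt. Any record whose fields are not clean integers will throw NumberFormatException and fail the map task. That includes a value with an extra leading space left by a doubled separator, or a line with no separator at all, in which case the value is empty. The sketch below is a hypothetical defensive variant of that parsing step, in plain Java with no Hadoop types. The class and method names are illustrative assumptions, not part of the original job.

```java
/**
 * Hypothetical helper, not part of the original job: parses the key/value
 * strings that MaxTempMapper receives, returning null for malformed records
 * instead of throwing NumberFormatException.
 */
class SafeParseSketch {
    static int[] parsePair(String key, String value) {
        try {
            // trim() tolerates stray spaces left by runs of separators
            return new int[] {Integer.parseInt(key.trim()), Integer.parseInt(value.trim())};
        } catch (NumberFormatException e) {
            return null; // malformed record: the caller should skip it
        }
    }

    public static void main(String[] args) {
        int[] ok = parsePair("1950", " 22");
        System.out.println(ok[0] + " -> " + ok[1]); // 1950 -> 22
        // A tab-separated line read with a space separator: whole line is the key, value is empty
        System.out.println(parsePair("1950\t22", "") == null); // true
    }
}
```

Inside map, a null result could be skipped and tallied with context.getCounter(groupName, counterName).increment(1). That way a single bad line does not fail the task, and the number of skipped records still shows up in the job counters.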
2. MaxTempReducer
```java
package hadoop.mr.input.keyvaluetext;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * MaxTempReducer: emits the maximum value seen for each key.
 */
public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Start below any real value so the first value always replaces it
        int max = Integer.MIN_VALUE;
        for (IntWritable iw : values) {
            max = Math.max(max, iw.get());
        }
        context.write(key, new IntWritable(max));
    }
}
```
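To see what the shuffle plus MaxTempReducer compute end to end, here is a plain-Java sketch with no Hadoop involved. Pairs are grouped by key and each group is reduced to its maximum. The class name and sample pairs are assumptions for illustration. A TreeMap stands in for the key ordering that the shuffle's sort provides.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch (not Hadoop code) of grouping by key and keeping the
 * maximum value per key, which is what MaxTempReducer does for each group.
 */
class MaxPerKeySketch {
    static Map<Integer, Integer> maxPerKey(List<int[]> pairs) {
        Map<Integer, Integer> result = new TreeMap<>(); // keys kept in sorted order
        for (int[] p : pairs) {
            // merge keeps the larger of the existing and the new value
            result.merge(p[0], p[1], Integer::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> pairs = List.of(new int[] {1950, 22}, new int[] {1950, 35}, new int[] {1951, 18});
        System.out.println(maxPerKey(pairs)); // {1950=35, 1951=18}
    }
}
```

Taking a maximum is associative and commutative, and MaxTempReducer's input and output types are identical. It could therefore also be registered as a combiner with job.setCombinerClass(MaxTempReducer.class), which would reduce the amount of data shuffled to the reducers.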
3. App
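```java
package hadoop.mr.input.keyvaluetext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * App: driver for the max-temperature job, reading its input with KeyValueTextInputFormat.
 */
public class App {
    public static void main(String[] args) throws Exception {
        // Hard-coded local paths for testing: input file and output directory
        args = new String[]{"d:/java/mr/temp.dat", "d:/java/mr/out"};

        Configuration conf = new Configuration();

        // Delete an existing output directory; otherwise job submission fails
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }

        Job job = Job.getInstance(conf);
        job.setJobName("maxTemp");
        job.setJarByClass(App.class);
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Use KeyValueTextInputFormat as the input format
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Split each line at the first space instead of the default tab.
        // KEY_VALUE_SEPERATOR is Hadoop's own spelling of the constant.
        // Set it on job.getConfiguration(): Job.getInstance works on a copy of conf.
        job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

        // Output types for both the map and the reduce phase
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(2);
        job.waitForCompletion(true);
    }
}
```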
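App sets two reduce tasks but no partitioner. Hadoop's default HashPartitioner therefore decides which reducer, and so which output file, receives each key. The sketch below reproduces that formula, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, in plain Java. It relies on IntWritable.hashCode() returning the wrapped int. The class name and sample keys are assumptions for illustration.

```java
/**
 * Plain-Java sketch of the default HashPartitioner formula applied to int keys.
 * For IntWritable keys, hashCode() is the int value itself.
 */
class PartitionSketch {
    static int partitionFor(int key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the index is never negative
        return (key & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (int key : new int[] {1949, 1950, 1951}) {
            // Reducer i writes its results to part-r-0000i
            System.out.println(key + " -> " + String.format("part-r-%05d", partitionFor(key, 2)));
        }
    }
}
```

With two reducers, even keys land in part-r-00000 and odd keys in part-r-00001. Clearing the sign bit subtracts 2^31, which preserves parity, so this holds for negative keys too. Each output file holds a disjoint subset of the keys, sorted within that file.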
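The call flow through the source code is:

1. KeyValueTextInputFormat extends FileInputFormat.
2. job.waitForCompletion submits the job.
3. The JobSubmitter class is invoked.
4. FileInputFormat's getSplits method computes the input splits.
5. KeyValueTextInputFormat's isSplitable decides whether each input file may be split.
6. KeyValueTextInputFormat's createRecordReader method creates a KeyValueLineRecordReader.
7. The Mapper's run method executes.
8. run calls KeyValueLineRecordReader's nextKeyValue method in a loop (through the Context).
9. Each successful call yields a key and a value, which are passed to map.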
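The split performed in step 8 can be sketched in plain Java. This is a simplified re-implementation written for illustration, not Hadoop's code, and the class name is hypothetical. As I read the Hadoop source, KeyValueLineRecordReader does three things:

* It uses only the first character of the configured separator (the default is a tab).
* It splits each line at the first occurrence of that character.
* When the separator is absent, it uses the whole line as the key and leaves the value empty.

Hadoop performs this search on the line's bytes, while this sketch uses a String for readability.

```java
/**
 * Simplified, Hadoop-free sketch of how KeyValueLineRecordReader turns one
 * line into a (key, value) pair. Returns {key, value}.
 */
class KeyValueSplitSketch {
    static String[] split(String line, String separatorSetting) {
        // Only the first character of the configured separator is used
        char sep = separatorSetting.charAt(0);
        int pos = line.indexOf(sep); // first occurrence only
        if (pos == -1) {
            // No separator: the whole line becomes the key and the value is empty
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        String[] kv = split("1950 22", " ");
        System.out.println("key=" + kv[0] + ", value=" + kv[1]); // key=1950, value=22
        // Everything after the first separator stays in the value
        String[] rest = split("a b c", " ");
        System.out.println("key=" + rest[0] + ", value=" + rest[1]); // key=a, value=b c
    }
}
```

This is why App must set the separator to a space for a space-delimited temp.dat. With the default tab, each line would arrive as a key containing the whole line and an empty value, and MaxTempMapper's Integer.parseInt would throw.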