MapReuce笔记六之输入类InputFormat
2016-10-13 20:48
148 查看
使用hadoop jar执行mapreduce任务时首先从hdfs中读取数据将这些数据解析为inputsplit,然后再将inputsplit中的内容解析为一个一个的<k,v>键值对,这个过程就是有InputFormat的子类完成的。之前在MR例子中有一段代码job.setInputFormatClass(TextInputFormat.class);就是指定TextInputFormat来完成这项工作,这个类是hadoop默认的其实可以不写。
InputFormat是一个抽象类,类中有两个抽象方法List<InputSplit> getSplits和RecordReader<K,V>createRecordReader,getSplit负责将hdfs数据解析为InputSplit,createRecordReader负责将每个InputSplit中的每一行解析为<k,v>键值对。
FileInputFormat继承了InputFormat并实现了getSplits方法。
主要完成的功能是:
根据路径解析hdfs数据,判断文件是否可以被切分。
计算splitSize,默认等于blockSize,128M
获取每一个hdfs对象并进行遍历并将结果放入List<InputSplit>中返回。
Hadoop中一个block对应一个inputsplit,一个inputsplit对应一个map任务。
注意:
hadoop不会对小于128M的文件进行切分,例如一个文件1G那就是8个map任务,如果有1000个100kb的文件则对应1000个map任务,这样会造成效率下降。所以MapReduce不适合处理小文件。
如果inputsplit和blocksize不一样比如大于,那么在解析为inputsplit时一个block就不够用,此时框架就会去别的节点上读取数据来构造inputsplit,这样会产生网络消耗影响效率。
createRecordReader
TextInputFormat继承了FileInputFormat并实现了createRecordReader方法。此方法的返回值是抽象类RecordReader,而最终返回的是LineRecordReader,LineRecordReader实现了RecordReader并在实现的抽象方法中完成解析。
主要完成的功能是:
在initialize方法中获取FileSplit对象并读取每一行内容。
获取<k,v>键值对作为map任务的入参再调用map任务。
框架每获取一个<k,v>就会调用一次map任务。
至此我们可以通过下图大概了解一下这几个类的关系
org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job,3);
job.setInputFormatClass(KeyValueTextInputFormat.class);
同时也可以指定其他的字符串为分隔符
conf.setStrings(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
如下图,hdfs上有4个文件对应的split数量也为4,map任务也为4
CombineFileInputFormat这个输入类可以合并小文件,下面来看一个例子。
下图中显示的是日志,可以看到相比于输入类使用TextInputFormat,使用CombineFileInputFormat的split和map任务数量都要少,之间也应该更快。CombineFileInputFormat合并了小文件。
InputFormat是一个抽象类,类中有两个抽象方法List<InputSplit> getSplits和RecordReader<K,V>createRecordReader,getSplit负责将hdfs数据解析为InputSplit,createRecordReader负责将每个InputSplit中的每一行解析为<k,v>键值对。
TextInputFormat
getSplitsFileInputFormat继承了InputFormat并实现了getSplits方法。
主要完成的功能是:
根据路径解析hdfs数据,判断文件是否可以被切分。
计算splitSize,默认等于blockSize,128M
获取每一个hdfs对象并进行遍历并将结果放入List<InputSplit>中返回。
Hadoop中一个block对应一个inputsplit,一个inputsplit对应一个map任务。
注意:
hadoop不会对小于128M的文件进行切分,例如一个文件1G那就是8个map任务,如果有1000个100kb的文件则对应1000个map任务,这样会造成效率下降。所以MapReduce不适合处理小文件。
如果inputsplit和blocksize不一样比如大于,那么在解析为inputsplit时一个block就不够用,此时框架就会去别的节点上读取数据来构造inputsplit,这样会产生网络消耗影响效率。
createRecordReader
TextInputFormat继承了FileInputFormat并实现了createRecordReader方法。此方法的返回值是抽象类RecordReader,而最终返回的是LineRecordReader,LineRecordReader实现了RecordReader并在实现的抽象方法中完成解析。
主要完成的功能是:
在initialize方法中获取FileSplit对象并读取每一行内容。
获取<k,v>键值对作为map任务的入参再调用map任务。
框架每获取一个<k,v>就会调用一次map任务。
至此我们可以通过下图大概了解一下这几个类的关系
NlineInputFormat
Hadoop中默认是一个block一个inputsplit,但是在代码中可以指定其他的inputFormat子类,NLineInputFormat可以设置指定文件中多少行为一个inputsplit,下面的代码指定每3行一个inputsplit。org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job,3);
KeyValueTextInputFormat
记录中有制表符(tab),以第一个制表符为分隔符,前面的作为key后面的作为value,若无制表符则全部为key,value为空。job.setInputFormatClass(KeyValueTextInputFormat.class);
同时也可以指定其他的字符串为分隔符
conf.setStrings(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
CombineFileInputFormat
以之前提到的wordcount实例中需要统计单词出现的次数输入类使用的是TextInputFormat,但是如果我有许多的小文件那么在执行mapreduce时split的数量就会很多。如下图,hdfs上有4个文件对应的split数量也为4,map任务也为4
CombineFileInputFormat这个输入类可以合并小文件,下面来看一个例子。
package mapreduce; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; //import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * hdfs上的hello中的内容为 tiger pig pig cat dog dog bird cat tiger house bus bike bus car * @author think * */ public class WordCount { public static void main(String[] args) throws Exception { String inPath = args[0]; Path outPath = new Path(args[1]); //1:hdfs configuration,get SystemFile Object Configuration conf = new Configuration(); URI uri = new URI("/");// URI uri = new URI("hdfs://192.168.79.128:9000/"); FileSystem fileSystem = FileSystem.get(uri, conf); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath, true); } // 2:job object String jobName = WordCount.class.getName(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(WordCount.class); // 3:输入路径 FileInputFormat.setInputPaths(job, inPath); // 4:指定inputFormat的子类,可选,默认是TextInputFormat //job.setInputFormatClass(TextInputFormat.class); job.setInputFormatClass(CombineSmallFileInputFormat.class); // 5:指定mapper类,指定mapper的输出<k2,v2>类型 job.setMapperClass(MapTask.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 6:指定reduce类,指定reduce的输出<k3,v3>类型 job.setReducerClass(ReduceTask.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 7:指定输出路径 FileOutputFormat.setOutputPath(job, outPath); // 8:指定outputformat子类 job.setOutputFormatClass(TextOutputFormat.class); // 9:提交yarn执行 job.waitForCompletion(true); } /** * Map 任务 * @author think * LongWritable, Text, Text, LongWritable这4个参数依次代表map任务的输入键值对<k1,v1>和输出键值对<k2,v2> */ public static class MapTask extends Mapper<LongWritable, Text, Text, LongWritable> { Logger logger = LoggerFactory.getLogger(WordCount.class); Text k2 = new Text(); LongWritable v2 = new LongWritable(); /** * 重写map方法 * context是一个mapper的内部类 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //1:key为内容的字节序数,value为内容 String content = value.toString(); System.out.println("内容:" + key.get() + " ," + content); logger.info("内容:" + key.get() + " ," + content); String[] arrs = content.split(","); for(String word : arrs) { k2.set(word); v2.set(1); context.write(k2, v2); logger.info("map:" + k2.toString() + "," + v2); } } } /** * Reduce 任务 * @author think * Text, LongWritable, Text, LongWritable这4个参数依次代表reduce任务的输入键值对<k2,v2s>和输出键值对<k3,v3> */ public static class ReduceTask extends Reducer<Text, LongWritable, Text, LongWritable> { LongWritable v3 = new LongWritable(); @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context content) throws IOException, InterruptedException { System.out.println("k2:" + k2.toString()); long sum = 0; for(LongWritable v2 : v2s) { System.out.println("v2:" + v2); sum += v2.get(); } v3.set(sum); content.write(k2, v3); System.out.println("k3,v3:" + k2.toString() + "," + v3); } } /** * 自定义处理小文件的mapreduce输入类 * @author think * */ public static class CombineSmallFileInputFormat extends CombineFileInputFormat<LongWritable, Text>{ /** * createRecordReader创建一个读取器,实现RecordReader方法 * <LongWritable, Text>是map任务的输入参数,和之前的一样。入参感觉要随map任务的业务而定 * 返回值是CombineFileRecordReader对象实例 * 这个对象继承了RecordReader * 生成实例需要三个参数 * 第一个需要强转成CombineFileSplit * 第二个是上下文 * 第三个是我们自定义的一个类,这个类必须继承RecordReader */ @Override public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSmallFileRecordReader.class); } } /** * 继承RecordReader类的<k,v>和上面一样都是<LongWritable, Text> * 实现RecordReader方法 * @author think * */ public static class CombineSmallFileRecordReader extends RecordReader<LongWritable, Text> { private LineRecordReader lrr; /** * 在解析多个小文件时,每个小文件都会调用上面的 * return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSmallFileRecordReader.class); * 所以构造函数的第三个参数index就是每个小文件的序号,比如第一个,第二个...... * * * @param split * @param context * @param index 文件的序号 * @throws IOException * @throws Interrupted Exception */ public CombineSmallFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException { //1.通过反射机制实例化lrr this.lrr = ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration()); //2.为初始化方法构造参数 /** * 参数fileSplit就是我们处理的众多小文件(word,word1..)所以是FileSplit,我们需要自行构造 * 4个参数分别是路径信息(file),起始位置(偏移量start),长度(length),所在位置hosts,我们需要构建这4个参数 * 4个参数均从split中获取,index是文件的序号 */ Path file = split.getPath(index); long start = split.getOffset(index); long length = split.getLength(index); String[] hosts = split.getLocations(); InputSplit fileSplit = new FileSplit(file, start, length, hosts); //3.调用初始化方法 this.lrr.initialize(fileSplit, context); } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return lrr.nextKeyValue(); } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return lrr.getCurrentKey(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return lrr.getCurrentValue(); } @Override public float getProgress() throws IOException, InterruptedException { return lrr.getProgress(); } @Override public void close() throws IOException { lrr.close(); } } }
下图中显示的是日志,可以看到相比于输入类使用TextInputFormat,使用CombineFileInputFormat的split和map任务数量都要少,之间也应该更快。CombineFileInputFormat合并了小文件。
相关文章推荐
- Hadoop2.6.0学习笔记(八)InputFormat和OutputFormat
- Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析
- Hadoop2.6.0学习笔记(五)自定义InputFormat和RecordReader
- FileInputFormat分析
- java.lang.NumberFormatException: For input string
- 入门:java.lang.NumberFormatException: For input string: "11"
- Hadoop中DBInputFormat和DBOutputFormat使用
- date format picture ends before converting entire input string(日期格式图片在转换整个输入字符串之前结束)
- The input stream is not a valid binary format
- The error: ORA-01830: date format picture ends before converting entire input string..
- ffmpeg API 笔记:使用libavcodec/libavformat/libswscale
- The Error ORA-01830: date format picture ends before converting entire input string
- java.lang.NumberFormatException: For input string: "bobo"
- NLineInputFormat 应用
- cobol学习笔记(4)COBOL的程序结构 INPUT-OUTPUT SECTION2010/06/30 16:45INPUT-OUTPUT SECTION(输入输出节)
- Hadoop开发常用的InputFormat和OutputFormat
- No result defined for action and result input(笔记)
- format 使用一例 之 color & input
- SQL Server Date Format 小笔记[1]
- Python学习笔记(三)input和raw_input的区别