Hadoop Learning Path (22): MapReduce Input and Output
2018-03-23 18:29
MapReduce Input
Anyone who can write an MR program knows that the arguments passed to the map method are one line of data read by the default input-reading component.

1. Who does the reading? Who calls this map method?

Looking at the source of Mapper.java, we can see that it is the run method that calls the map method.
/**
 * Who is calling this run method?
 *
 * There is a component called MapTask, and inside it there is a corresponding call to mapper.run(context);
 *
 * context.nextKeyValue() ====== lineRecordReader.nextKeyValue();
 */
public void run(Context context) throws IOException, InterruptedException {
    /**
     * Called once when each mapTask is initialized
     */
    setup(context);
    try {
        /**
         * Every time the input-reading component reads one line, it hands that line to the map method once.
         *
         * context.nextKeyValue() means two things:
         *
         * 1. It reads one key-value pair into two fields of the context object: key and value
         * 2. Its return value is not the key-value pair that was read, but a boolean flag
         *    indicating whether a key-value pair was read at all
         *
         * context.getCurrentKey()   ==== key
         * context.getCurrentValue() ==== value
         *
         * All of this relies on the lowest-level LineRecordReader implementation.
         * A nextKeyValue implementation must eventually return false, otherwise this loop never terminates.
         */
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        /**
         * Executed after this mapTask has finished processing all the data of its split
         */
        cleanup(context);
    }
}
The context used here exposes four important methods:

1. context.nextKeyValue();     // reads the data, but its return value is not the key-value pair that was read; it is a boolean flag indicating whether any data was read
2. context.getCurrentKey();    // returns the key that context.nextKeyValue() just read
3. context.getCurrentValue();  // returns the value that context.nextKeyValue() just read
4. context.write(key, value);  // writes out the key-value pairs produced by the mapper phase
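Seen from the user's side, these four methods are all a plain Mapper needs. Below is a minimal word-count-style sketch (WordCountMapper is an illustrative name, not from the original post): the framework's run() loop feeds map() the key/value obtained through getCurrentKey()/getCurrentValue(), and the mapper emits its output through context.write().

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Minimal sketch: the framework's run() loop calls map() once per line,
// passing the key/value that context.nextKeyValue() just read.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key   = byte offset of the line within the file (from LineRecordReader)
        // value = the line content itself
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);   // the fourth important method: emit mapper output
            }
        }
    }
}
```

With the default TextInputFormat, the key is the byte offset of the line and the value is the line itself, exactly what LineRecordReader produces.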
2. Who calls the run method? Where does the context argument come from, and what is it?

These two questions share one answer: once we find out who calls the run method, we also know who builds the argument named context that is passed into it.

At the top of the chain, mapper.run(context) is invoked by a MapTask instance.

Look at the source of MapTask.java:
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // If there are reducers then the entire attempt's progress will be
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    /**
     * The core of this run method: the new API path
     */
    if (useNewApi) {
        /**
         * job           - the JobConf object
         * splitMetaInfo - the split information for this task
         * umbilical     - the task-to-framework communication protocol
         * reporter      - an object holding the various counters
         */
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}
This gives us the pseudocode of the new-API call chain:

mapTask.run() {
    runNewMapper() {
        mapper.run(mapperContext);
    }
}
3. Looking at the runNewMapper method

This method is also in MapTask.java:
/**
 * This is the core of the actual call logic:
 *
 *     mapper.run(context);
 *
 * @param job
 * @param splitIndex
 * @param umbilical
 * @param reporter
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(final JobConf job,
        final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
        throws IOException, ClassNotFoundException, InterruptedException {

    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);

    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils
            .newInstance(taskContext.getMapperClass(), job);

    /**
     * inputFormat.createRecordReader() === the real RecordReader
     *
     * inputFormat is an instance of the TextInputFormat class, and TextInputFormat's
     * createRecordReader method returns an instance of LineRecordReader.
     */
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) ReflectionUtils
            .newInstance(taskContext.getInputFormatClass(), job);

    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    /**
     * NewTrackingRecordReader must contain three methods:
     *
     *     nextKeyValue
     *     getCurrentKey
     *     getCurrentValue
     *
     * Their implementations delegate to the three methods of the RecordReader
     * returned by inputFormat.createRecordReader().
     *
     * Default InputFormat:  TextInputFormat
     * Default RecordReader: LineRecordReader
     *
     * So in the end, NewTrackingRecordReader's three methods rely on the three
     * same-named methods of LineRecordReader.
     */
    org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input =
        new NewTrackingRecordReader<INKEY, INVALUE>(split, inputFormat, reporter, taskContext);

    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    /**
     * Declare an output object used to write out the mapper's key-value pairs
     */
    org.apache.hadoop.mapreduce.RecordWriter output = null;

    // get an output object
    if (job.getNumReduceTasks() == 0) {
        /**
         * NewDirectOutputCollector writes output directly; this class must have a write method
         */
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        /**
         * There is a reducer phase.
         *
         * 1. We can be sure the map output will be sorted.
         * 2. Can we be sure a Partitioner is used? Not necessarily; logically it can be
         *    considered to have no effect (for example with a single reducer).
         *
         * NewOutputCollector must also have a write method.
         */
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    /**
     * The mapContext object must contain the three read methods.
     *
     * This answers the question from the first source walkthrough: who calls the
     * MapContextImpl constructor? mapContext is an instance of MapContextImpl,
     * built from input === NewTrackingRecordReader.
     *
     * What we can be sure of:
     *
     * 1. mapContext must have a write method.
     * 2. Looking at how MapContextImpl is composed, there is actually no write method there.
     *
     * Resolution: mapContext.write comes from MapContextImpl's parent class,
     * and the lowest-level write is output.write().
     */
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
            job, getTaskID(), input, output, committer, reporter, split);

    /**
     * mapperContext must contain three methods:
     *
     *     nextKeyValue
     *     getCurrentKey
     *     getCurrentValue
     *
     * Its concrete implementation relies on new Context(context), where context = mapContext.
     *
     * Conclusion: mapContext must contain nextKeyValue, getCurrentKey and getCurrentValue,
     * and it also has a write(key, value) method.
     */
    org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);

    try {
        input.initialize(split, mapperContext);

        /**
         * The entry point that drives the entire mapTask.
         *
         * How such methods are typically structured:
         * 1. The key call sits at the end, or inside the try block.
         * 2. Almost everything else either records logs / keeps the flow going,
         *    or prepares the arguments for that key call.
         */
        mapper.run(mapperContext);

        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}
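The branch on job.getNumReduceTasks() above is controlled entirely by the job configuration. Below is a hedged driver sketch (WordCountDriver and WordCountMapper are illustrative names, not from the original post, assuming the standard Hadoop 2.x mapreduce API) showing where that number, and the InputFormat that runNewMapper instantiates, come from:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(WordCountDriver.class);

        // TextInputFormat is the default; setting it explicitly makes it clear that
        // runNewMapper will end up creating a LineRecordReader for each split.
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(WordCountMapper.class);   // hypothetical mapper from the sketch above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // With 0 reduce tasks, runNewMapper picks NewDirectOutputCollector and the map
        // output is written straight out with no sort phase; with 1 or more reduce tasks
        // it picks NewOutputCollector, and the map phase is followed by the sort phase.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```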
What we can be sure of: mapperContext must have the four important methods described above. Keep tracing mapperContext upward:
/**
 * mapperContext must contain three methods:
 *
 *     nextKeyValue
 *     getCurrentKey
 *     getCurrentValue
 *
 * Its concrete implementation relies on new Context(context), where context = mapContext.
 *
 * Conclusion: mapContext must contain nextKeyValue, getCurrentKey and getCurrentValue,
 * and it also has a write(key, value) method.
 */
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
    new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
Look at WrappedMapper.java:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;

/**
 * A {@link Mapper} which wraps a given one to allow custom
 * {@link Mapper.Context} implementations.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * Get a wrapped {@link Mapper.Context} for custom implementations.
   *
   * @param mapContext <code>MapContext</code> to be wrapped
   * @return a wrapped <code>Mapper.Context</code> for custom implementations
   */
  public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMapContext(
      MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
    return new Context(mapContext);
  }

  @InterfaceStability.Evolving
  public class Context extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {

    protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;

    public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
      this.mapContext = mapContext;
    }

    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() { return mapContext.getInputSplit(); }

    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException { return mapContext.getCurrentKey(); }

    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException { return mapContext.getCurrentValue(); }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException { return mapContext.nextKeyValue(); }

    @Override
    public Counter getCounter(Enum<?> counterName) { return mapContext.getCounter(counterName); }

    @Override
    public Counter getCounter(String groupName, String counterName) { return mapContext.getCounter(groupName, counterName); }

    @Override
    public OutputCommitter getOutputCommitter() { return mapContext.getOutputCommitter(); }

    @Override
    public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { mapContext.write(key, value); }

    @Override
    public String getStatus() { return mapContext.getStatus(); }

    @Override
    public TaskAttemptID getTaskAttemptID() { return mapContext.getTaskAttemptID(); }

    @Override
    public void setStatus(String msg) { mapContext.setStatus(msg); }

    @Override
    public Path[] getArchiveClassPaths() { return mapContext.getArchiveClassPaths(); }

    @Override
    public String[] getArchiveTimestamps() { return mapContext.getArchiveTimestamps(); }

    @Override
    public URI[] getCacheArchives() throws IOException { return mapContext.getCacheArchives(); }

    @Override
    public URI[] getCacheFiles() throws IOException { return mapContext.getCacheFiles(); }

    @Override
    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException { return mapContext.getCombinerClass(); }

    @Override
    public Configuration getConfiguration() { return mapContext.getConfiguration(); }

    @Override
    public Path[] getFileClassPaths() { return mapContext.getFileClassPaths(); }

    @Override
    public String[] getFileTimestamps() { return mapContext.getFileTimestamps(); }

    @Override
    public RawComparator<?> getCombinerKeyGroupingComparator() { return mapContext.getCombinerKeyGroupingComparator(); }

    @Override
    public RawComparator<?> getGroupingComparator() { return mapContext.getGroupingComparator(); }

    @Override
    public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException { return mapContext.getInputFormatClass(); }

    @Override
    public String getJar() { return mapContext.getJar(); }

    @Override
    public JobID getJobID() { return mapContext.getJobID(); }

    @Override
    public String getJobName() { return mapContext.getJobName(); }

    @Override
    public boolean getJobSetupCleanupNeeded() { return mapContext.getJobSetupCleanupNeeded(); }

    @Override
    public boolean getTaskCleanupNeeded() { return mapContext.getTaskCleanupNeeded(); }

    @Override
    public Path[] getLocalCacheArchives() throws IOException { return mapContext.getLocalCacheArchives(); }

    @Override
    public Path[] getLocalCacheFiles() throws IOException { return mapContext.getLocalCacheFiles(); }

    @Override
    public Class<?> getMapOutputKeyClass() { return mapContext.getMapOutputKeyClass(); }

    @Override
    public Class<?> getMapOutputValueClass() { return mapContext.getMapOutputValueClass(); }

    @Override
    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException { return mapContext.getMapperClass(); }

    @Override
    public int getMaxMapAttempts() { return mapContext.getMaxMapAttempts(); }

    @Override
    public int getMaxReduceAttempts() { return mapContext.getMaxReduceAttempts(); }

    @Override
    public int getNumReduceTasks() { return mapContext.getNumReduceTasks(); }

    @Override
    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException { return mapContext.getOutputFormatClass(); }

    @Override
    public Class<?> getOutputKeyClass() { return mapContext.getOutputKeyClass(); }

    @Override
    public Class<?> getOutputValueClass() { return mapContext.getOutputValueClass(); }

    @Override
    public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException { return mapContext.getPartitionerClass(); }

    @Override
    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException { return mapContext.getReducerClass(); }

    @Override
    public RawComparator<?> getSortComparator() { return mapContext.getSortComparator(); }

    @Override
    public boolean getSymlink() { return mapContext.getSymlink(); }

    @Override
    public Path getWorkingDirectory() throws IOException { return mapContext.getWorkingDirectory(); }

    @Override
    public void progress() { mapContext.progress(); }

    @Override
    public boolean getProfileEnabled() { return mapContext.getProfileEnabled(); }

    @Override
    public String getProfileParams() { return mapContext.getProfileParams(); }

    @Override
    public IntegerRanges getProfileTaskRange(boolean isMap) { return mapContext.getProfileTaskRange(isMap); }

    @Override
    public String getUser() { return mapContext.getUser(); }

    @Override
    public Credentials getCredentials() { return mapContext.getCredentials(); }

    @Override
    public float getProgress() { return mapContext.getProgress(); }
  }
}
This class does indeed contain those 4 important methods, and each one simply delegates to mapContext. Keep tracing upward to find mapContext:
/**
 * The mapContext object must contain the three read methods.
 *
 * This answers the question from the first source walkthrough: who calls the
 * MapContextImpl constructor? mapContext is an instance of MapContextImpl,
 * built from input === NewTrackingRecordReader.
 *
 * What we can be sure of:
 *
 * 1. mapContext must have a write method.
 * 2. Looking at how MapContextImpl is composed, there is actually no write method there.
 *
 * Resolution: mapContext.write comes from MapContextImpl's parent class,
 * and the lowest-level write is output.write().
 */
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
        job, getTaskID(), input, output, committer, reporter, split);
mapContext is an instance of the class MapContextImpl.

To confirm further, in pseudocode:

mapContext = new MapContextImpl(reader)   // reader comes from inputFormat.createRecordReader(), by default a LineRecordReader (wrapped in NewTrackingRecordReader)
mapContext.nextKeyValue() {
    return reader.nextKeyValue();          // ultimately LineRecordReader.nextKeyValue()
}

Look at the source of MapContextImpl.java:
public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader,
            RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter,
            InputSplit split) {
        // call the parent-class constructor via super
        super(conf, taskid, writer, committer, reporter);
        this.reader = reader;
        this.split = split;
    }

    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() {
        return split;
    }

    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }
}
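So the chain bottoms out at the RecordReader that MapContextImpl holds: by default the LineRecordReader created by TextInputFormat. That also means a custom InputFormat simply slots its own RecordReader into this same chain. Below is a minimal sketch, assuming the standard Hadoop 2.x mapreduce API (UpperCaseTextInputFormat is a hypothetical name, not from the original post), of an InputFormat whose RecordReader delegates to LineRecordReader but upper-cases each line before it reaches map():

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical InputFormat: the three reader methods below are exactly the ones
// the context chain traced above (mapperContext -> mapContext -> reader) ends up calling.
public class UpperCaseTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new UpperCaseLineRecordReader();
    }

    public static class UpperCaseLineRecordReader extends RecordReader<LongWritable, Text> {
        private final LineRecordReader delegate = new LineRecordReader();
        private final Text value = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            delegate.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Same contract as described above: load one pair into the reader's state,
            // return false once the split is exhausted so mapper.run()'s loop stops.
            if (!delegate.nextKeyValue()) {
                return false;
            }
            value.set(delegate.getCurrentValue().toString().toUpperCase());
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }
}
```

Registered with job.setInputFormatClass(UpperCaseTextInputFormat.class), this reader is what NewTrackingRecordReader would wrap, and its nextKeyValue/getCurrentKey/getCurrentValue are the methods mapContext.nextKeyValue() ultimately invokes.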