
The Hadoop Learning Path (22): MapReduce Input and Output


MapReduce Input

Anyone who has written an MR program knows that the arguments passed to the map method are one line of data read by the default input-reading component.

1. Who does the reading? Who calls this map method?

Looking at the Mapper.java source shows that the run method is what calls the map method.

/**
 * Who calls this run method?
 *
 * There is a component called MapTask, and somewhere inside it there is code that calls mapper.run(context);
 *
 * context.nextKeyValue() ====== lineRecordReader.nextKeyValue();
 */
public void run(Context context) throws IOException, InterruptedException {

    /**
     * Called once when each MapTask is initialized
     */
    setup(context);
    try {

        /**
         * Each time the input-reading component reads one line, it hands it to the map method.
         *
         * context.nextKeyValue() does two things:
         *
         * 1. It reads one key-value pair into the two corresponding fields of the context object: key and value
         * 2. Its return value is not the key-value pair itself, but a boolean flag telling whether a pair was read
         *
         * context.getCurrentKey() ==== key
         * context.getCurrentValue() ==== value
         *
         * Ultimately this relies on the underlying LineRecordReader implementation.
         *
         * A nextKeyValue implementation must eventually return false, otherwise this loop never terminates.
         */
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }

    } finally {

        /**
         * Executed after the current MapTask has finished processing all the data in its split
         */
        cleanup(context);
    }
}

The map phase relies on four important methods of the context object (a minimal usage sketch follows this list):

1. context.nextKeyValue(); // reads the next record, but returns a boolean indicating whether a record was read rather than the key-value pair itself

2. context.getCurrentKey(); // returns the key that context.nextKeyValue() just read

3. context.getCurrentValue(); // returns the value that context.nextKeyValue() just read

4. context.write(key, value); // emits a key-value pair as the output of the mapper stage
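To make these concrete, below is a minimal, hedged sketch of a user-defined Mapper that overrides run() so all four methods appear in one place. It assumes the default TextInputFormat (so the input key is the line's byte offset and the value is the line text); the class name and the line-length logic are made up for illustration and are not part of Hadoop.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: emits (line text, line length) for every input line.
public class LineLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            // 1. nextKeyValue() advances to the next record and reports success as a boolean
            while (context.nextKeyValue()) {
                // 2. / 3. fetch the record that nextKeyValue() just loaded into the context
                LongWritable offset = context.getCurrentKey();
                Text line = context.getCurrentValue();
                map(offset, line, context);
            }
        } finally {
            cleanup(context);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 4. write() hands the mapper's output key-value pair to the output collector
        context.write(value, new IntWritable(value.getLength()));
    }
}

In everyday code you would only override map(); overriding run() here just makes nextKeyValue/getCurrentKey/getCurrentValue visible, and it mirrors what the inherited Mapper.run() already does.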

2. Who calls the run method? Where does the context argument come from, and what is it?

Both questions share one answer: once we find out who calls the run method, we also know who passes it the argument named context.

At the top of the chain, mapper.run(context) is called by the MapTask instance.

Looking at the MapTask.java source:

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;

if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the
// map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will
// be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);

boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}

/**
 * The core of the run method: the new-API path
 */

if (useNewApi) {
/**
 * job is the JobConf object, splitMetaInfo is the split information,
 * umbilical is the communication protocol, and reporter is an object
 * holding the various counters
 */
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}

done(umbilical, reporter);
}

 

From this we can write pseudocode for how the new API is invoked:

mapTask.run() {
    runNewMapper() {
        mapper.run(mapperContext);
    }
}

 

3. Looking at the runNewMapper method

This method also lives in MapTask.java.

/**
 * This is the core of the actual invocation logic; it is what ends up calling:
 *
 * mapper.run(context);
 *
 * @param job
 * @param splitIndex
 * @param umbilical
 * @param reporter
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throws IOException, ClassNotFoundException, InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
job, getTaskID(), reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils
.newInstance(taskContext.getMapperClass(), job);

/**
 * inputFormat.createRecordReader() === the real RecordReader
 *
 * inputFormat is an instance of the TextInputFormat class
 *
 * The return value of TextInputFormat's createRecordReader method is a LineRecordReader instance
 */
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
LOG.info("Processing split: " + split);

/**
 * The NewTrackingRecordReader class must contain these three methods:
 *
 * nextKeyValue
 * getCurrentKey
 * getCurrentValue
 *
 * NewTrackingRecordReader's implementations of these three methods
 * actually delegate to the RecordReader returned by inputFormat.createRecordReader()
 *
 * Default InputFormat:   TextInputFormat
 * Default RecordReader:  LineRecordReader
 *
 * So ultimately, NewTrackingRecordReader's three methods are backed by the three same-named methods of LineRecordReader
 */
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input =
new NewTrackingRecordReader<INKEY, INVALUE>(
split, inputFormat, reporter, taskContext);

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

/**
 * Declare an output object used to emit the mapper's key-value pairs
 */
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {

/**
 * NewDirectOutputCollector writes output directly; this class must have a method called write
 */
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {

/**
 * There is a reducer stage, so:
 *
 *     1. We can be sure the map output will be sorted
 *
 *     2. Can we be sure a Partitioner takes effect? Not necessarily; with a single reducer it logically has no effect
 *
 * The NewOutputCollector class must also have a write method
 */
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

/**
 * The mapContext object must contain the three read methods.
 *
 * This also answers a question raised when we first read the source:
 *
 *     Question: who calls the constructor of the MapContextImpl class?
 *
 *     mapContext is an instance of MapContextImpl
 *
 *     MapContextImpl must therefore have the three read methods, and its
 *     input field === NewTrackingRecordReader
 *
 *     What we know for sure:
 *
 *     1. The mapContext object must have a write method
 *
 *     2. Looking at MapContextImpl itself, no write method is declared there
 *
 *     Resolution:
 *
 *     mapContext.write is inherited from MapContextImpl's parent class (TaskInputOutputContextImpl)
 *
 *     At the very bottom, write boils down to: output.write();
 */
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
job, getTaskID(), input, output, committer, reporter, split);

/**
 * mapperContext must also contain these three methods:
 *
 * nextKeyValue
 * getCurrentKey
 * getCurrentValue
 *
 * mapperContext is concretely implemented by new Context(context),
 * where context = mapContext
 *
 * Conclusion:
 *
 * the mapContext object must contain the following three methods:
 *
 * nextKeyValue
 * getCurrentKey
 * getCurrentValue
 *
 * and mapContext also has a method called write(key, value)
 */
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
.getMapContext(mapContext);

try {

input.initialize(split, mapperContext);

/**
 * This is the entry point responsible for driving the whole MapTask execution.
 *
 * Note the typical structure of such a method:
 *
 *     1. The key call sits at the end, or inside the try block
 *     2. Almost all the other code does one of two things: logging / housekeeping, or preparing the arguments for the core call
 */
            mapper.run(mapperContext);


mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;

} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
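Before tracing further, here is a condensed paraphrase of what runNewMapper wires together (variable names match the listing above; this is a summary sketch, not the literal source):

mapper        = ReflectionUtils.newInstance(taskContext.getMapperClass(), job)         // the user's Mapper subclass
inputFormat   = ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job)    // TextInputFormat by default
input         = new NewTrackingRecordReader(split, inputFormat, reporter, taskContext) // wraps the real RecordReader
output        = numReduceTasks == 0 ? new NewDirectOutputCollector(...) : new NewOutputCollector(...)
mapContext    = new MapContextImpl(job, taskId, input, output, committer, reporter, split)
mapperContext = new WrappedMapper().getMapContext(mapContext)
mapper.run(mapperContext)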

What we can be sure of is that mapperContext must have the four important methods listed above. Let's keep following mapperContext:

/**
 * mapperContext must also contain these three methods:
 *
 * nextKeyValue
 * getCurrentKey
 * getCurrentValue
 *
 * mapperContext is concretely implemented by new Context(context),
 * where context = mapContext
 *
 * Conclusion:
 *
 * the mapContext object must contain the following three methods:
 *
 * nextKeyValue
 * getCurrentKey
 * getCurrentValue
 *
 * and mapContext also has a method called write(key, value)
 */
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
        .getMapContext(mapContext);

Looking at WrappedMapper.java:

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;

/**
* A {@link Mapper} which wraps a given one to allow custom
* {@link Mapper.Context} implementations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

/**
* Get a wrapped {@link Mapper.Context} for custom implementations.
*
* @param mapContext
*            <code>MapContext</code> to be wrapped
* @return a wrapped <code>Mapper.Context</code> for custom implementations
*/
public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMapContext(
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
return new Context(mapContext);
}

@InterfaceStability.Evolving
public class Context extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {

protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;

public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
this.mapContext = mapContext;
}

/**
* Get the input split for this map.
*/
public InputSplit getInputSplit() {
return mapContext.getInputSplit();
}

@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return mapContext.getCurrentKey();
}

@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return mapContext.getCurrentValue();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return mapContext.nextKeyValue();
}

@Override
public Counter getCounter(Enum<?> counterName) {
return mapContext.getCounter(counterName);
}

@Override
public Counter getCounter(String groupName, String counterName) {
return mapContext.getCounter(groupName, counterName);
}

@Override
public OutputCommitter getOutputCommitter() {
return mapContext.getOutputCommitter();
}

@Override
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
mapContext.write(key, value);
}

@Override
public String getStatus() {
return mapContext.getStatus();
}

@Override
public TaskAttemptID getTaskAttemptID() {
return mapContext.getTaskAttemptID();
}

@Override
public void setStatus(String msg) {
mapContext.setStatus(msg);
}

@Override
public Path[] getArchiveClassPaths() {
return mapContext.getArchiveClassPaths();
}

@Override
public String[] getArchiveTimestamps() {
return mapContext.getArchiveTimestamps();
}

@Override
public URI[] getCacheArchives() throws IOException {
return mapContext.getCacheArchives();
}

@Override
public URI[] getCacheFiles() throws IOException {
return mapContext.getCacheFiles();
}

@Override
public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
return mapContext.getCombinerClass();
}

@Override
public Configuration getConfiguration() {
return mapContext.getConfiguration();
}

@Override
public Path[] getFileClassPaths() {
return mapContext.getFileClassPaths();
}

@Override
public String[] getFileTimestamps() {
return mapContext.getFileTimestamps();
}

@Override
public RawComparator<?> getCombinerKeyGroupingComparator() {
return mapContext.getCombinerKeyGroupingComparator();
}

@Override
public RawComparator<?> getGroupingComparator() {
return mapContext.getGroupingComparator();
}

@Override
public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
return mapContext.getInputFormatClass();
}

@Override
public String getJar() {
return mapContext.getJar();
}

@Override
public JobID getJobID() {
return mapContext.getJobID();
}

@Override
public String getJobName() {
return mapContext.getJobName();
}

@Override
public boolean getJobSetupCleanupNeeded() {
return mapContext.getJobSetupCleanupNeeded();
}

@Override
public boolean getTaskCleanupNeeded() {
return mapContext.getTaskCleanupNeeded();
}

@Override
public Path[] getLocalCacheArchives() throws IOException {
return mapContext.getLocalCacheArchives();
}

@Override
public Path[] getLocalCacheFiles() throws IOException {
return mapContext.getLocalCacheFiles();
}

@Override
public Class<?> getMapOutputKeyClass() {
return mapContext.getMapOutputKeyClass();
}

@Override
public Class<?> getMapOutputValueClass() {
return mapContext.getMapOutputValueClass();
}

@Override
public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
return mapContext.getMapperClass();
}

@Override
public int getMaxMapAttempts() {
return mapContext.getMaxMapAttempts();
}

@Override
public int getMaxReduceAttempts() {
return mapContext.getMaxReduceAttempts();
}

@Override
public int getNumReduceTasks() {
return mapContext.getNumReduceTasks();
}

@Override
public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
return mapContext.getOutputFormatClass();
}

@Override
public Class<?> getOutputKeyClass() {
return mapContext.getOutputKeyClass();
}

@Override
public Class<?> getOutputValueClass() {
return mapContext.getOutputValueClass();
}

@Override
public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
return mapContext.getPartitionerClass();
}

@Override
public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
return mapContext.getReducerClass();
}

@Override
public RawComparator<?> getSortComparator() {
return mapContext.getSortComparator();
}

@Override
public boolean getSymlink() {
return mapContext.getSymlink();
}

@Override
public Path getWorkingDirectory() throws IOException {
return mapContext.getWorkingDirectory();
}

@Override
public void progress() {
mapContext.progress();
}

@Override
public boolean getProfileEnabled() {
return mapContext.getProfileEnabled();
}

@Override
public String getProfileParams() {
return mapContext.getProfileParams();
}

@Override
public IntegerRanges getProfileTaskRange(boolean isMap) {
return mapContext.getProfileTaskRange(isMap);
}

@Override
public String getUser() {
return mapContext.getUser();
}

@Override
public Credentials getCredentials() {
return mapContext.getCredentials();
}

@Override
public float getProgress() {
return mapContext.getProgress();
}
}
}

This class does contain those four important methods, and each of them simply delegates to mapContext, so we keep following mapContext:

/**
 * The mapContext object must contain the three read methods.
 *
 * This also answers a question raised when we first read the source:
 *
 *     Question: who calls the constructor of the MapContextImpl class?
 *
 *     mapContext is an instance of MapContextImpl
 *
 *     MapContextImpl must therefore have the three read methods, and its
 *     input field === NewTrackingRecordReader
 *
 *     What we know for sure:
 *
 *     1. The mapContext object must have a write method
 *
 *     2. Looking at MapContextImpl itself, no write method is declared there
 *
 *     Resolution:
 *
 *     mapContext.write is inherited from MapContextImpl's parent class (TaskInputOutputContextImpl)
 *
 *     At the very bottom, write boils down to: output.write();
 */
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
        job, getTaskID(), input, output, committer, reporter, split);

So mapContext is an instance of the MapContextImpl class.

To pin this down, in pseudocode:

mapContext = new MapContextImpl(..., input, ...)   // input is the NewTrackingRecordReader
mapContext.nextKeyValue() {

    // input wraps the RecordReader produced by inputFormat.createRecordReader(),
    // which by default is a LineRecordReader

    return input.nextKeyValue();   // ends up calling LineRecordReader.nextKeyValue()
}
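Putting the trace together, the read path for a single record can be sketched as the following call chain (a summary, not literal source):

mapper.run(mapperContext)                      // the while (context.nextKeyValue()) loop shown at the top
  -> mapperContext.nextKeyValue()              // WrappedMapper.Context, delegates to mapContext
    -> mapContext.nextKeyValue()               // MapContextImpl, delegates to its reader field (confirmed below)
      -> input.nextKeyValue()                  // NewTrackingRecordReader, delegates to the real RecordReader
        -> lineRecordReader.nextKeyValue()     // LineRecordReader, the default for TextInputFormat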

Looking at the MapContextImpl.java source confirms it:

public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

private RecordReader<KEYIN, VALUEIN> reader;
private InputSplit split;

public MapContextImpl(Configuration conf,
TaskAttemptID taskid,
RecordReader<KEYIN, VALUEIN> reader,
RecordWriter<KEYOUT, VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {

// call the parent class (TaskInputOutputContextImpl) constructor via super; that parent is where write() is implemented
super(conf, taskid, writer, committer, reporter);

this.reader = reader;
this.split = split;
}

/**
* Get the input split for this map.
*/
public InputSplit getInputSplit() {
return split;
}

@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
}

@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}

}
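The practical takeaway is that whichever RecordReader the configured InputFormat creates is what ends up behind context.nextKeyValue(), and it therefore determines what getCurrentKey()/getCurrentValue() hand to map. As a hedged illustration (the driver class name is made up; KeyValueTextInputFormat is a real alternative shipped with Hadoop), swapping the InputFormat in the driver changes the mapper's input types from LongWritable/Text to Text/Text:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver sketch: swapping the default TextInputFormat/LineRecordReader pair.
// KeyValueTextInputFormat's RecordReader splits each line at the first tab,
// so nextKeyValue() now loads Text/Text instead of LongWritable/Text.
public class CustomInputDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "custom-input-demo");
        job.setJarByClass(CustomInputDriver.class);

        // The switch that decides which RecordReader ends up behind
        // context.nextKeyValue() inside every MapTask of this job.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(Mapper.class);   // identity mapper, just to keep the sketch short
        job.setNumReduceTasks(0);           // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Writing your own InputFormat/RecordReader pair works the same way: whatever its nextKeyValue/getCurrentKey/getCurrentValue do is exactly what the map method will see through the context.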

 
