您的位置:首页 > 其它

MR-2.输入格式(InputFormat)基本介绍

2016-01-26 19:01 357 查看
Hadoop可以处理多种不同类型的数据格式,从文本型数据到数据库中的数据都可以。



基于InputFormat的实现类如图所示



输入分片和记录(InputSplitand Record)

一个Input split是整个输入文件的一块,每个Input split被单独的MapTask处理,每个split被切分成多个记录,Map处理每条记录(k/v)。Input split在J***A类中是InputSplit类。

InputSplit类的结构:

public abstract class InputSplit {
  /**
   * 获取每个split的长度,每个输入分片按照大小排序
   * @return 输入分片的字节长度
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract long getLength() throws IOException, InterruptedException;

  /**
   * 获取含有该数据的所有主机节点
   * 
   * @return a new array of the node nodes.
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    String[] getLocations() throws IOException, InterruptedException
}


一个MapReduce开发者,不需要关心InputSplits,他们通过InputFormat来创建,那么InputFormat类的结构:

public abstract class InputFormat<K, V> {

  /** 
   * 获取job的分片列表,每个InputSplit被分片给单独Mapper处理
   * 
   * 这个split是输入数据的逻辑表示,不代表物理上块。每个InputSplit文件包括
   * 文件路径,开始位置,偏移量。另InputFormat也创建RecordReader来,主要读取InputSplit
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  /**
   * 为InputSplit创建一个record reader.框架会在split用之前执行RecordReader的 initialize方法.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}


运行一个job的客户端调用getSplit方法计算出splits,然后发送给application master,它用这个存储目录安排maptasks在集群上来处理这些数据。

MapTask通过InputFormat类中createRecordReader( )方法为InputSplit获取一个RecordReader

然后通过initialize方法获取记录键值对,每个k/v通过map函数来处理,具体可以参考Mapper类中的run方法。

public void run(Context context) throws IOException, InterruptedException {
    //开始:每个maptask仅调用一次
	setup(context);
    try {
	  //每个记录<k,v>都调用
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
	   //每个maptask仅调用一次
      cleanup(context);
    }
  }


最后,需要重点说明一点,Mapper‘s的run方法是个公共的方法,需要用户自定义来实现。而MultithreadedMapper是运行mappers的一种实现,MultithreadedMapper中内部类MapRunner具体如下:

public class MultithreadedMapper<K1, V1, K2, V2> 
  extends Mapper<K1, V1, K2, V2> {

  private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
  public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads";
  public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass";
  
  private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
  private Context outer;
  private List<MapRunner> runners;

  /**
   * 用一个线程池来运行MapRunner线程,默认开启10个线程
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
                " threads");
    }
    
    runners =  new ArrayList<MapRunner>(numberOfThreads);
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      thread.start();
      runners.add(i, thread);
    }
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = runners.get(i);
      thread.join();
      Throwable th = thread.throwable;
      if (th != null) {
        if (th instanceof IOException) {
          throw (IOException) th;
        } else if (th instanceof InterruptedException) {
          throw (InterruptedException) th;
        } else {
          throw new RuntimeException(th);
        }
      }
    }
  }

  private class MapRunner extends Thread {
    private Mapper<K1,V1,K2,V2> mapper;
    private Context subcontext;
    private Throwable throwable;
    private RecordReader<K1,V1> reader = new SubMapRecordReader();

    MapRunner(Context context) throws IOException, InterruptedException {
	  //通过反射获取用户自定义的mapper
      mapper = ReflectionUtils.newInstance(mapClass, 
                                           context.getConfiguration());
      MapContext<K1, V1, K2, V2> mapContext = 
        new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), 
                                           outer.getTaskAttemptID(),
                                           reader,
                                           new SubMapRecordWriter(), 
                                           context.getOutputCommitter(),
                                           new SubMapStatusReporter(),
                                           outer.getInputSplit());
      subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
	  //调用InputFormat中通过createRecordReader中的RecordReader方法
      reader.initialize(context.getInputSplit(), context);
    }

    @Override
    public void run() {
      try {
		//这里直接调用Mapper中的run方法
        mapper.run(subcontext);
        reader.close();
      } catch (Throwable ie) {
        throwable = ie;
      }
    }
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: