MR-2.输入格式(InputFormat)基本介绍
2016-01-26 19:01
357 查看
Hadoop可以处理多种不同类型的数据格式,从文本型数据到数据库中的数据都可以。
基于InputFormat的实现类如图所示
InputSplit类的结构:
一个MapReduce开发者,不需要关心InputSplits,他们通过InputFormat来创建,那么InputFormat类的结构:
运行一个job的客户端调用getSplit方法计算出splits,然后发送给application master,它用这个存储目录安排maptasks在集群上来处理这些数据。
MapTask通过InputFormat类中createRecordReader( )方法为InputSplit获取一个RecordReader
然后通过initialize方法获取记录键值对,每个k/v通过map函数来处理,具体可以参考Mapper类中的run方法。
最后,需要重点说明一点,Mapper‘s的run方法是个公共的方法,需要用户自定义来实现。而MultithreadedMapper是运行mappers的一种实现,MultithreadedMapper中内部类MapRunner具体如下:
基于InputFormat的实现类如图所示
输入分片和记录(InputSplitand Record)
一个Input split是整个输入文件的一块,每个Input split被单独的MapTask处理,每个split被切分成多个记录,Map处理每条记录(k/v)。Input split在J***A类中是InputSplit类。InputSplit类的结构:
public abstract class InputSplit { /** * 获取每个split的长度,每个输入分片按照大小排序 * @return 输入分片的字节长度 * @throws IOException * @throws InterruptedException */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取含有该数据的所有主机节点 * * @return a new array of the node nodes. * @throws IOException * @throws InterruptedException */ public abstract String[] getLocations() throws IOException, InterruptedException }
一个MapReduce开发者,不需要关心InputSplits,他们通过InputFormat来创建,那么InputFormat类的结构:
public abstract class InputFormat<K, V> { /** * 获取job的分片列表,每个InputSplit被分片给单独Mapper处理 * * 这个split是输入数据的逻辑表示,不代表物理上块。每个InputSplit文件包括 * 文件路径,开始位置,偏移量。另InputFormat也创建RecordReader来,主要读取InputSplit */ public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; /** * 为InputSplit创建一个record reader.框架会在split用之前执行RecordReader的 initialize方法. * @param split the split to be read * @param context the information about the task * @return a new record reader * @throws IOException * @throws InterruptedException */ public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
运行一个job的客户端调用getSplit方法计算出splits,然后发送给application master,它用这个存储目录安排maptasks在集群上来处理这些数据。
MapTask通过InputFormat类中createRecordReader( )方法为InputSplit获取一个RecordReader
然后通过initialize方法获取记录键值对,每个k/v通过map函数来处理,具体可以参考Mapper类中的run方法。
public void run(Context context) throws IOException, InterruptedException { //开始:每个maptask仅调用一次 setup(context); try { //每个记录<k,v>都调用 while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { //每个maptask仅调用一次 cleanup(context); } }
最后,需要重点说明一点,Mapper‘s的run方法是个公共的方法,需要用户自定义来实现。而MultithreadedMapper是运行mappers的一种实现,MultithreadedMapper中内部类MapRunner具体如下:
public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> { private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class); public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads"; public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass"; private Class<? extends Mapper<K1,V1,K2,V2>> mapClass; private Context outer; private List<MapRunner> runners; /** * 用一个线程池来运行MapRunner线程,默认开启10个线程 */ @Override public void run(Context context) throws IOException, InterruptedException { outer = context; int numberOfThreads = getNumberOfThreads(context); mapClass = getMapperClass(context); if (LOG.isDebugEnabled()) { LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads"); } runners = new ArrayList<MapRunner>(numberOfThreads); for(int i=0; i < numberOfThreads; ++i) { MapRunner thread = new MapRunner(context); thread.start(); runners.add(i, thread); } for(int i=0; i < numberOfThreads; ++i) { MapRunner thread = runners.get(i); thread.join(); Throwable th = thread.throwable; if (th != null) { if (th instanceof IOException) { throw (IOException) th; } else if (th instanceof InterruptedException) { throw (InterruptedException) th; } else { throw new RuntimeException(th); } } } } private class MapRunner extends Thread { private Mapper<K1,V1,K2,V2> mapper; private Context subcontext; private Throwable throwable; private RecordReader<K1,V1> reader = new SubMapRecordReader(); MapRunner(Context context) throws IOException, InterruptedException { //通过反射获取用户自定义的mapper mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration()); MapContext<K1, V1, K2, V2> mapContext = new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), outer.getTaskAttemptID(), reader, new SubMapRecordWriter(), context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext); //调用InputFormat中通过createRecordReader中的RecordReader方法 reader.initialize(context.getInputSplit(), context); } @Override public void run() { try { //这里直接调用Mapper中的run方法 mapper.run(subcontext); reader.close(); } catch (Throwable ie) { throwable = ie; } } } }
相关文章推荐
- 操作注册表的API函数介绍(RegCreateKeyEx:该函数用来创建注册表键,如果该键已经存在,则打开它)
- android多线程断点续传下载
- Codeforces Round #326 (Div. 2) E. Duff in the Army(LCA+倍增法)
- 关于SQL语句的使用心得
- linux下查看路由表、默认路由器
- 数据结构---无重复元素链表的实现
- JavaScript基础
- linux命令(2):df 磁盘占用
- Android的底层库libutils介绍
- 使用lua代码创建文件夹,解压zip文件到指定目录(亲测可用)
- 大话重构连载6:一个真实的谎言
- 学习Slim Framework for PHP v3 (三)
- web项目中文乱码问题实践经验(springmvc +hibernate)
- Spy++的使用方法
- Postgresql通过批处理命令执行sql文件
- 23岁产妇坐月子双腿险被截肢,产后绝对不能做这八件事!
- Use of undeclared identifier 'BMKMapPointMake'错误 (百度地图)
- MR-2.MapReduce序列化&反序列化&MapReduce函数
- Qt之进程间通信(IPC)
- JSP+Servlet+JDBC+MySQL实现表单生成