您的位置:首页 > 编程语言 > Java开发

MyJob.java解析

2017-03-06 00:00 155 查看
摘要:mapreduce

importjava.io.IOException;

//导入IO异常类,当发生某种I/O异常时,抛出此异常。此类是失败或中断的I/O操作生成的异常的通用类。
publicclassIOExceptionextendsException
importjava.util.Iterator;

//导入接口Iterator,Iterator是对collection进行迭代的迭代器。迭代器取代了JavaCollectionsFramework中的Enumeration。迭代器与枚举有两点不同:迭代器允许调用者利用定义良好的语义在迭代期间从迭代器所指向的collection移除元素。方法名称得到了改进。publicinterfaceIterator
//Collection是最基本的集合接口,一个Collection代表一组Object,即Collection的元素(Elements)。一些Collection允许相同的元素而另一些不行。一些能排序而另一些不行。JavaSDK不提供直接继承自Collection的类,JavaSDK提供的类都是继承自Collection的"子接口"如List和Set。

importorg.apache.hadoop.conf.Configuration;

//hadoop使用了一套独有的配置文件管理系统,并提供自己的API,即使用org.apache.hadoop.conf.Configuration处理配置信息。详细参看《hadoop的APIConfiguration》

//是MapReduce配置模块最底层的类,支持序列化(实现Iterator接口)和迭代(实现Writable接口)

publicclassConfiguration
extendsObject
implementsIterable<Map.Entry<String,String>>,Writable

importorg.apache.hadoop.conf.Configured;

//org.apache.hadoop.conf中的最后一个类,也是这个包中以后用的最频繁的一个,Configurable算是肉体,Configuration算是灵魂吧

//Configurable是一个很简单的接口,也位于org.apache.hadoop.conf包中,从字面解释,Configurable的含义是可配置的,如果一个类实现了Configurable接口,意味着这个类是可配置的。也就是说,可以通过为这个类的对象传一个Configuration实例,提供对象工作需要的一些配置信息。在setConf()中,如果对象实现了Configurable接口,那么对象的setConf()方法会被调用,并根据Configuration类的实例conf进一步初始化对象。

publicinterfaceConfigurable


publicclassConfigured
extendsObject
implementsConfigurable

importorg.apache.hadoop.fs.Path;

//Path对路径进行解析,将参数转换为标准的URI格式,对Path的参数作判断,标准化,字符化等操作。

publicclassPathextendsObjectimplementsComparable
//NamesafileordirectoryinaFileSystem.Pathstringsuseslashasthedirectoryseparator.Apathstringisabsoluteifitbeginswithaslash.

importorg.apache.hadoop.io.Text;

//ThisclassstorestextusingstandardUTF8encoding.Itprovidesmethodstoserialize,deserialize,andcomparetextsatbytelevel.Thetypeoflengthisintegerandisserializedusingzero-compressedformat.

publicclassText
extendsBinaryComparable
implementsWritableComparable<BinaryComparable>

importorg.apache.hadoop.mapred.FileInputFormat;

//InputFormat接口的实现

//为Map-Reduce作业描述输入的细节规范。1、检查作业输入的有效性。2、把输入文件切分成多个逻辑InputSplit实例,并把每一实例分别分发给一个Mapper。3、提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录,这些记录将由Mapper处理。

publicabstractclassFileInputFormat<K,V>
extendsObject
implementsInputFormat<K,V>

importorg.apache.hadoop.mapred.FileOutputFormat;

//OutputFormat接口的实现

//描述Map-Reduce作业的输出样式。1、检验作业的输出,例如检查输出路径是否已经存在。2、提供一个RecordWriter的实现,用来输出作业结果。输出文件保存在FileSystem上。


publicabstractclassFileOutputFormat<K,V>
extendsObject
implementsOutputFormat<K,V>

importorg.apache.hadoop.mapred.JobClient;

//JobClient是用户提交的作业与JobTracker交互的主要接口,提供提交作业,追踪进程,访问子任务的日志记录,获得Map-reduce集群状态信息等功能。

publicclassJobClient
extendsConfigured
implementsTool

importorg.apache.hadoop.mapred.JobConf;

//代表一个Map-Reduce作业的配置,描述一个map-reduce作业运行时所需要的所有信息。

//JobConf会自动加载配置文件mapred-default.xml和mapred-site.xml

publicclassJobConf
extendsConfiguration

importorg.apache.hadoop.mapred.KeyValueTextInputFormat;

//KeyValueTextInputFormat:用于读取文件,如果行被分隔符分割为两部分,第一部分为key,剩下的为value;若没有分隔符,整行作为key,value为空。

publicclassKeyValueTextInputFormat
extendsFileInputFormat<Text,Text>
implementsJobConfigurable

importorg.apache.hadoop.mapred.MapReduceBase;

//Mapper和Reducer的基类,提供了的close()和configure(JobConfjob)方法,继承子类需要编写两个方法的实现。

publicclassMapReduceBase
extendsObject
implementsCloseable,JobConfigurable

importorg.apache.hadoop.mapred.Mapper;

//将输入键值对(key/valuepair)映射到一组中间格式的键值对集合。

publicinterfaceMapper<K1,V1,K2,V2>
extendsJobConfigurable,Closeable

importorg.apache.hadoop.mapred.OutputCollector;

//Map端的上下文,OutputCollector和Reporter是Hadoop-0.19以前版本里面的API,在Hadoop-0.20.2以后就换成Context,Context的功能包含了OutputCollector和Reporter的功能。OutputCollector由Hadoop框架提供,负责收集Mapper和Reducer的输出数据,实现map函数和reduce函数时,只需要简单地将其输出的<key,value>对往OutputCollector中一丢即可,剩余的事框架自会帮你处理好。


publicinterfaceOutputCollector<K,V>

importorg.apache.hadoop.mapred.Reducer;

//reducer,将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。

publicinterfaceReducer<K2,V2,K3,V3>
extendsJobConfigurable,Closeable

importorg.apache.hadoop.mapred.Reporter;

//Reporter用于Map-Reduce应用程序报告进度,设定应用级别的状态消息,更新Counters(计数器)的机制。

publicinterfaceReporter
extendsProgressable

importorg.apache.hadoop.mapred.TextOutputFormat;

//TextInputFormat:用于读取纯文本文件,文件被分为一系列以LF或CR结束的行,key是每一行的偏移量(LongWritable),value是每一行的内容(Text)。

//TextOutputFormat:用于输出一个纯文本文件。将每个记录写为一行文本。键和值以字符串的形式写入,并以制表符(\t)分隔。

publicclassTextOutputFormat<K,V>
extendsFileOutputFormat<K,V>

importorg.apache.hadoop.util.Tool;

//Tool接口可以支持处理通用的命令行选项,它是所有Map-Reduce程序的都可用的一个标准接口

publicinterfaceTool
extendsConfigurable
int
run(String[]args)
Executethecommandwiththegivenarguments.
importorg.apache.hadoop.util.ToolRunner;

//定义的一个类,实现Tool接口,在main()方法中通过ToolRunner.run(...)方法调用上述类的run(String[]方法)

publicclassToolRunner
extendsObject

//定义MyJob类

publicclassMyJobextendsConfiguredimplementsTool{

//实现Mapper接口

publicstaticclassMapClassextendsMapReduceBase

implementsMapper<Text,Text,Text,Text>{

publicvoidmap(Textkey,Textvalue,

OutputCollector<Text,Text>output,

Reporterreporter)throwsIOException{

output.collect(value,key);

}

}

//实现Reducer接口

publicstaticclassReduceextendsMapReduceBase

implementsReducer<Text,Text,Text,Text>{

//输入参数中的value是文本文件中的一行

publicvoidreduce(Textkey,Iterator<Text>values,

OutputCollector<Text,Text>output,

Reporterreporter)throwsIOException{

Stringcsv="";

//values.hasNext()如果仍有元素可以迭代,则返回true。

while(values.hasNext()){

if(csv.length()>0)csv+=",";

//values.next()返回迭代的下一个元素。

csv+=values.next().toString();

}

output.collect(key,newText(csv));

}

}

//重载tool的run方法

publicintrun(String[]args)throwsException{

//调用基类Configured的getConf获取环境变量实例

Configurationconf=getConf();

//定义一个map-reduce作业,作业的配置conf,作业被MyJob使用

JobConfjob=newJobConf(conf,MyJob.class);

//定义输入路径

Pathin=newPath(args[0]);

//定义输出路径

Pathout=newPath(args[1]);

//为作业定义输入路径

FileInputFormat.setInputPaths(job,in);

//为作业定义输出路径

FileOutputFormat.setOutputPath(job,out);

//设置作业名称

job.setJobName("MyJob");

//设置作业的分解对象

job.setMapperClass(MapClass.class);

//设置作业的整合对象

job.setReducerClass(Reduce.class);

//设置作业的输入实现,输入的数据用给定的分隔符分割,前面是键key,后面是值value,

//key1、value1的类型均为text

job.setInputFormat(KeyValueTextInputFormat.class);

//设置作业的输出实现

job.setOutputFormat(TextOutputFormat.class);

//设置作业输出键是纯文本,保证与输入分割时的输出的key的类型一致key2的类型是text

job.setOutputKeyClass(Text.class);

//设置输出的值是纯文本,保证与输入分割时的输出的value的类型一致value2的类型是text

job.setOutputValueClass(Text.class);

//设置逗号为输入文本的分割符

job.set("key.value.separator.in.input.line",",");

//客户端作业类运行作业

JobClient.runJob(job);

return0;

}

//主方法,args是调用命令行输入的参数数组

publicstaticvoidmain(String[]args)throwsException{

//调用MyJob的run方法,为MyJob创建一个配置项,传入命令行参数args

//这里的newMyJob()作为Tool的实现

//newConfiguration,将得到hadoop默认的路径,在core-default.xml中配置

intres=ToolRunner.run(newConfiguration(),newMyJob(),args);

System.exit(res);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop map-reduce