Hadoop的基本使用(3)——MapReduce的基本操作(实现字符统计)
2016-11-30 23:51
627 查看
MapReduce:
是Hadoop中一个并行计算框架,默认Hadoop提供了一些工具实现对HDFS上数据的分析计算汇总。特点:hadoop充分的利用了集群当中DataNode的节点的CPU和内存,使用这些节点作为计算汇总节点,最终将汇总的数据写回HDFS(默认)。
数据: 存储各个dataNode中 (block单位)
数据拆分==>数据切片(针对数据块一种逻辑映射)==>MapTask(DataNode所在机器)(多个)==>ReduceTask(若干个DataNode所在机器)
Hadoop2 MapReduce2基于Yarn实现 资源管理器 负责资源调度和调配
ResourceManager:负责资源的分配和管理 CPU,内存 提供MapTask、ReduceTask的jvm启动参数 | 任务分配。
NodeManager:每一个DataNode上会启动一个NodeManager,负责连接ResourceManager以及启动MapTask或者ReduceTask (MapTask和ReduceTask统称 YarnChild)。
(详情:参考 hadoop权威指南第三版 75 RMB 中文版 《How MapReduce Works》)
搭建MapReduce的运行环境
1.修改etc/hadoop/yarn|mapred-site.xmlyarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>CentOSA</value> </property>
mapred-site.xml
<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <!--跨平台运行--> <property> <name>mapreduce.app-submission.cross-platform</name> <value>true</value> </property>
2.启动|停止Yarn
[root@CentOSA hadoop-2.6.0]# ./sbin/start|stop-yarn.sh
通过MapReduce实现字符统计
1、导入相关jar包
hadoop-commonhadoop-hdfs
hadoop-mapred
hadoop-yarn
2、书写Mapper、Reducer类
mappers/* * Keyin(限此类):数据在文件中的偏移量 * Value:数据 * keyout:统计依据 * valueout:统计值 */ public class Mapper extends Mapper<LongWritable, Text, Text, IntWritable> { protected void map(LongWritable key, Text value,Context context) throws Exception { String string[]=value.toString().split(""); for(String s:string){ context.write(new Text(s), new IntWritable(1)); } } }
reducer
public class Reducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException { int total=0; for(IntWritable i:value){ total+=i.get(); } context.write(key, new IntWritable(total)); } }
3、书写实现类
public class Submitter { public static void main(String[] args) throws Exception { //1、获得job对象 Configuration conf=new Configuration(); //...关联配置 conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); conf.addResource("yarn-site.xml"); conf.addResource("mapred-site.xml"); conf.set("mapreduce.job.jar", "wc.jar"); //...获取job实例 Job job=Job.getInstance(conf); job.setJarByClass(Submitter.class); //2、设置数据的读入、输出类型 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //3、设置数据的读取输出路径 Path src=new Path(""); TextInputFormat.addInputPath(job, src); Path dst=new Path(""); //..输出文件目录必须不存在,若存在通过此代码删除 FileSystem fileSystem=FileSystem.get(conf); if(fileSystem.exists(dst)){ fileSystem.delete(dst, true); } TextOutputFormat.setOutputPath(job, dst); //4、设置依据类 job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); //5、设置传输数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、提交 job.waitForCompletion(true); } }
代码运行过程
1、切片(spilt)计算过程
InputFormat指定了文件处理路径
对文件做切片,实现了切片数据的计算逻辑
实现了对切片数据的读取逻辑,为Mapper提供数据
InputFormat->FileInputFormat->TextInputFormater
方法:
getSplit (FileInputFormat实现)
creatRecordReader (FileInputFormat未实现)
TextInputFormat举例
一个数据块对应一个切面,在任务提交时计算切片,并将切片数据写入hdfs目录,存储对应的块信息MapTask(MapContext)
调用mapper,传入自身持有的Context
MapContext下面持有RecordReader
bb4e
(来自于TextInputFormat方法提供)
RecordReader所需要的实现类是LineRecordReader
LineRecordReader
核心方法:intialize()
Mapper
方法:setup、map、cleanup、run
run(context)↓
public void run(Context context) throws IOException, InterruptedException { setup(context); try { //调用context,实际委派给Reader调用的是RecordReader中的方法 while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
set->map->cleanup
MapContext
实现类:Context
2、任务提交过程、切片计算
connect()连接到资源管理器;JobSubmiter.submitJobInternal()
checkSpecs(job);检查路径是否为空
//增加权限访问 ./bin/hdfs dfs -chmod -R 777 /tmp //开启历史查看 ./sbin/mr-jobhistory-daemon.sh start historyserver
submitClient.getNewJobID();获取jobjd
checkSpecs(job);检查输出目录是否为null
copyAndConfigureFiles(job,submitJobDir);构建临时目录,上传jar包,得到配置文件
(files,libjars,archives,jobjar上传至创建的新目录)
writeSplits(job, submitJobDir);计算切片 并且将切片信息写入到HDFS
(一个切片——>一个MapTask——>1秒——>2g内存)
writeConf(conf, submitJobFile);生成job.xml配置信息
getSplit();
建造切面:makeSplit
如果长度不为空,获得blkLocation,file.getBlkLocation
判断文件是否可以拆分isSplitable();不能拆分则文件多大创造一个多大的切面
每次创建一个切面长度减少切面大小,不足一个块时作为小块处理
最后一个切面可以比一个块稍大一点点
3、常见InputFormat
TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
Key: 偏移量
值 : 行数据
切片: 优先按照文件为单位 再按照splitSize去计算切片
NLineInputFormat
job.setInputFormatClass(NLineInputFormat.class);
Key: 偏移量(LongWriteble)
值 : 行数据(Text)
切片: 优先按照文件为单位 再按照N行为单位去计算切片
//设置每次创建切片时所用的行数 conf.setInt("mapereduce.input.lineinputformat.linespermap",3)
KeyValueTextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class);
Key: 内容key(Text)
值 : 内容值
切片: 优先按照文件为单位 再按照splitSize去计算切片
注意在Mapper类修改
public class WordMapper extends Mapper<Text, Text, Text, Text> { @Override protected void map(Text key, Text value,Context context) } }
CombineTextInputFormat(Rackaware)
job.setInputFormatClass(CombineTextInputFormat.class);
Key: 偏移量
值 : 行数据
切片: 按照总文件的大小/splitSize去计算切片数目 一个切片 对应 多个小的block
MultipleInputs
MultipleInputs.addInputPath(job, new Path("/demo/order1"),KeyValueTextInputFormat.class , OrderMapper1.class); MultipleInputs.addInputPath(job, new Path("/demo/order2"),KeyValueTextInputFormat.class , OrderMapper2.class);
相关文章推荐
- 使用HDFS API实现hadoop HDFS文件系统的基本操作
- 字符基本操作、使用SYMBOLS、使用消息程序、使用文本程序
- 仅使用基本的表操作实现两个排序后的表L1和L2的并集
- 使用C++实现链表的基本操作
- 使用hadoop实现ip地理位置统计~ip归属地和运营商
- [读书笔记]深入解析MapReduce架构设计与实现原理——CH4 Hadoop RPC基本框架
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- Hadoop学习记录(4)|MapReduce原理|API操作使用
- mongoDB使用mapreduce实现简单的统计功能
- 使用Python实现Hadoop MapReduce程序
- 使用hadoop实现IP个数统计~并将结果写入数据库
- Hadoop学习与使用之基本操作命令
- 使用Python实现Hadoop MapReduce程序
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- 使用Python实现Hadoop MapReduce程序
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- 使用Python实现Hadoop MapReduce程序
- 使用TreeMap集合实现统计字符出现次数
- Tachyon基本使用08-----Running Hadoop MapReduce on Tachyon
- 使用datepicker插件实现日期选择的基本操作 8-5