您的位置:首页 > 运维架构

Hadoop的基本使用(3)——MapReduce的基本操作(实现字符统计)

2016-11-30 23:51 627 查看

MapReduce:

是Hadoop中一个并行计算框架,默认Hadoop提供了一些工具实现对HDFS上数据的分析计算汇总。

特点:hadoop充分的利用了集群当中DataNode的节点的CPU和内存,使用这些节点作为计算汇总节点,最终将汇总的数据写回HDFS(默认)。

数据: 存储各个dataNode中 (block单位)

数据拆分==>数据切片(针对数据块一种逻辑映射)==>MapTask(DataNode所在机器)(多个)==>ReduceTask(若干个DataNode所在机器)


Hadoop2 MapReduce2基于Yarn实现 资源管理器 负责资源调度和调配

ResourceManager:负责资源的分配和管理 CPU,内存 提供MapTask、ReduceTask的jvm启动参数 | 任务分配。

NodeManager:每一个DataNode上会启动一个NodeManager,负责连接ResourceManager以及启动MapTask或者ReduceTask (MapTask和ReduceTask统称 YarnChild)。

(详情:参考 hadoop权威指南第三版 75 RMB 中文版 《How MapReduce Works》)

搭建MapReduce的运行环境

1.修改etc/hadoop/yarn|mapred-site.xml

yarn-site.xml

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>CentOSA</value>
</property>


mapred-site.xml

<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!--跨平台运行-->
<property>
<name>mapreduce.app-submission.cross-platform</name>
<value>true</value>
</property>


2.启动|停止Yarn

[root@CentOSA hadoop-2.6.0]# ./sbin/start|stop-yarn.sh


通过MapReduce实现字符统计

1、导入相关jar包

hadoop-common

hadoop-hdfs

hadoop-mapred

hadoop-yarn

2、书写Mapper、Reducer类

mappers

/*
* Keyin(限此类):数据在文件中的偏移量
* Value:数据
* keyout:统计依据
* valueout:统计值
*/
public class Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value,Context context) throws Exception {
String string[]=value.toString().split("");
for(String s:string){
context.write(new Text(s), new IntWritable(1));
}
}
}


reducer

public class Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> value,Context context)
throws IOException, InterruptedException {
int total=0;
for(IntWritable i:value){
total+=i.get();
}
context.write(key, new IntWritable(total));
}
}


3、书写实现类

public class Submitter {
public static void main(String[] args) throws Exception {
//1、获得job对象
Configuration conf=new Configuration();
//...关联配置
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("yarn-site.xml");
conf.addResource("mapred-site.xml");
conf.set("mapreduce.job.jar", "wc.jar");
//...获取job实例
Job job=Job.getInstance(conf);
job.setJarByClass(Submitter.class);
//2、设置数据的读入、输出类型
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//3、设置数据的读取输出路径
Path src=new Path("");
TextInputFormat.addInputPath(job, src);
Path dst=new Path("");
//..输出文件目录必须不存在,若存在通过此代码删除
FileSystem fileSystem=FileSystem.get(conf);
if(fileSystem.exists(dst)){
fileSystem.delete(dst, true);
}
TextOutputFormat.setOutputPath(job, dst);
//4、设置依据类
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
//5、设置传输数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、提交
job.waitForCompletion(true);
}
}


代码运行过程

1、切片(spilt)计算过程

InputFormat

指定了文件处理路径

对文件做切片,实现了切片数据的计算逻辑

实现了对切片数据的读取逻辑,为Mapper提供数据

InputFormat->FileInputFormat->TextInputFormater


方法

getSplit (FileInputFormat实现)

creatRecordReader (FileInputFormat未实现)

TextInputFormat举例

一个数据块对应一个切面,在任务提交时计算切片,并将切片数据写入hdfs目录,存储对应的块信息

MapTask(MapContext)

调用mapper,传入自身持有的Context

MapContext下面持有RecordReader
bb4e
(来自于TextInputFormat方法提供)

RecordReader所需要的实现类是LineRecordReader

LineRecordReader

核心方法:intialize()

Mapper

方法:setup、map、cleanup、run

run(context)↓

public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
//调用context,实际委派给Reader调用的是RecordReader中的方法
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}


set->map->cleanup

MapContext

实现类:Context

2、任务提交过程、切片计算

connect()连接到资源管理器;

JobSubmiter.submitJobInternal()

checkSpecs(job);检查路径是否为空

//增加权限访问
./bin/hdfs dfs -chmod -R 777 /tmp
//开启历史查看
./sbin/mr-jobhistory-daemon.sh start historyserver


submitClient.getNewJobID();获取jobjd

checkSpecs(job);检查输出目录是否为null

copyAndConfigureFiles(job,submitJobDir);构建临时目录,上传jar包,得到配置文件

(files,libjars,archives,jobjar上传至创建的新目录)

writeSplits(job, submitJobDir);计算切片 并且将切片信息写入到HDFS

(一个切片——>一个MapTask——>1秒——>2g内存)

writeConf(conf, submitJobFile);生成job.xml配置信息

getSplit();

建造切面:makeSplit

如果长度不为空,获得blkLocation,file.getBlkLocation

判断文件是否可以拆分isSplitable();不能拆分则文件多大创造一个多大的切面

每次创建一个切面长度减少切面大小,不足一个块时作为小块处理

最后一个切面可以比一个块稍大一点点

3、常见InputFormat

TextInputFormat

job.setInputFormatClass(TextInputFormat.class);


Key: 偏移量

值 : 行数据

切片: 优先按照文件为单位 再按照splitSize去计算切片

NLineInputFormat

job.setInputFormatClass(NLineInputFormat.class);


Key: 偏移量(LongWriteble)

值 : 行数据(Text)

切片: 优先按照文件为单位 再按照N行为单位去计算切片

//设置每次创建切片时所用的行数
conf.setInt("mapereduce.input.lineinputformat.linespermap",3)


KeyValueTextInputFormat

job.setInputFormatClass(KeyValueTextInputFormat.class);


Key: 内容key(Text)

值 : 内容值

切片: 优先按照文件为单位 再按照splitSize去计算切片

注意在Mapper类修改

public class WordMapper extends Mapper<Text, Text, Text, Text> {
@Override
protected void map(Text key, Text value,Context context)
}
}


CombineTextInputFormat(Rackaware)

job.setInputFormatClass(CombineTextInputFormat.class);


Key: 偏移量

值 : 行数据

切片: 按照总文件的大小/splitSize去计算切片数目 一个切片 对应 多个小的block

MultipleInputs

MultipleInputs.addInputPath(job, new Path("/demo/order1"),KeyValueTextInputFormat.class , OrderMapper1.class);
MultipleInputs.addInputPath(job, new Path("/demo/order2"),KeyValueTextInputFormat.class , OrderMapper2.class);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: