hadoop streaming/c++编程指南
2013-02-25 18:43
246 查看
1.
Hadoop
streaming简介与工作机制
Hadoop streaming可以帮助用户创建和运行一类特殊的Map/Reduce作业,
这些特殊的Map/Reduce作业是由一些可执行文件或脚本文件充当Mapper或者reducer。Mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读),并把计算结果发给标准输出。Hadoop Streaming会创建一个Map/Reduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于Mapper,则在Mapper初始化时,每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。
Mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,Mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为Mapper的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。
Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。Map和Reduce中的key和value的切分方式用户是可以自定义的。
2.
C++编程实例
下面以C++的wordcount为例使用streaming编程接口。
a.
编写map,name:testmap.cpp
功能:将文本内容变为,输出到标准输出
b.
编写reduce
功能:对于相同的key,对其value相加
C.编译
Makefile如下:
d.编写运行脚本
run.sh
编写运行脚本,需要指定HADOOP_HOME,例如提交到yuqing资源池队列,如下:
然后运行run.sh就可以提交作业到hadoop集群。
streaming参数,下面是参数列表:
4.
–jobconf
作业参数
5.
作业优先级
目前调度器支持可以使用的优先级从高到低依次有VERY_HIGH, HIGH, NORMAL, LOW和VERY_LOW五种。
增加参数:-jobconf[/b]
mapred.job.priority=NORMAL[/b]
不加参数时默认优先级为[/b]NORMAL。
6.
Streaming程序本地测试
Streaming程序可以在单机上用下面的方式运行,因此可以先用小规模数据在单机调试,使用本地的调试方法和调试工具,节省开发测试的时间。
Hadoop
streaming简介与工作机制
Hadoop streaming可以帮助用户创建和运行一类特殊的Map/Reduce作业,
这些特殊的Map/Reduce作业是由一些可执行文件或脚本文件充当Mapper或者reducer。Mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读),并把计算结果发给标准输出。Hadoop Streaming会创建一个Map/Reduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于Mapper,则在Mapper初始化时,每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。
Mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,Mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为Mapper的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。
Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。Map和Reduce中的key和value的切分方式用户是可以自定义的。
2.
C++编程实例
下面以C++的wordcount为例使用streaming编程接口。
a.
编写map,name:testmap.cpp
功能:将文本内容变为,输出到标准输出
//mapper #include #include #include using namespace std; int main(){ string key; int value = 1; while(cin>>key){ if(!key.empty()) cout<<key<<"\t"<<value<<endl; } return 0; } |
编写reduce
功能:对于相同的key,对其value相加
#include #include using namespace std; int main() { map wordMap; map::iterator it; string key; int value; while(cin>>key>>value) { wordMap[key] +=value; } for(it=wordMap.begin();it != wordMap.end();it++) { cout<<it->first<<"\t"<<it->second<<endl; } return 0; } |
Makefile如下:
CXX = g++ CXXFLAGS = -g -Wall -O2 sources = testmap.cpp sources2 = testreduce.cpp target = $(basename $(sources)) target2 = $(basename $(sources2)) .PHONY : all all : $(target) $(target2) testmap : testmap.o $(CXX) $< -o $@ testreduce : testreduce.o $(CXX) $< -o $@ %.o : %.cpp $(CXX) -c $< -o $@ $(CXXFLAGS) PHONY : clean clean : -rm -rf $(target) $(target2) *.o |
run.sh
编写运行脚本,需要指定HADOOP_HOME,例如提交到yuqing资源池队列,如下:
PWD=/home/zhaizhouwei/wordcount HADOOP_HOME=/home/zhaizhouwei /hadoop-0.20.2-cdh3u5 streaming=$HADOOP_HOME/ hadoop-streaming-0.20.2-cdh3u5.jar $HADOOP_HOME/bin/hadoop jar $streaming \ -file $PWD/testmap \ -mapper $PWD/testmap \ -file $PWD/testreduce \ -reducer $PWD/testreduce \ -input /data0/yuqing/zhaizhouwei/tinput \ -output /data0/yuqing/zhaizhouwei/toutput \ -numReduceTasks 1 \ -jobconf mapred.job.name="test_wordcount" \ -jobconf mapred.job.queue.name="yuqing" |
3.
Streaming命令
streaming参数,下面是参数列表:-input | 输入数据路径 |
-output | 输出数据路径 |
-mapper | mapper可执行程序或Java类 |
-reducer [/b] | reducer可执行程序或Java类 |
-file [/i] Optional[/b] | 分发本地文件 |
-cacheFile [/i] Optional[/b] | 分发HDFS文件 |
-cacheArchive [/i] Optional[/b] | 分发HDFS压缩文件 |
-numReduceTasks Optional[/b] | reduce任务个数 |
-jobconf | -D NAME=VALUE Optional[/b] | 作业配置参数 |
-combiner Optional[/b] | Combiner Java类 |
-partitioner Optional[/b] | Partitioner Java类 |
-inputformat [/i] Optional[/b] | InputFormat Java类 |
-outputformat Optional[/b] | OutputFormat Java类 |
-inputreader Optional[/b] | InputReader配置 |
-cmdenv =[/i] Optional[/b] | 传给mapper和reducer的环境变量 |
-mapdebug Optional[/b] | mapper失败时运行的debug程序 |
-reducedebug Optional[/b] | reducer失败时运行的debug程序 |
-verbose Optional[/b] | 详细输出模式 |
–jobconf
作业参数
mapred.job.name[/i][/b] | 作业名 |
mapred.job.priority[/i][/b] | 作业优先级 |
mapred.job.map.capacity[/i][/b] | 最多同时运行map任务数 |
mapred.job.reduce.capacity[/i][/b] | 最多同时运行reduce任务数 |
hadoop.job.ugi[/i][/b] | 作业执行权限 |
mapred.map.tasks[/b] | map任务个数 |
mapred.reduce.tasks[/b] | reduce任务个数 |
mapred.job.groups[/b] | 作业可运行的计算节点分组 |
mapred.task.timeout[/b] | 任务没有响应(输入输出)的最大时间 |
mapred.compress.map.output[/b] | map的输出是否压缩 |
mapred.map.output.compression.codec[/b] | map的输出压缩方式 |
mapred.output.compress[/b] | reduce的输出是否压缩 |
mapred.output.compression.codec[/b] | reduce的输出压缩方式 |
stream.map.output.field.separator[/b] | map输出分隔符 |
作业优先级
目前调度器支持可以使用的优先级从高到低依次有VERY_HIGH, HIGH, NORMAL, LOW和VERY_LOW五种。
增加参数:-jobconf[/b]
mapred.job.priority=NORMAL[/b]
不加参数时默认优先级为[/b]NORMAL。
6.
Streaming程序本地测试
Streaming程序可以在单机上用下面的方式运行,因此可以先用小规模数据在单机调试,使用本地的调试方法和调试工具,节省开发测试的时间。
cat input[/i][/b] | mapper[/b] | sort | reducer[/b] > output[/i][/b] |
相关文章推荐
- 大数据hadoop学习之---Yarn 体系架…
- hadoop&nbsp;Streaming之aggregate
- Hadoop&nbsp;Shell
- hadoop&nbsp;之wordcount程序&nbsp;命令…
- hadoop&nbsp;hive
- Hadoop&nbsp;权限管理
- Hadoop-2.6.0&nbsp;集群的安装配置
- hadoop&nbsp;datanode启动不起来
- Hadoop&nbsp;Streaming机制
- 配置hadoop&nbsp;使用fair&nbsp;sc…
- hadoop编译问题(2)Could&nbsp;no…
- Hadoop&nbsp;Streaming和Pipes
- 配置hadoop-1.2.1&nbsp;eclipse开发环境
- hadoop&nbsp;HA----Quorum&nbsp;Journal&nbsp;设计…
- Linux&nbsp;下用eclipse&nbsp;连接hadoop
- hadoop&nbsp;pipes编程示例
- Ubuntu 12.04搭建hadoop单机版环境
- Hadoop&nbsp;YARN&nbsp;简介:相比于MRv1,YA…
- hadoop&nbsp;c++&nbsp;pipes接口实现
- shell hadoop hbase&nbs…