您的位置:首页 > 编程语言 > C语言/C++

hadoop streaming/c++编程指南

2013-02-25 18:43 246 查看
1.
Hadoop
streaming简介与工作机制

Hadoop streaming可以帮助用户创建和运行一类特殊的Map/Reduce作业,

这些特殊的Map/Reduce作业是由一些可执行文件或脚本文件充当Mapper或者reducer。Mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读),并把计算结果发给标准输出。Hadoop Streaming会创建一个Map/Reduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。

如果一个可执行文件被用于Mapper,则在Mapper初始化时,每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。
Mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,Mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为Mapper的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。
Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。Map和Reduce中的key和value的切分方式用户是可以自定义的。


2.
C++编程实例

下面以C++的wordcount为例使用streaming编程接口。
a.

编写map,name:testmap.cpp

功能:将文本内容变为,输出到标准输出
//mapper
#include
#include
#include
using namespace std;
int main(){

string key;

int value = 1;

while(cin>>key){

if(!key.empty())

cout<<key<<"\t"<<value<<endl;

}

return 0;
}
b.

编写reduce

功能:对于相同的key,对其value相加
#include
#include
using namespace
std;
int main()
{

map wordMap;

map::iterator it;

string key;

int value;

while(cin>>key>>value) {

wordMap[key] +=value;

}

for(it=wordMap.begin();it != wordMap.end();it++) {

cout<<it->first<<"\t"<<it->second<<endl;

}

return 0;
}

C.编译

Makefile如下:

CXX =
g++

CXXFLAGS =
-g -Wall -O2

sources =
testmap.cpp

sources2 =
testreduce.cpp



target =
$(basename $(sources))

target2 =
$(basename $(sources2))



.PHONY :
all

all :
$(target) $(target2)



testmap :
testmap.o


$(CXX) $< -o $@

testreduce
: testreduce.o


$(CXX) $< -o $@

%.o :
%.cpp


$(CXX) -c $< -o $@ $(CXXFLAGS)

PHONY :
clean

clean
:


-rm -rf $(target) $(target2) *.o



d.编写运行脚本

run.sh

编写运行脚本,需要指定HADOOP_HOME,例如提交到yuqing资源池队列,如下:

PWD=/home/zhaizhouwei/wordcount

HADOOP_HOME=/home/zhaizhouwei
/hadoop-0.20.2-cdh3u5

streaming=$HADOOP_HOME/
hadoop-streaming-0.20.2-cdh3u5.jar

$HADOOP_HOME/bin/hadoop jar $streaming \

-file
$PWD/testmap \

-mapper
$PWD/testmap \

-file
$PWD/testreduce \

-reducer
$PWD/testreduce \

-input
/data0/yuqing/zhaizhouwei/tinput \

-output
/data0/yuqing/zhaizhouwei/toutput \

-numReduceTasks 1 \

-jobconf
mapred.job.name="test_wordcount" \

-jobconf
mapred.job.queue.name="yuqing"

然后运行run.sh就可以提交作业到hadoop集群。


3.
Streaming命令

streaming参数,下面是参数列表:

-input

输入数据路径
-output
输出数据路径
-mapper
mapper可执行程序或Java类
-reducer [/b]
reducer可执行程序或Java类
-file
[/i]
Optional[/b]
分发本地文件
-cacheFile
[/i]
Optional[/b]
分发HDFS文件
-cacheArchive
[/i]
Optional[/b]
分发HDFS压缩文件
-numReduceTasks Optional[/b]
reduce任务个数
-jobconf | -D NAME=VALUE
Optional[/b]
作业配置参数
-combiner
Optional[/b]
Combiner Java类
-partitioner Optional[/b]
Partitioner Java类
-inputformat [/i]
Optional[/b]
InputFormat Java类
-outputformat Optional[/b]
OutputFormat Java类
-inputreader Optional[/b]
InputReader配置
-cmdenv
=[/i]
Optional[/b]
传给mapper和reducer的环境变量
-mapdebug Optional[/b]
mapper失败时运行的debug程序
-reducedebug Optional[/b]
reducer失败时运行的debug程序
-verbose
Optional[/b]
详细输出模式
4.
–jobconf
作业参数
mapred.job.name[/i][/b]
作业名
mapred.job.priority[/i][/b]
作业优先级
mapred.job.map.capacity[/i][/b]
最多同时运行map任务数
mapred.job.reduce.capacity[/i][/b]
最多同时运行reduce任务数
hadoop.job.ugi[/i][/b]
作业执行权限
mapred.map.tasks[/b]
map任务个数
mapred.reduce.tasks[/b]
reduce任务个数
mapred.job.groups[/b]
作业可运行的计算节点分组
mapred.task.timeout[/b]
任务没有响应(输入输出)的最大时间
mapred.compress.map.output[/b]
map的输出是否压缩
mapred.map.output.compression.codec[/b]
map的输出压缩方式
mapred.output.compress[/b]
reduce的输出是否压缩
mapred.output.compression.codec[/b]
reduce的输出压缩方式
stream.map.output.field.separator[/b]
map输出分隔符
5.

作业优先级

目前调度器支持可以使用的优先级从高到低依次有VERY_HIGH, HIGH, NORMAL, LOW和VERY_LOW五种。

增加参数:-jobconf[/b]
mapred.job.priority=NORMAL[/b]

不加参数时默认优先级为[/b]NORMAL。



6.
Streaming程序本地测试
Streaming程序可以在单机上用下面的方式运行,因此可以先用小规模数据在单机调试,使用本地的调试方法和调试工具,节省开发测试的时间。
cat input[/i][/b] | mapper[/b] | sort | reducer[/b] > output[/i][/b]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: