您的位置:首页 > 编程语言 > Python开发

用hadoop-streaming 运行python map-reduce程序

2015-05-09 20:25 627 查看
简介:

    hadoop是java写的,所以,运行hadoop经常也值直接支持java。这让我们这种不熟悉java的程序员很是心碎啊。还好,Doug Cutting大神也没有直接放弃非java程序员这块庞大的用户需求,提供了各种各样的接口给那些不熟悉java的程序员使用。下面要介绍的是各种接口中的一种:用hadoop-streaming来运行非java的各式map-reduce程序。

示例:

         用我刚开始学hadoop的时候的一个小程序来做例子吧,我们需要统计以下一段话中各个单词出现的次数,例如,我们有这么小小的一段单词:

wo wo wo

shi

yi ke

xiao xiao de shi tou

wo hai shi zhe yang

我们把它存在 /usr/local/hadoop/book目录下,随便起个名字存储起来。然后先写一个map程序,将所有的单词分开并统计出来,脚本如下:

#!/usr/bin/env python
import sys
for line in sys.stdin:
words=line.strip().split()
for word in words:
print '%s\t%s' %(word,1)


说明一下#! /usr/bin/env python 是指明了该脚本的运行程序,也可以不写,只是执行的时候代码有所变化,下文会阐述说明此问题

上述代码就是我们的map程序,我们给它取名叫做mapper.py 也存储到/usr/local/hadoop/目录下,然后载写出reduce程序,统计mapper函数传过来的序列中各个单词出现的次数,并保存在相同目录下,程序如下:

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()

# parse the input we got from mapper.py
word, count = line.split('\t', 1)

# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue

# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word

# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)


然后我们改变我们缩写的程序的权限:(chmod ×××.py)

接下来我们先测试测试我们的程序有没有错: 我们echo出一小段文字,然后分别调用map和reduce程序来统计相关信息,代码如下:

buptpwy@buptpwy-Lenovo:/usr/local/hadoop$ echo "hello hh hh lso"|/usr/local/hadoop/mapper.py |/usr/local/hadoop/reducer.py
hello	1
hh	2
lso	1


如果上面的脚本没有指定运行环境的化我们可以这样:

buptpwy@buptpwy-Lenovo:/usr/local/hadoop$ echo "hello hh hh lso"|python /usr/local/hadoop/mapper.py |python /usr/local/hadoop/reducer.py
hello	1
hh	2
lso	1


然后重头戏来了,我们可以应该调用hadoop-streaming来执行我们的源程序了,首先,先将我们要处理的文件传入HDFS系统中:

buptpwy@buptpwy-Lenovo:~/下载/hadoop/hadoop-0.20.2$ bin/hadoop dfs -put /usr/local/hadoop/book book


前面指定的是文件原本所在的目录,后者指定的是在HDFS文件系统的目录

然后我们就可以调用hadoop-streaming来执行map-reduce程序了,如下:

buptpwy@buptpwy-Lenovo:~/下载/hadoop/hadoop-0.20.2$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar  -mapper /usr/local/hadoop/mapper.py -reducer /usr/local/hadoop/reducer.py -input book/* -output book-outbin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar  -mapper /usr/local/hadoop/mapper.py -reducer /usr/local/hadoop/reducer.py -input book/* -output book-out


-mapper指定map程序位置,-reducer指定reduce程序位置,-input 指定数据位置,-output指定输出结果位置,在次代码执行后,成功的话,会有以下显示:

packageJobJar: [/home/buptpwy/hadoop_tmp/hadoop-unjar580737856059781750/] [] /tmp/streamjob3726475424298491575.jar tmpDir=null
15/05/09 20:17:10 INFO mapred.FileInputFormat: Total input paths to process : 1
15/05/09 20:17:10 INFO streaming.StreamJob: getLocalDirs(): [/usr/hadoop-0.20.2/maperd/local]
15/05/09 20:17:10 INFO streaming.StreamJob: Running job: job_201505091953_0004
15/05/09 20:17:10 INFO streaming.StreamJob: To kill this job, run:
15/05/09 20:17:10 INFO streaming.StreamJob: /home/buptpwy/下载/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201505091953_0004
15/05/09 20:17:10 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505091953_0004 15/05/09 20:17:11 INFO streaming.StreamJob:  map 0%  reduce 0%
15/05/09 20:17:19 INFO streaming.StreamJob:  map 100%  reduce 0%
15/05/09 20:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
15/05/09 20:17:34 INFO streaming.StreamJob: Job complete: job_201505091953_0004
15/05/09 20:17:34 INFO streaming.StreamJob: Output: book-out


然后我们就可以在hdfs文件系统中找到book-out文件,可以从中查看我们所需要的结果了。

总结

最后仅说说本人最切身的总结,hadoop-streaming提供了一种接口给我们,让我们可以很方便的使用其他语言来写hadoop的map-reduce程序,但是,其实实质上他还需转化成java执行,速度比直接调用javaapi要慢,当然,这也是没有办法的事,至于其他的调用方法,等着以后学到在说吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: