您的位置:首页 > 编程语言 > Python开发

用Python编写MapReduce的WordCount实例程序

2013-07-15 18:05 639 查看
条件,假设hadoop环境和python环境已经安装好,且hadoop已正常启动

Hadoop Streaming提供了编写MapReduce程序的map和reduce的一种方式,Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出)来实现的。我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据.下面介绍如何用Python编写一个WordCount实例程序。

map过程的python源文件wc_map.py:

#!/usr/bin/env python

import sys

for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print '%s %d' % (word,1)
reduce过程的python源文件wc_red.py:

#!/usr/bin/env python

import sys

wdict = {}
for line in sys.stdin:
line = line.strip()
if len(line.split()) != 2:
continue
word, count = line.split()
try:
if wdict.has_key(word) == False:
wdict[word] = 0
count = int(count)
wdict[word] += count
except Exception:
pass

for key in wdict.keys():
print '%s %d' % (key, wdict[key])


测试Python代码(cat
data | map | sort | reduce)

建议在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果

这里有一些建议,关于如何测试你的Map和Reduce的功能:

测试map:

$ echo "foo foo quux labs foo bar quux" | ./wc_map.py
foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1
测试reduce:

$ echo "foo foo quux labs foo bar quux" | ./wc_map.py | sort | ./wc_red.py
labs 1
quux 2
foo 3
bar 1

上例子也可以直接传入文件进行测试。

提交MapReduce Job

首先得上传一个test文件到HDFS上进行测试

$ hadoop jar /home/hadoop/soft/hadoop/contrib/streaming/hadoop-streaming-1.0.4.jar \
> -mapper ./wc_map.py \ #指定reduce过程处理的python程序
> 
-combiner
[code].
/wc_red.py \#指定combiner过程处理的python程序
> -reducer ./wc_red.py \ #指定reduce过程处理的python程序
> -input /user/hadoop/test \ #输入文件在HDFS的位置
> -output /user/hadoop/test_r \ #输出结果位置
> -file ./wc_map.py \
> -file ./wc_red.py [/code]注意,wc_map.py和wc_red.py在命令中出现了两次,第一次是告诉Hadoop要执行着两个文件,第二次是告诉Hadoop把这两个文件分发给集群的所有节点。可以通过设置
-jobconf mapred.reduce.tasks=10来设置reduce过程任务的数目。


终端输入如下所示:




packageJobJar: [./wc_map.py, ./wc_red.py, /tmp/hadoop-hadoop/hadoop-unjar6964169209196045728/] [] /tmp/streamjob161475460684037667.jar tmpDir=null
13/07/11 14:56:44 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/11 14:56:45 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
13/07/11 14:56:45 INFO streaming.StreamJob: Running job: job_201307110942_0006
13/07/11 14:56:45 INFO streaming.StreamJob: To kill this job, run:
13/07/11 14:56:45 INFO streaming.StreamJob: /home/hadoop/soft/hadoop-1.0.0/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201307110942_0006
13/07/11 14:56:45 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201307110942_0006 13/07/11 14:56:46 INFO streaming.StreamJob:  map 0%  reduce 0%
13/07/11 14:57:03 INFO streaming.StreamJob:  map 50%  reduce 0%
13/07/11 14:57:06 INFO streaming.StreamJob:  map 100%  reduce 0%
13/07/11 14:57:18 INFO streaming.StreamJob:  map 100%  reduce 100%
13/07/11 14:57:24 INFO streaming.StreamJob: Job complete: job_201307110942_0006
13/07/11 14:57:24 INFO streaming.StreamJob: Output: /user/hadoop/test_rc
job运行结束后可以在HDFS上看到新产生一个输出文件夹:

$ hadoop fs -ls /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2013-07-11 14:26 /user/hadoop/test_r
...
通过命令查看结果:

$ hadoop fs -text test_r/part-00000
...
gap 2
Dover 2
HA-cluster 2
hp-status 2
结果第一列为word,第二列为出现次数,中间空格隔开。

参考:http://www.cnblogs.com/end/archive/2012/08/13/2636175.html
http://www.oschina.net/translate/a-guide-to-python-frameworks-for-hadoop
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: