您的位置:首页 > 编程语言 > Python开发

Hadoop:使用原生python编写MapReduce来统计文本文件中所有单词出现的频率功能

2017-10-24 16:57 1356 查看
功能:统计文本文件中所有单词出现的频率功能。

一、Python编写的MapReduce程序

1) 准备输入语料集

     若是英文,需要以一定分隔符分割;

     若是中文,则需要分词,可以使用中文分词(Chinese Word Segmentation),如开源的jieba分词,NLPIR,StanfordNLP等

我们使用的语料集合input.txt如下(ps. 两个字符之间用空格进行分割):

中国 foo foo 人民  quux 万岁 labs 十九大 foo 习书记 bar 威武 quux 极了 abc 我 爱 中国 bar see 江泽民 主席 you by test welcome test
abc labs foo me python hadoop ab ac bc bec 中国 自强python

2)  把文本数据放入fs建立好的1025_input文件夹中

localhost:map_reduce_program a6$ hadoop fs -mkdir /1025_input
localhost:map_reduce_program a6$ hadoop fs -put input.txt /1025_input


3) 编写代码之一:统计每个单词在input.txt的文档里出现情况

Map函数代码,它会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词机器出现频率到标准输出(stdout),不过整个Map处理过程并不会统计每个单词出现的总次数,而是直接输出“word,1”,以便作为Reduce的输入进行统计,要求mapper.py具备执行权限。

mapper.py代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Test by yyz
Date: 2017/10/24
'''
import sys
# 输入为标准输入stdin
for line in sys.stdin:
'''
# 本地编写测试代码
#if True:
#    line="foo foo quux labs foo bar quux abc bar see you by test welcome test"
'''
# 删除开头和结尾的空行
line = line.strip()
# 以默认空格分隔单词到words列表
words = line.split()
for word in words:
# 输出所有单词,格式为“单词,1”以便作为Reduce的输入
print '%s\t%s' % (word, 1)


3) 编写代码之一:汇总每个单词在input.txt的文档里出现情况

Reduce代码,它会从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),要求reducer.py同样具备可执行 权限。

reducer.py代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Test by yyz
Date: 2017/10/24
'''
import sys
current_word = None
current_count = 0
word = None
# 获取标准输入,即mapper.py的标准输出
for line in sys.stdin:
'''
# 本地编写测试代码
#for line in ['welcome	1','welcome	2']:
'''
# 删除开头和结尾的空行
line = line.strip()
# 解析mapper.py输出作为程序的输入,以tab作为分隔符
word,count = line.split('\t', 1)
# 转换count从字符型到整型
try:
count = int(count)
except ValueError:
# count非数字时,忽略此行
continue
# 要求mapper.py的输出做排序(sort)操作,以便对连续的word做判断
if current_word == word:
current_count += count
#print "22222",current_count,current_word
else:
if current_word:
# 输出当前word统计结果到标准输出
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# 输出最后一个word统计
if current_word == word:
print '%s\t%s' % (current_word, current_count)


二、运行Python编写的MapReduce程序

1) 在Hadoop平台运行前进行本地测试


mapper阶段执行结果如下

localhost:map_reduce_program a6$ cat input.txt | python mapper.py
中国	1
foo	1
foo	1
人民	1
quux	1
万岁	1
labs	1
十九大	1
foo	1
习书记	1
bar	1
威武	1
quux	1
极了	1
abc	1
我	1
爱	1
中国	1
bar	1
see	1
江泽民	1
主席	1
you	1
by	1
test	1
welcome	1
test	1
abc	1
labs	1
foo	1
me	1
python	1
hadoop	1
ab	1
ac	1
bc	1
bec	1
中国	1
自强python	1


在mapper和ruduce阶段后产生的结果如下:

localhost:map_reduce_program a6$ cat input.txt | python mapper.py | sort | python reducer.py
ab	1
abc	2
ac	1
bar	2
bc	1
bec	1
by	1
foo	4
hadoop	1
labs	2
me	1
python	1
quux	2
see	1
test	2
welcome	1
you	1
我	1
爱	1
万岁	1
中国	3
主席	1
人民	1
威武	1
极了	1
自强python	1
习书记	1
十九大	1
江泽民	1
2)在Hadoop平台运行

执行MapReduce任务,输出结果文件指定为   /1025_output

为确保该目录不存在,先进性如下操作

localhost:experiment_data a6$ hadoop fs -test -e /1025_output
localhost:experiment_data a6$ hadoop fs -rmr /1025_output


执行MapReduce的操作如下:

hadoop jar /Users/a6/Applications/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar  \
-mapper "python mapper.py"  \
-reducer "python reducer.py"  \
-input "/1025_input/*"  \
-output "/1025_output"  \
-file "/Users/a6/Downloads/PycharmProjects/map_reduce_program/mapper.py"  \
-file "/Users/a6/Downloads/PycharmProjects/map_reduce_program/reducer.py"
该命令参数解释如下:

/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-input <输入目录> \ # 可以指定多个输入路径,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
-inputformat <输入格式 JavaClassName> \
-output <输出目录> \
-outputformat <输出格式 JavaClassName> \
-mapper <mapper executable or JavaClassName> \
-reducer <reducer executable or JavaClassName> \
-combiner <combiner executable or JavaClassName> \
-partitioner <JavaClassName> \
-cmdenv <name=value> \ # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个
-file <依赖的文件> \ # 配置文件,字典等依赖
-D <name=value> \ # 作业的属性配置
查看生成的分析结果文件清单,其中/1025_output/part-00000为分析结果文件

localhost:experiment_data a6$ hadoop dfs -ls /1025_output
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

17/10/24 18:49:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 a6 supergroup          0 2017-10-24 18:48 /1025_output/_SUCCESS


查看结果数据

localhost:experiment_data a6$ hadoop fs -cat /1025_output/part-00000
17/10/24 18:51:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ab	1
abc	2
ac	1
bar	2
bc	1
bec	1
by	1
foo	4
hadoop	1
labs	2
me	1
python	1
quux	2
see	1
test	2
welcome	1
you	1
万岁	1
中国	3
主席	1
习书记	1
人民	1
十九大	1
威武	1
我	1
极了	1
江泽民	1
爱	1
自强python	1
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mapreduce hadoop