您的位置:首页 > 编程语言 > Python开发

Hadoop Streaming例子(python)

2014-11-23 12:45 316 查看
  以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce。

  任务描述:

  HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:

  /a的一行:1234567  a  {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}

  /b的一行:12345  b  {"a":"abc","b":"adr","xxoo":"e",...}

  要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。

  那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:

#!/usr/bin/env python
#coding = utf-8

import json
import sys
import traceback
import datetime,time

def mapper():
for line in sys.stdin:
line = line.strip()
id,tag,content = line.split('\t')
if tag == 'a':
jstr = json.loads(content)
active = jstr.get('status',[])
if "111" in active:
print '%s\t%s' %(id,tag)
if tag == 'b':
print '%s\t%s' % ( id,tag)

if __name__ == '__main__':
mapper()


  这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:

#!/usr/bin/env python
#coding = utf-8

import sys
import json

def reducer():
tag_a = 0
tag_b = 0
pre_id = ''
for line in sys.stdin:
line = line.strip()
current_id,tag = line.split('\t')
if current_id != pre_id:
if tag_a==1 and tag_b==1:
tag_a = 0
tag_b = 0
print '%s' % pre_id
else :
tag_a = 0
tag_b = 0
pre_id = current_id
if tag == 'a':
if tag_a == 0:
tag_a = 1
if tag == 'b':
if tag_b == 0:
tag_b = 1
if tag_b==1 and tag_b==1:
print '%s' % pre_id

if __name__ == '__main__':
reducer()


  一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。

  然后安排让hadoop执行,schedule.py:

#!/usr/bin/env python
#coding = utf-8

import subprocess, os
import datetime

def mr_job():
mypath = os.path.dirname(os.path.abspath(__file__))
inputpath1 = '/b/*'
inputpath2 = '/a/*'
outputpath = '/out/'
mapper = mypath + '/mapper.py'
reducer = mypath + '/reducer.py'
cmds = ['$HADOOP_HOME/bin/hadoop', 'jar', '$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar',
'-numReduceTasks', '40',
'-input', inputpath1,
'-input', inputpath2,
'-output', outputpath,
'-mapper', mapper,
'-reducer', reducer,
'-file', mapper,
'-file', reducer,]
for f in os.listdir(mypath):
cmds.append(mypath + '/' + f)
cmd = ['$HADOOP_HOME/bin/hadoop', 'fs', '-rmr', outputpath]
subprocess.call(cmd)
subprocess.call(cmds)

def main():
mr_job()

if __name__ == '__main__':
main()


  schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了

  上述是一个很简单的python_hadoop-streamingXXX例子。。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: