Hadoop Streaming例子(python)
2014-11-23 12:45
316 查看
以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce。
任务描述:
HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:
/a的一行:1234567 a {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}
/b的一行:12345 b {"a":"abc","b":"adr","xxoo":"e",...}
要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。
那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:
这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:
一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。
然后安排让hadoop执行,schedule.py:
schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了
上述是一个很简单的python_hadoop-streamingXXX例子。。。。
任务描述:
HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:
/a的一行:1234567 a {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}
/b的一行:12345 b {"a":"abc","b":"adr","xxoo":"e",...}
要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。
那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:
#!/usr/bin/env python #coding = utf-8 import json import sys import traceback import datetime,time def mapper(): for line in sys.stdin: line = line.strip() id,tag,content = line.split('\t') if tag == 'a': jstr = json.loads(content) active = jstr.get('status',[]) if "111" in active: print '%s\t%s' %(id,tag) if tag == 'b': print '%s\t%s' % ( id,tag) if __name__ == '__main__': mapper()
这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:
#!/usr/bin/env python #coding = utf-8 import sys import json def reducer(): tag_a = 0 tag_b = 0 pre_id = '' for line in sys.stdin: line = line.strip() current_id,tag = line.split('\t') if current_id != pre_id: if tag_a==1 and tag_b==1: tag_a = 0 tag_b = 0 print '%s' % pre_id else : tag_a = 0 tag_b = 0 pre_id = current_id if tag == 'a': if tag_a == 0: tag_a = 1 if tag == 'b': if tag_b == 0: tag_b = 1 if tag_b==1 and tag_b==1: print '%s' % pre_id if __name__ == '__main__': reducer()
一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。
然后安排让hadoop执行,schedule.py:
#!/usr/bin/env python #coding = utf-8 import subprocess, os import datetime def mr_job(): mypath = os.path.dirname(os.path.abspath(__file__)) inputpath1 = '/b/*' inputpath2 = '/a/*' outputpath = '/out/' mapper = mypath + '/mapper.py' reducer = mypath + '/reducer.py' cmds = ['$HADOOP_HOME/bin/hadoop', 'jar', '$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar', '-numReduceTasks', '40', '-input', inputpath1, '-input', inputpath2, '-output', outputpath, '-mapper', mapper, '-reducer', reducer, '-file', mapper, '-file', reducer,] for f in os.listdir(mypath): cmds.append(mypath + '/' + f) cmd = ['$HADOOP_HOME/bin/hadoop', 'fs', '-rmr', outputpath] subprocess.call(cmd) subprocess.call(cmds) def main(): mr_job() if __name__ == '__main__': main()
schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了
上述是一个很简单的python_hadoop-streamingXXX例子。。。。
相关文章推荐
- 使用python开发hadoop streaming程序及hadoop python网页抓取例子
- hadoop python streaming 特殊文本解析
- hadoop的c++版wordcount例子(streaming方式)
- 用hadoop-streaming 运行python map-reduce程序
- Python+Hadoop Streaming实现MapReduce(word count)
- 用python + hadoop streaming 编写分布式程序的本地调试方法
- c++&&python实现Hadoop Streaming的partitioner和模块化
- 使用python+hadoop-streaming编写hadoop处理程序
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- Python+Hadoop Streaming实现MapReduce(word count)
- HADOOP STREAMING实例HIVE引用PYTHON
- Hadoop Streaming运行Python脚本程序
- 用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控
- 用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试
- Python+hadoop Streaming编写的Map-Reduce程序与调试运行
- Hadoop Streaming for Python
- python Hadoop Streaming程序测试
- 用python + hadoop streaming 编写分布式程序(三) -- 自定义功能
- python hadoop 在streaming中获取文件名的方法
- 利用virtualenv在Hadoop Streaming中使用完全个性化的Python解释器