Hadoop streaming 编写MapReduce程序-二次排序,多文件输入
2017-08-08 14:08
453 查看
这个例子用到了二次排序,多文件输入的处理
Mapper.py
import sys,hashlib,struct,os from urllib import unquote tag=['11522','1157','15999','44060','373934'] if __name__=="__main__": for line in sys.stdin: line = line.strip() tks = line.split("\t") if len(tks) < 1: continue input_file = "" try: input_file = os.environ['mapreduce_map_input_file'] except KeyError: input_file = os.environ['map_input_file'] if str(os.environ.get('input_dir')) in input_file: if len(tks)<5: continue alist=tks[3:len(tks)-1:2] if( tag[0] in alist and alist.index(tag[0])<len(alist)/2 and alist.index(tag[0])<128 ) or (tag[1] in alist and alist.index(tag[1])<len(alist)/2 and alist.index(tag[1])<128 ): print tks[0] + "\t" + "2" + "\t" + '2' elif 'qq' in input_file: if len(tks) > 0: print tks[0] + "\t" + "1" + "\t"+ tks[1] else: continue
Reducer.py
import sys,re pre_user = "" gray = 0 qq = "" for line in sys.stdin: line = line.strip() tks = line.split("\t") if len(tks) < 3: continue if tks[0] != pre_user: gray = 0 pre_user = tks[0] qq = "" if tks[1] == '1': gray = 1 qq = tks[2] elif gray == 1 and tks[1] == '2' and len(tks)>2: print qq gray = 0
import sys,re,hashlib,struct,os from urllib import unquote def GetMD5Uint64(src): (value_1, value_2) = struct.unpack("QQ", hashlib.md5(src).digest()) return value_1 dic = {} dic_path1 = str(os.environ.get('taginfo1')) for f in os.listdir(dic_path1): f = dic_path1 + "/" + f if os.path.isfile(f): file = open(f,"r") for line in file: line = line.strip() tks = line.split("\t") dic[tks[0]] = tks[1:] tags='' for i in range(0,len(dic[tks[0]])-1,2): tags+=dic[tks[0]][i]+'&' dic[tks[0]]=tags dic_path2 = str(os.environ.get('taginfo2')) if dic_path2 != dic_path1: for f in os.listdir(dic_path2): f = dic_path2 + "/" + f if os.path.isfile(f): file = open(f,"r") for line in file: line = line.strip() tks = line.split("\t") dic[tks[0]] = tks[1:] tags='' for i in range(0,len(dic[tks[0]])-1,2): tags+=dic[tks[0]][i]+'&' dic[tks[0]]=tags dic_path3 = str(os.environ.get('taginfo3')) if dic_path3 != dic_path2: 4000 for f in os.listdir(dic_path3): f = dic_path3 + "/" + f if os.path.isfile(f): file = open(f,"r") for line in file: line = line.strip() tks = line.split("\t") dic[tks[0]] = tks[1:] for i in range(0,len(dic[tks[0]])-1,2): tags+=dic[tks[0]][i]+'&' dic[tks[0]]=tags for line in sys.stdin: line = line.strip() tks = line.split(",") if len(tks) <= 9: continue if (tks[9].find('V',8)==8): values=tks[9]+','+tks[10]+','+tks[11] if dic.has_key(tks[9]): values+=','+dic[tks[9]] keys=tks[0]+','+tks[8] print keys+'\t'+values
运行脚本
day=`date +%Y%m%d -d "${1}"` input_dir=${2} whitelist_dir=${3} md5_flag=${4} output=${5} hadoop jar hadoop-streaming-2.3.0-cdh5.1.0.jar -archives hadoop-streaming-2.3.0-cdh5.1.0.jar \ -D mapred.reduce.tasks=100 \ -D mapreduce.job.name=model_utils \ -D mapreduce.job.queuename="hhg_rcm" \ -D stream.num.map.output.key.fields=2 \ -D num.key.fields.for.partition=1 \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -mapper mapper.py \ -file mapper.py \ -reducer reducer.py \ -file reducer.py \ -cmdenv md5_flag=${md5_flag} \ -cmdenv input_dir=${input_dir} \ -cmdenv whitelist_dir=${whitelist_dir} \ -input ${input_dir} \ -input ${whitelist_dir} \ -output ${output} hadoop jar hadoop-streaming-2.3.0-cdh5.1.0.jar -archives hadoop-streaming-2.3.0-cdh5.1.0.jar \ -D mapred.reduce.tasks=1 \ -D mapreduce.job.name=model_utils \ -D mapreduce.job.queuename="dian_rcm" \ -D stream.num.map.output.key.fields=2 \ -D num.key.fields.for.partition=1 \ -D mapred.text.key.comparator.options="-k2,2nr" \ -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -mapper mapper.py \ -file mapper.py \ -reducer reducer.py \ -file reducer.py \ -input ${input_dir} \ -output ${output}
测试
将单前文件夹中多个输出合并到一个文件result中 find . -type f -exec cat {} \;>result.txt cat ./out1 |python mapper.py|sort -k2,2nr|python reducer.py 按照第二个排序,默认是tab做分隔 cat ./test_224/* | python mapper.py |sort -k 1,2 -t"," | python reducer.py > reducerout cat ./out3 |python mapper.py| sort -k1,1n -k2,2nr|python reducer.py
相关文章推荐
- 借助hadoop streaming,使用C++编写MapReduce程序
- hadoop平台使用python编写mapreduce排序小程序
- 使用python+hadoop-streaming编写hadoop处理程序
- 编写程序,从键盘输入各位职工的工资数据,存入磁盘文件Salary.dat中,然后从该文件读出职工的工资数据,并计算输出每位职工的实发工资。实发工资的计算方法如下:实发工资=基本工资+加班工奖金-扣除
- Hadoop基础教程-第7章 MapReduce进阶(7.6 MapReduce 二次排序)
- 把以下IP存入一个txt文件,编写程序把这些IP按数值大小,从小到达排序并打印出来。 61.54.231.245 61.54.231.9 61.54.231.246 61.54.231.48
- 如何在Hadoop的MapReduce程序中处理JSON文件
- Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和排序多种输出格式(十一)
- 编写简单的Mapreduce程序并部署在Hadoop2.2.0上运行
- 自定义 hadoop MapReduce InputFormat 切分输入文件
- 如何在Hadoop中使用Streaming编写MapReduce(转帖)
- 如何在Hadoop上编写MapReduce程序
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- 编写一个程序,使其从标准输入读取字符,直到遇到文件结尾。对美个字符 程序需要检查并报告改字符是否是一个字母。如果是报告字母在字符表中的位置,否则返回-1
- Hadoop-Python实现Hadoop Streaming分组和二次排序
- [置顶] 编写简单的Mapreduce程序并部署在Hadoop2.2.0上运行
- hadoop之MapReduce自定义二次排序流程实例详解 推荐
- 编写一个程序,一行行地读取输入行,直至到达文件尾。算出每行输入行的长度,然后把最长的那行打印出来。为了简单起见,你可以假定所有的输入行均不超过1000个字符
- 编写IoDemo.java的Java应用程序,程序完成的功能是:首先读取text.txt文件内容,再通过键盘输入文件的名称为iodemo.txt,把text.txt的内容存入iodemo.txt
- 编写一个应用程序,用户分别从两个文本框输入学术的姓名和分数,程序按成绩排序将这些学生的姓名和分数显示在一个文本区中。