
Writing MapReduce Programs with Hadoop Streaming: Secondary Sort and Multiple Input Files

2017-08-08

This example shows how to implement a secondary sort and how to handle multiple input files in a Hadoop Streaming job.

Mapper.py

import sys, os

# Tag ids of interest in the behavior log.
tag = ['11522', '1157', '15999', '44060', '373934']

if __name__ == "__main__":
    for line in sys.stdin:
        line = line.strip()
        tks = line.split("\t")
        if len(tks) < 1:
            continue
        # Find out which input file this record came from; the environment
        # variable name differs between Hadoop versions.
        try:
            input_file = os.environ['mapreduce_map_input_file']
        except KeyError:
            input_file = os.environ['map_input_file']
        if str(os.environ.get('input_dir')) in input_file:
            # Behavior-log record: fields 3, 5, 7, ... hold tag ids.
            if len(tks) < 5:
                continue
            alist = tks[3:len(tks) - 1:2]
            if (tag[0] in alist and alist.index(tag[0]) < len(alist) / 2 and alist.index(tag[0]) < 128) or \
               (tag[1] in alist and alist.index(tag[1]) < len(alist) / 2 and alist.index(tag[1]) < 128):
                # Emit: user \t type '2' \t placeholder
                print tks[0] + "\t" + "2" + "\t" + '2'
        elif 'qq' in input_file:
            # Whitelist record: emit user \t type '1' \t qq
            if len(tks) > 0:
                print tks[0] + "\t" + "1" + "\t" + tks[1]
        else:
            continue
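
To make the key layout concrete, here is a hedged illustration with invented sample records: the map output key is "user \t type", and since the whole two-field key is sorted while only the first field chooses the reducer, the whitelist record (type '1') always arrives before the behavior record (type '2') for the same user.

# Hypothetical map output for one user (all values invented for illustration).
records = [
    "user_a\t2\t2",        # emitted for a behavior-log record under input_dir
    "user_a\t1\tqq12345",  # emitted for a whitelist ('qq') record
]
# The shuffle sorts on the two-field key, so '1' precedes '2' within a user:
for r in sorted(records):
    print r
# user_a  1  qq12345
# user_a  2  2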


Reducer.py

import sys

# Records arrive grouped by user (field 1); within a user the whitelist
# record (type '1') sorts before the behavior record (type '2').
pre_user = ""
gray = 0
qq = ""
for line in sys.stdin:
    line = line.strip()
    tks = line.split("\t")
    if len(tks) < 3:
        continue
    if tks[0] != pre_user:
        # New user: reset the per-user state.
        gray = 0
        pre_user = tks[0]
        qq = ""
    if tks[1] == '1':
        # Whitelist record: remember this user's qq.
        gray = 1
        qq = tks[2]
    elif gray == 1 and tks[1] == '2' and len(tks) > 2:
        # The user also shows up in the behavior log: output the qq once.
        print qq
        gray = 0
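
For comparison, the same per-user grouping can also be written with itertools.groupby; this is only an equivalent sketch of the logic above, assuming the shuffle has already sorted the records by user and type:

import sys
from itertools import groupby

def parse(stream):
    for line in stream:
        tks = line.strip().split("\t")
        if len(tks) >= 3:
            yield tks

# groupby relies on the input already being grouped by the first field.
for user, group in groupby(parse(sys.stdin), key=lambda t: t[0]):
    qq = ""
    for tks in group:
        if tks[1] == '1':           # whitelist record: remember the qq
            qq = tks[2]
        elif tks[1] == '2' and qq:  # behavior record after a whitelist record
            print qq
            qq = ""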


The following script loads tag dictionaries from the local directories named by the taginfo1–taginfo3 environment variables and joins them onto comma-separated input records:

import sys, hashlib, struct, os

def GetMD5Uint64(src):
    # First 64-bit half of the MD5 digest of src.
    (value_1, value_2) = struct.unpack("QQ", hashlib.md5(src).digest())
    return value_1

def load_tag_dir(path, dic):
    # Each line in the files under `path` is "key \t tag1 \t val1 \t tag2 \t val2 ...".
    # Store the tag ids joined with '&' under dic[key].
    for name in os.listdir(path):
        fname = path + "/" + name
        if not os.path.isfile(fname):
            continue
        for line in open(fname, "r"):
            tks = line.strip().split("\t")
            vals = tks[1:]
            tags = ''
            for i in range(0, len(vals) - 1, 2):
                tags += vals[i] + '&'
            dic[tks[0]] = tags

# Load up to three side-data directories (skipping duplicated paths).
dic = {}
dic_path1 = str(os.environ.get('taginfo1'))
load_tag_dir(dic_path1, dic)

dic_path2 = str(os.environ.get('taginfo2'))
if dic_path2 != dic_path1:
    load_tag_dir(dic_path2, dic)

dic_path3 = str(os.environ.get('taginfo3'))
if dic_path3 != dic_path2:
    load_tag_dir(dic_path3, dic)

for line in sys.stdin:
    line = line.strip()
    tks = line.split(",")
    if len(tks) < 12:
        # fields 9, 10 and 11 are used below, so require at least 12 fields
        continue
    # Keep only records whose 10th field has a 'V' at offset 8.
    if tks[9].find('V', 8) == 8:
        values = tks[9] + ',' + tks[10] + ',' + tks[11]
        if tks[9] in dic:
            values += ',' + dic[tks[9]]
        keys = tks[0] + ',' + tks[8]
        print keys + '\t' + values
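
A quick local walk-through of the join step, with an invented dictionary entry and an invented input record, shows the output format (comma-joined key fields, a tab, then the value fields plus the matched tags):

# Hedged local check of the join logic above; all sample values are made up.
dic = {"V0000001V": "11522&1157&"}
sample = "uid_1,f1,f2,f3,f4,f5,f6,f7,city_9,V0000001V,colA,colB"
tks = sample.split(",")
if len(tks) >= 12 and tks[9].find('V', 8) == 8:
    print tks[0] + ',' + tks[8] + '\t' + tks[9] + ',' + tks[10] + ',' + tks[11] + ',' + dic[tks[9]]
# -> uid_1,city_9    V0000001V,colA,colB,11522&1157&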


Launch script

day=`date +%Y%m%d -d "${1}"`
input_dir=${2}
whitelist_dir=${3}
md5_flag=${4}
output=${5}

# Job 1: the map output key has two tab-separated fields; partitioning uses only
# the first field, so all records of a user reach the same reducer, and the
# default key sort puts the whitelist record ('1') before the behavior record ('2').
hadoop jar hadoop-streaming-2.3.0-cdh5.1.0.jar -archives hadoop-streaming-2.3.0-cdh5.1.0.jar \
-D mapred.reduce.tasks=100 \
-D mapreduce.job.name=model_utils \
-D mapreduce.job.queuename="hhg_rcm" \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-cmdenv md5_flag=${md5_flag} \
-cmdenv input_dir=${input_dir} \
-cmdenv whitelist_dir=${whitelist_dir} \
-input ${input_dir} \
-input ${whitelist_dir} \
-output ${output}

# Job 2: secondary sort via KeyFieldBasedComparator; "-k2,2nr" compares the
# second key field numerically in descending order, while the partitioner
# still uses only the first field.
hadoop jar hadoop-streaming-2.3.0-cdh5.1.0.jar -archives hadoop-streaming-2.3.0-cdh5.1.0.jar \
-D mapred.reduce.tasks=1 \
-D mapreduce.job.name=model_utils \
-D mapreduce.job.queuename="dian_rcm" \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-D mapred.text.key.comparator.options="-k2,2nr" \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-input ${input_dir} \
-output ${output}
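
To see locally what the "-k2,2nr" comparator option does, here is a rough emulation in Python with invented sample keys (it ignores how ties and partitions are handled):

# Approximate KeyFieldBasedComparator with "-k2,2nr": compare only the second
# tab-separated key field, numerically, in descending order.
rows = ["user_b\t3\tx", "user_a\t10\ty", "user_a\t2\tz"]
rows.sort(key=lambda r: int(r.split("\t")[1]), reverse=True)
for r in rows:
    print r
# user_a  10  y
# user_b  3   x
# user_a  2   z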


Testing

Merge the multiple outputs in the current directory into a single file, result.txt:
find . -type f -exec cat {} \; > result.txt

Simulate the jobs locally with a pipeline (sort uses tab as the default field separator):
cat ./out1 | python mapper.py | sort -k2,2nr | python reducer.py        # sort on the second field, numeric, descending
cat ./test_224/* | python mapper.py | sort -k 1,2 -t"," | python reducer.py > reducerout
cat ./out3 | python mapper.py | sort -k1,1n -k2,2nr | python reducer.py