您的位置:首页 > 编程语言 > Python开发

普通的一个python脚本,hadoop进军的准备

2013-07-25 10:34 399 查看
输入文件:20130712000000

         格式:packageid\truleid\tpid\tuserid\tshw\tclk\tprice\ttime

         输入文件: ./cache_data/0-9

        格式为:userid\t客服id\t运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。

        部分数据格式为 userid\t\t,这里客服id=””
运营单位=””

       任务:生成以下格式的数据。

       Packageid\t运营单位\t影响的用户数\t消费额度

       即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额#[work@yx-testing-ecom124.vm.baidu.com 0724]$ cat deal_union_07.py
#!/bin/python
#encoding=utf-8
import sys
import os
import time
MAP_COUNT = 8
def output_dic(flag, dic, f) :
if (flag != "") :
for key in dic.keys() :
f.write("%s\t%s\t%d\t%d\n"%(flag, key, dic[key][1], dic[key][2]))
return True
return False

def main_07(fname_2013 = "./20130712000000", cache_2013 = "./cache_data/") :
if (len(sys.argv)==3):
fname_2013 = sys.argv[1]
cache_2013 = sys.argv[2]
line_cnt = 0
err_line_cnt = 0
cache_cnt_list = []
for i in range(10) :
cache_cnt_list.append([0, 0])
dic = {}
flag = ""
cache = []
for i in range(10) :
cache.append({})

for i in range(10) :
fo = open(cache_2013 + str(i), 'r')
for line in fo :
lstr = line.rstrip('\n').split('\t')
cache_cnt_list[i][0] += 1
if(len(lstr) < 3) :
cache_cnt_list[i][1] += 1
cache[i][lstr[0]] = "NULL"
continue
else :
cache[i][lstr[0]] = lstr[2]
fo.close()
for i in range(10) :
if(cache_cnt_list[i][1] > 0) :
print "The " + cache_2013 + str(i) + " totally %d lines processed with %d error lines (No operator)" % (cache_cnt_list[i][0], cache_cnt_list[i][1])
f_2013 = open(fname_2013, 'r')

f_output_07 = open("output_07", 'w')

for line in f_2013 :
line_cnt += 1
record = line.rstrip('\n').split('\t')
if(len(record) < MAP_COUNT) :
err_line_cnt += 1
packid = record[0]
userid = record[3]
price = int(record[6])
adress = cache[int(userid[-1])][userid]

try :
if(flag == packid) :
if(adress in dic.keys()) :
if(dic[adress][0] != userid) :
dic[adress][1] += 1
dic[adress][2] += price
else :
dic[adress] = [userid, 1, price]
else :
output_dic(flag, dic, f_output_07)
dic = {}
flag = packid
dic[adress] = [userid, 1, price]
except Exception, e :
print >> sys.stderr, "%s\t%s: failed in line#%d, [err_msg:%s]" % \
(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, e)

output_dic(flag, dic, f_output_07)
dic = {}
f_output_07.close()
f_2013.close()
print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
(time.asctime(time.localtime(time.time())), "The file " + fname_2013, line_cnt, err_line_cnt)
#print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
#(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt)

if __name__ == '__main__':
try :
main_07()
except Exception, e :
print >> sys.stderr, "%s\t%s: failed to process file, [err_msg:%s]" \
% (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e)
sys.exit(-1)
sys.exit(0)
data_2013原数据
350    1       1       3      1       1       5      2013
350    1       1       2      1       1       20     2013
350    1       1       4      1       1       0      2013
350    1       1       2      1       1       0      2013
350    1       1       6      1       1       1      2013
350    1       1       1      1       1       2      2013
351    1       1       2      1       2       1      2013
351    1       1       1      1       1       0      2013
352    1       1       5      1       5       0      2013
Cache_data
文件1
1      9       a
11     9       b
文件2
2      9       a
文件3
3      9       b
文件4
4      9       c
文件5
5
文件6
16     9       d
6      9       b
catoutput_07 最终结果
350    a       2       22
350    c       1       0
350    b       2       6
351    a       2       1
352    NULL    1       0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: