普通的一个python脚本,hadoop进军的准备
2013-07-25 10:34
393 查看
输入文件:20130712000000
格式:packageid\truleid\tpid\tuserid\tshw\tclk\tprice\ttime
输入文件: ./cache_data/0-9
格式为:userid\t客服id\t运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。
部分数据格式为 userid\t\t,这里客服id=””
运营单位=””
任务:生成以下格式的数据。
Packageid\t运营单位\t影响的用户数\t消费额度
即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额
格式:packageid\truleid\tpid\tuserid\tshw\tclk\tprice\ttime
输入文件: ./cache_data/0-9
格式为:userid\t客服id\t运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。
部分数据格式为 userid\t\t,这里客服id=””
运营单位=””
任务:生成以下格式的数据。
Packageid\t运营单位\t影响的用户数\t消费额度
即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额
#[work@yx-testing-ecom124.vm.baidu.com 0724]$ cat deal_union_07.py #!/bin/python #encoding=utf-8 import sys import os import time MAP_COUNT = 8 def output_dic(flag, dic, f) : if (flag != "") : for key in dic.keys() : f.write("%s\t%s\t%d\t%d\n"%(flag, key, dic[key][1], dic[key][2])) return True return False def main_07(fname_2013 = "./20130712000000", cache_2013 = "./cache_data/") : if (len(sys.argv)==3): fname_2013 = sys.argv[1] cache_2013 = sys.argv[2] line_cnt = 0 err_line_cnt = 0 cache_cnt_list = [] for i in range(10) : cache_cnt_list.append([0, 0]) dic = {} flag = "" cache = [] for i in range(10) : cache.append({}) for i in range(10) : fo = open(cache_2013 + str(i), 'r') for line in fo : lstr = line.rstrip('\n').split('\t') cache_cnt_list[i][0] += 1 if(len(lstr) < 3) : cache_cnt_list[i][1] += 1 cache[i][lstr[0]] = "NULL" continue else : cache[i][lstr[0]] = lstr[2] fo.close() for i in range(10) : if(cache_cnt_list[i][1] > 0) : print "The " + cache_2013 + str(i) + " totally %d lines processed with %d error lines (No operator)" % (cache_cnt_list[i][0], cache_cnt_list[i][1]) f_2013 = open(fname_2013, 'r') f_output_07 = open("output_07", 'w') for line in f_2013 : line_cnt += 1 record = line.rstrip('\n').split('\t') if(len(record) < MAP_COUNT) : err_line_cnt += 1 packid = record[0] userid = record[3] price = int(record[6]) adress = cache[int(userid[-1])][userid] try : if(flag == packid) : if(adress in dic.keys()) : if(dic[adress][0] != userid) : dic[adress][1] += 1 dic[adress][2] += price else : dic[adress] = [userid, 1, price] else : output_dic(flag, dic, f_output_07) dic = {} flag = packid dic[adress] = [userid, 1, price] except Exception, e : print >> sys.stderr, "%s\t%s: failed in line#%d, [err_msg:%s]" % \ (time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, e) output_dic(flag, dic, f_output_07) dic = {} f_output_07.close() f_2013.close() print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \ (time.asctime(time.localtime(time.time())), "The file " + fname_2013, line_cnt, err_line_cnt) #print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \ #(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt) if __name__ == '__main__': try : main_07() except Exception, e : print >> sys.stderr, "%s\t%s: failed to process file, [err_msg:%s]" \ % (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e) sys.exit(-1) sys.exit(0)
data_2013原数据 350 1 1 3 1 1 5 2013 350 1 1 2 1 1 20 2013 350 1 1 4 1 1 0 2013 350 1 1 2 1 1 0 2013 350 1 1 6 1 1 1 2013 350 1 1 1 1 1 2 2013 351 1 1 2 1 2 1 2013 351 1 1 1 1 1 0 2013 352 1 1 5 1 5 0 2013 Cache_data 文件1 1 9 a 11 9 b 文件2 2 9 a 文件3 3 9 b 文件4 4 9 c 文件5 5 文件6 16 9 d 6 9 b catoutput_07 最终结果 350 a 2 22 350 c 1 0 350 b 2 6 351 a 2 1 352 NULL 1 0
相关文章推荐
- 普通的一个python脚本,hadoop进军的准备
- 一个简单的 python 实现 图片同步脚本 ,对于有图片群集的项目有参考价值
- python构建一个简单的备份脚本
- 一个批量转换文件编码的python脚本
- python脚本判断一个数是否为素数的几种方法
- python学习之路-第六天-一个简单的脚本
- 一个批量更改文件名的Python脚本
- 一个批量更改文件名的Python脚本
- python实时分析日志的一个小脚本分享
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- 用Python编写一个每天都在系统下新建一个文件夹的脚本
- 树莓派把一个python脚本作为服务运行,配置开机自动启动
- 树梅派应用44:又一个让树莓派开机运行Python脚本的方法
- DJango-如何快速准备Python虚拟开发环境并快速定制一个项目
- 实现一个简单的python小脚本的一些必要步骤
- 一个mysql异备Python脚本
- 一个python拖库字段的小脚本
- 一个Excel转换为Json格式的Python脚本
- 获取linux系统信息的一个python脚本
- Python脚本中import另外一个目录的脚本