普通的一个python脚本,hadoop进军的准备
2013-07-25 10:34
399 查看
输入文件:20130712000000
格式:packageid\truleid\tpid\tuserid\tshw\tclk\tprice\ttime
输入文件: ./cache_data/0-9
格式为:userid\t客服id\t运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。
部分数据格式为 userid\t\t,这里客服id=””
运营单位=””
任务:生成以下格式的数据。
Packageid\t运营单位\t影响的用户数\t消费额度
即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额#[work@yx-testing-ecom124.vm.baidu.com 0724]$ cat deal_union_07.py
#!/bin/python
#encoding=utf-8
import sys
import os
import time
MAP_COUNT = 8
def output_dic(flag, dic, f) :
if (flag != "") :
for key in dic.keys() :
f.write("%s\t%s\t%d\t%d\n"%(flag, key, dic[key][1], dic[key][2]))
return True
return False
def main_07(fname_2013 = "./20130712000000", cache_2013 = "./cache_data/") :
if (len(sys.argv)==3):
fname_2013 = sys.argv[1]
cache_2013 = sys.argv[2]
line_cnt = 0
err_line_cnt = 0
cache_cnt_list = []
for i in range(10) :
cache_cnt_list.append([0, 0])
dic = {}
flag = ""
cache = []
for i in range(10) :
cache.append({})
for i in range(10) :
fo = open(cache_2013 + str(i), 'r')
for line in fo :
lstr = line.rstrip('\n').split('\t')
cache_cnt_list[i][0] += 1
if(len(lstr) < 3) :
cache_cnt_list[i][1] += 1
cache[i][lstr[0]] = "NULL"
continue
else :
cache[i][lstr[0]] = lstr[2]
fo.close()
for i in range(10) :
if(cache_cnt_list[i][1] > 0) :
print "The " + cache_2013 + str(i) + " totally %d lines processed with %d error lines (No operator)" % (cache_cnt_list[i][0], cache_cnt_list[i][1])
f_2013 = open(fname_2013, 'r')
f_output_07 = open("output_07", 'w')
for line in f_2013 :
line_cnt += 1
record = line.rstrip('\n').split('\t')
if(len(record) < MAP_COUNT) :
err_line_cnt += 1
packid = record[0]
userid = record[3]
price = int(record[6])
adress = cache[int(userid[-1])][userid]
try :
if(flag == packid) :
if(adress in dic.keys()) :
if(dic[adress][0] != userid) :
dic[adress][1] += 1
dic[adress][2] += price
else :
dic[adress] = [userid, 1, price]
else :
output_dic(flag, dic, f_output_07)
dic = {}
flag = packid
dic[adress] = [userid, 1, price]
except Exception, e :
print >> sys.stderr, "%s\t%s: failed in line#%d, [err_msg:%s]" % \
(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, e)
output_dic(flag, dic, f_output_07)
dic = {}
f_output_07.close()
f_2013.close()
print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
(time.asctime(time.localtime(time.time())), "The file " + fname_2013, line_cnt, err_line_cnt)
#print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
#(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt)
if __name__ == '__main__':
try :
main_07()
except Exception, e :
print >> sys.stderr, "%s\t%s: failed to process file, [err_msg:%s]" \
% (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e)
sys.exit(-1)
sys.exit(0)data_2013原数据
350 1 1 3 1 1 5 2013
350 1 1 2 1 1 20 2013
350 1 1 4 1 1 0 2013
350 1 1 2 1 1 0 2013
350 1 1 6 1 1 1 2013
350 1 1 1 1 1 2 2013
351 1 1 2 1 2 1 2013
351 1 1 1 1 1 0 2013
352 1 1 5 1 5 0 2013
Cache_data
文件1
1 9 a
11 9 b
文件2
2 9 a
文件3
3 9 b
文件4
4 9 c
文件5
5
文件6
16 9 d
6 9 b
catoutput_07 最终结果
350 a 2 22
350 c 1 0
350 b 2 6
351 a 2 1
352 NULL 1 0
格式:packageid\truleid\tpid\tuserid\tshw\tclk\tprice\ttime
输入文件: ./cache_data/0-9
格式为:userid\t客服id\t运营单位。其中0文件里存放的是所有userid%10=0的数据,5文件里存放的是所有userid%10=5的数据,其他一样。
部分数据格式为 userid\t\t,这里客服id=””
运营单位=””
任务:生成以下格式的数据。
Packageid\t运营单位\t影响的用户数\t消费额度
即根据包来获取包所影响的所有用户userid,并计算出这些用户所在的运营单位,然后按照运营单位为维度,来分别统计用户数和消费额#[work@yx-testing-ecom124.vm.baidu.com 0724]$ cat deal_union_07.py
#!/bin/python
#encoding=utf-8
import sys
import os
import time
MAP_COUNT = 8
def output_dic(flag, dic, f) :
if (flag != "") :
for key in dic.keys() :
f.write("%s\t%s\t%d\t%d\n"%(flag, key, dic[key][1], dic[key][2]))
return True
return False
def main_07(fname_2013 = "./20130712000000", cache_2013 = "./cache_data/") :
if (len(sys.argv)==3):
fname_2013 = sys.argv[1]
cache_2013 = sys.argv[2]
line_cnt = 0
err_line_cnt = 0
cache_cnt_list = []
for i in range(10) :
cache_cnt_list.append([0, 0])
dic = {}
flag = ""
cache = []
for i in range(10) :
cache.append({})
for i in range(10) :
fo = open(cache_2013 + str(i), 'r')
for line in fo :
lstr = line.rstrip('\n').split('\t')
cache_cnt_list[i][0] += 1
if(len(lstr) < 3) :
cache_cnt_list[i][1] += 1
cache[i][lstr[0]] = "NULL"
continue
else :
cache[i][lstr[0]] = lstr[2]
fo.close()
for i in range(10) :
if(cache_cnt_list[i][1] > 0) :
print "The " + cache_2013 + str(i) + " totally %d lines processed with %d error lines (No operator)" % (cache_cnt_list[i][0], cache_cnt_list[i][1])
f_2013 = open(fname_2013, 'r')
f_output_07 = open("output_07", 'w')
for line in f_2013 :
line_cnt += 1
record = line.rstrip('\n').split('\t')
if(len(record) < MAP_COUNT) :
err_line_cnt += 1
packid = record[0]
userid = record[3]
price = int(record[6])
adress = cache[int(userid[-1])][userid]
try :
if(flag == packid) :
if(adress in dic.keys()) :
if(dic[adress][0] != userid) :
dic[adress][1] += 1
dic[adress][2] += price
else :
dic[adress] = [userid, 1, price]
else :
output_dic(flag, dic, f_output_07)
dic = {}
flag = packid
dic[adress] = [userid, 1, price]
except Exception, e :
print >> sys.stderr, "%s\t%s: failed in line#%d, [err_msg:%s]" % \
(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, e)
output_dic(flag, dic, f_output_07)
dic = {}
f_output_07.close()
f_2013.close()
print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
(time.asctime(time.localtime(time.time())), "The file " + fname_2013, line_cnt, err_line_cnt)
#print >> sys.stderr, "%s\t%s: totally %u lines processed with %u error lines" % \
#(time.asctime(time.localtime(time.time())), os.path.basename(__file__), line_cnt, err_line_cnt)
if __name__ == '__main__':
try :
main_07()
except Exception, e :
print >> sys.stderr, "%s\t%s: failed to process file, [err_msg:%s]" \
% (time.asctime(time.localtime(time.time())), os.path.basename(__file__), e)
sys.exit(-1)
sys.exit(0)data_2013原数据
350 1 1 3 1 1 5 2013
350 1 1 2 1 1 20 2013
350 1 1 4 1 1 0 2013
350 1 1 2 1 1 0 2013
350 1 1 6 1 1 1 2013
350 1 1 1 1 1 2 2013
351 1 1 2 1 2 1 2013
351 1 1 1 1 1 0 2013
352 1 1 5 1 5 0 2013
Cache_data
文件1
1 9 a
11 9 b
文件2
2 9 a
文件3
3 9 b
文件4
4 9 c
文件5
5
文件6
16 9 d
6 9 b
catoutput_07 最终结果
350 a 2 22
350 c 1 0
350 b 2 6
351 a 2 1
352 NULL 1 0
相关文章推荐
- 普通的一个python脚本,hadoop进军的准备
- 一个简单的 python 实现 图片同步脚本 ,对于有图片群集的项目有参考价值
- python构建一个简单的备份脚本
- 一个批量转换文件编码的python脚本
- python脚本判断一个数是否为素数的几种方法
- python学习之路-第六天-一个简单的脚本
- 一个批量更改文件名的Python脚本
- 一个批量更改文件名的Python脚本
- python实时分析日志的一个小脚本分享
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- 用Python编写一个每天都在系统下新建一个文件夹的脚本
- 树莓派把一个python脚本作为服务运行,配置开机自动启动
- 树梅派应用44:又一个让树莓派开机运行Python脚本的方法
- DJango-如何快速准备Python虚拟开发环境并快速定制一个项目
- 实现一个简单的python小脚本的一些必要步骤
- 一个mysql异备Python脚本
- 一个python拖库字段的小脚本
- 一个Excel转换为Json格式的Python脚本
- 获取linux系统信息的一个python脚本
- Python脚本中import另外一个目录的脚本