Example: Using Python Scripts to Fetch Data from Hive, Compute on It, and Load It into MySQL
2016-12-21 11:05
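Because I had no permission to install Python libraries on the server, this article reads and writes data by calling the Hive and MySQL command-line clients through the os module.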
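Every script below follows the same pattern: run a CLI with os.popen, read its standard output with readlines(), and treat each line as one row whose columns are separated by tabs (the Hive CLI's default output format, which is why the scripts split on '\t'). The following is a minimal sketch of a reusable reader under that assumption. parse_rows and query_rows are hypothetical helper names, the code targets Python 3 (the article's scripts are Python 2), and printf stands in for the hive command so the example runs anywhere:

```python
import os


def parse_rows(lines):
    """Split CLI output lines (one tab-separated row per line) into tuples."""
    rows = []
    for line in lines:
        line = line.rstrip("\n")
        if line:  # skip blank lines
            rows.append(tuple(line.split("\t")))
    return rows


def query_rows(command):
    """Run a shell command and parse its standard output as tab-separated rows."""
    pipe = os.popen(command)
    try:
        return parse_rows(pipe.readlines())
    finally:
        pipe.close()


if __name__ == "__main__":
    # Stand-in for: /usr/lib/hive-current/bin/hive -e "select appsource,appkey,identifier ..."
    print(query_rows("printf 'ios\\tk1\\tu1\\nandroid\\tk2\\tu2\\n'"))
```

Returning tuples rather than raw lines makes rows hashable, so they can go straight into a set for de-duplication.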
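Key points to focus on:
How Python passes data in and out when operating the databases through os;
Passing external arguments into a Python script and using them;
Grouping and aggregating data in Python;
Using Python set objects for de-duplication and for intersection, union, and difference;
Converting among Python set, list, and str;
Defining a function inside a function, calling functions, and using function parameters;
Using the "if __name__ == '__main__':" main entry point;
Printing the time in Python and measuring a program's execution time;
And so on.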
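The set operations and type conversions listed above can be seen in isolation on a few hand-made lines shaped like Hive output. The sample data is invented for illustration, and the code is Python 3:

```python
# Each raw Hive output line is a str: "appsource\tappkey\tidentifier\n"
day1_lines = ["ios\tk1\tu1\n", "ios\tk1\tu1\n", "ios\tk1\tu2\n", "android\tk2\tu3\n"]
day2_lines = ["ios\tk1\tu2\n", "android\tk2\tu3\n", "android\tk2\tu4\n"]

# list -> set: de-duplicate (u1 appears twice on day 1)
day1 = set(day1_lines)
day2 = set(day2_lines)

both_days = day1 & day2    # intersection: active on both days
either_day = day1 | day2   # union: active on at least one day
only_day1 = day1 - day2    # difference: active on day 1 but not on day 2

# set -> sorted list -> str: a stable, readable text form
only_day1_text = ",".join(sorted(line.rstrip("\n") for line in only_day1))

# str -> list: split one line back into its fields
fields = "ios\tk1\tu2\n".rstrip("\n").split("\t")

print(len(day1), len(both_days), len(either_day), len(only_day1))
print(only_day1_text.replace("\t", "|"))
print(fields)
```

Sorting before joining matters because set iteration order is not guaranteed, so the text form would otherwise vary between runs.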
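1. Daily retention of new users within one month (no external arguments; the date is defined inside the script)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
2. Daily retention of new users within one month (dates passed in as external arguments)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
3. Daily retention of new users within one month (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
4. DAU calculation (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
5. Final retention calculation (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
6. Running the jobs in batch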
1. Daily retention of new users within one month (no external arguments; the date is defined inside the script)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# The batch date is defined inside the script: 52 days before today
batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time

remain_static = {}

# New users: (appsource, appkey, identifier) seen on batch_date but on no earlier day
newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
    select a1.appsource,a1.appkey,a1.identifier from ( \
    select appsource,appkey,identifier \
    from bi_all_access_log \
    where pt_day = '%s' ) a1 \
    left join \
    (select appsource,appkey,identifier \
    from bi_all_access_log \
    where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
    where a2.identifier is null \
    ;" \
    """ % (batch_date, batch_date)).readlines()
# list -> set: a user can have several log rows on the same day, so count distinct lines
newuser_set = set(newuser_data)
remain_static['newuser_cnt'] = len(newuser_set)

for i in range(2, 31):
    sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
    print sql_text
    day2user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()
    # Intersection: new users who are also active on day i
    day2remain_data = newuser_set & set(day2user_data)
    remain_static['day%sremain_cnt' % (i)] = len(day2remain_data)

# Columns newuser_cnt, day2remain_cnt, ..., day30remain_cnt, each value rendered as a quoted literal
cols = ['newuser_cnt'] + ['day%sremain_cnt' % i for i in range(2, 31)]
values = ",".join("'%s'" % remain_static[c] for c in cols)
os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; \
    insert into bi_remain_user_static(data_date,%s) \
    select '%s',%s; \
    " """ % (batch_date, ",".join(cols), batch_date, values))

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time
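The Hive query above finds new users with a LEFT JOIN ... WHERE a2.identifier IS NULL, which on (appsource, appkey, identifier) triples is a set difference; retention is then a set intersection. The following pure-Python sketch mirrors that logic on invented sample rows, so it can be checked without a cluster. new_users and retention_counts are hypothetical names, and the code is Python 3:

```python
def new_users(day_rows, history_rows):
    """Users active on the batch day but on no earlier day (LEFT JOIN ... IS NULL)."""
    return set(day_rows) - set(history_rows)


def retention_counts(new_user_set, later_days):
    """Map day offset -> number of new users who are active on that later day."""
    return {offset: len(new_user_set & set(rows)) for offset, rows in later_days.items()}


history = [("ios", "k1", "u1"), ("ios", "k1", "u2")]
# u3 has two log rows on the batch day; the set keeps it once
batch_day = [("ios", "k1", "u2"), ("ios", "k1", "u3"), ("ios", "k1", "u3"), ("android", "k2", "u4")]
later = {
    1: [("ios", "k1", "u3"), ("ios", "k1", "u1")],
    2: [("ios", "k1", "u3"), ("android", "k2", "u4")],
}

fresh = new_users(batch_day, history)
print(len(fresh), retention_counts(fresh, later))
```

The duplicate u3 row shows why the new-user count has to be taken on the set rather than on the raw list of output lines.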
2. Daily retention of new users within one month (dates passed in as external arguments)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import sys

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time


def user_remain_proc(batch_date):
    remain_static = {}
    # New users: seen on batch_date but on no earlier day
    newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        select a1.appsource,a1.appkey,a1.identifier from ( \
        select appsource,appkey,identifier \
        from bi_all_access_log \
        where pt_day = '%s' ) a1 \
        left join \
        (select appsource,appkey,identifier \
        from bi_all_access_log \
        where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
        where a2.identifier is null \
        ;" \
        """ % (batch_date, batch_date)).readlines()
    # list -> set: count distinct users, not log rows
    newuser_set = set(newuser_data)
    remain_static['newuser_cnt'] = len(newuser_set)

    for i in range(2, 31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
        print sql_text
        day2user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()
        day2remain_data = newuser_set & set(day2user_data)
        remain_static['day%sremain_cnt' % (i)] = len(day2remain_data)

    # Columns newuser_cnt, day2remain_cnt, ..., day30remain_cnt
    cols = ['newuser_cnt'] + ['day%sremain_cnt' % i for i in range(2, 31)]
    values = ",".join("'%s'" % remain_static[c] for c in cols)
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; \
        insert into bi_remain_user_static(data_date,%s) \
        select '%s',%s; \
        " """ % (batch_date, ",".join(cols), batch_date, values))


if __name__ == '__main__':
    # Each command-line argument is one batch date (YYYY-MM-DD)
    for batch_date in sys.argv[1:]:
        print batch_date
        user_remain_proc(batch_date)
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print "Current time:", now_time
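The script above takes its dates straight from sys.argv[1:] and prints wall-clock timestamps before and after. A small sketch of a stricter entry point, assuming you want to reject bad dates before any job starts and to report elapsed seconds with time.time(); parse_batch_dates and run are hypothetical names, the lambda is a placeholder for user_remain_proc, and the code is Python 3:

```python
import datetime
import sys
import time


def parse_batch_dates(args):
    """Validate YYYY-MM-DD arguments and return them zero-padded.

    datetime.strptime raises ValueError for malformed or impossible dates such as 2016-02-30.
    """
    dates = []
    for arg in args:
        parsed = datetime.datetime.strptime(arg, "%Y-%m-%d")
        dates.append(parsed.strftime("%Y-%m-%d"))
    return dates


def run(args, process):
    """Call process(batch_date) for each validated date; return elapsed seconds."""
    start = time.time()
    for batch_date in parse_batch_dates(args):
        print(batch_date)
        process(batch_date)
    return time.time() - start


if __name__ == "__main__":
    # Placeholder processor; a real script would pass user_remain_proc
    elapsed = run(sys.argv[1:], lambda batch_date: None)
    print("Elapsed: %.2f seconds" % elapsed)
```

Because every argument is parsed before the loop starts, a typo in the last date fails immediately instead of after hours of Hive queries; re-formatting also turns 2016-10-1 into 2016-10-01, which matches the string comparison on pt_day.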
3. Daily retention of new users within one month (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time


def user_remain_proc(batch_date):
    # Delete earlier results for this date so the job can be rerun
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
        delete from bi_user_remain_static where data_date='%s'; \
        " """ % (batch_date))

    # New users of batch_date (same anti-join as in scripts 1 and 2)
    newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        select a1.appsource,a1.appkey,a1.identifier from ( \
        select appsource,appkey,identifier \
        from bi_all_access_log \
        where pt_day = '%s' ) a1 \
        left join \
        (select appsource,appkey,identifier \
        from bi_all_access_log \
        where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
        where a2.identifier is null \
        ;" \
        """ % (batch_date, batch_date)).readlines()
    newuser_set = set(newuser_data)

    # i = 0 is batch_date itself, so the day(0) rows hold the distinct new users per group
    for i in range(31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s) ;" % (batch_date, i)
        print sql_text
        day2user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()
        # set -> list of retained lines ("appsource\tappkey\tidentifier\n")
        day2remain_data = list(newuser_set & set(day2user_data))
        # str -> list: split each line into [appsource, appkey, identifier]
        day2remain_list = []
        for d2rm_list in day2remain_data:
            d2l = re.split('\t', d2rm_list.replace('\n', ''))
            day2remain_list.append(d2l)
        # groupby only merges adjacent items, so sort by (appsource, appkey) first
        day2remain_data_sorted = sorted(day2remain_list, key=itemgetter(0, 1))
        groupby_day2remain_data = groupby(day2remain_data_sorted, key=itemgetter(0, 1))
        rl = []
        for key, item in groupby_day2remain_data:
            item_cnt = 0
            for jtem in item:
                item_cnt += 1
            groupby_day2remain_list = (key, item_cnt)
            rl.append(groupby_day2remain_list)
        # print rl
        # One MySQL row per (appsource, appkey) group
        for x in rl:
            print x[0][0], x[0][1], x[1]
            appsource = x[0][0]
            appkey = x[0][1]
            data_type = "day(%s)remain_cnt" % (i)
            data_cnt = x[1]
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
                insert into bi_user_remain_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) \
                select '%s','%s','%s','%s','%s','%s'; \
                " """ % (batch_date, appsource, appkey, data_type, data_cnt, etl_time))


if __name__ == '__main__':
    for batch_date in sys.argv[1:]:
        print batch_date
        user_remain_proc(batch_date)
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print "Current time:", now_time
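The grouping step sorts the rows and then counts each itertools.groupby group by hand. The sketch below reproduces that step on sample rows and compares it with collections.Counter, a standard-library alternative that needs no pre-sorting. Both function names are hypothetical, and the code is Python 3:

```python
from collections import Counter
from itertools import groupby
from operator import itemgetter


def count_by_groupby(rows):
    """Count rows per (appsource, appkey) as the script does: sort, then groupby."""
    ordered = sorted(rows, key=itemgetter(0, 1))  # groupby only merges adjacent keys
    return [(key, sum(1 for _ in group)) for key, group in groupby(ordered, key=itemgetter(0, 1))]


def count_by_counter(rows):
    """Same counts with collections.Counter, sorted only for a stable result."""
    return sorted(Counter((r[0], r[1]) for r in rows).items())


rows = [["ios", "k1", "u1"], ["android", "k2", "u2"], ["ios", "k1", "u3"], ["ios", "k9", "u4"]]
print(count_by_groupby(rows))
print(count_by_counter(rows))
```

Forgetting the sort is the classic groupby mistake: the same key would then appear in several separate groups and produce duplicate MySQL rows.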
4. DAU calculation (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time


def user_dau_proc(batch_date):
    # Delete earlier results for this date so the job can be rerun
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
        delete from bi_user_dau_static where data_date='%s'; \
        " """ % (batch_date))

    # Nested function: it reads batch_date from the enclosing function
    def dau_data_proc(dau_data, data_type):
        # list -> set -> list: de-duplicate the raw output lines
        dau_data = set(dau_data)
        dau_data = list(dau_data)
        dau_list = []
        for dd_list in dau_data:
            ddl = re.split('\t', dd_list.replace('\n', ''))
            dau_list.append(ddl)
        # Count distinct users per (appsource, appkey); groupby needs sorted input
        dau_data_sorted = sorted(dau_list, key=itemgetter(0, 1))
        groupby_dau_data = groupby(dau_data_sorted, key=itemgetter(0, 1))
        rl = []
        for key, item in groupby_dau_data:
            item_cnt = 0
            for jtem in item:
                item_cnt += 1
            groupby_dau_list = (key, item_cnt)
            rl.append(groupby_dau_list)
        # print rl
        for x in rl:
            # print x[0][0], x[0][1], x[1]
            appsource = x[0][0]
            appkey = x[0][1]
            data_cnt = x[1]
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
                insert into bi_user_dau_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) \
                select '%s','%s','%s','%s','%s','%s'; \
                " """ % (batch_date, appsource, appkey, data_type, data_cnt, etl_time))

    ytd_dau_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        select appsource,appkey,identifier \
        from bi_all_access_log \
        where pt_day = '%s' " \
        """ % (batch_date)).readlines()
    # before_dau_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
    #     select appsource,appkey,identifier \
    #     from bi_all_access_log \
    #     where pt_day < '%s' " \
    #     """ % (batch_date)).readlines()
    dau_data_proc(ytd_dau_data, 'dau')
    # dau_data_proc(before_dau_data, 'before-dau')


if __name__ == '__main__':
    for batch_date in sys.argv[1:]:
        print batch_date
        user_dau_proc(batch_date)
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print "Current time:", now_time
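The scripts start one mysql process per result row. One possible alternative, sketched here under stated assumptions, is to build a single multi-row INSERT ... VALUES statement and run the client once. sql_quote and build_insert_sql are hypothetical helpers; the escaping assumes MySQL's default backslash escapes (it is not valid under the NO_BACKSLASH_ESCAPES SQL mode), and the code is Python 3:

```python
def sql_quote(value):
    """Render a value as a MySQL string literal, escaping backslashes and single quotes."""
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'%s'" % text


def build_insert_sql(table, columns, rows):
    """Build one multi-row INSERT ... VALUES statement instead of one statement per row."""
    values = ",".join("(%s)" % ",".join(sql_quote(v) for v in row) for row in rows)
    return "insert into %s(%s) values %s;" % (table, ",".join(columns), values)


sql = build_insert_sql(
    "bi_user_dau_static",
    ["data_date", "appsource", "appkey", "data_type", "data_cnt", "etl_time"],
    [("2016-10-01", "ios", "k1", "dau", 2, "2016-10-02 01:00:00"),
     ("2016-10-01", "android", "k'2", "dau", 1, "2016-10-02 01:00:00")],
)
print(sql)
```

The statement still has to reach the client. Embedding it in a double-quoted os.system string means the shell also interprets characters such as ", $ and \, whereas the standard-library call subprocess.call(["/usr/bin/mysql", ..., "-e", sql]) passes it as a single argument without a shell.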
5. Final retention calculation (with platform and channel dimensions)
/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "Current time:", now_time


def user_dau_proc(batch_date):
    # Delete earlier results for this date so the job can be rerun
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
        delete from bi_final_remain_static where data_date='%s'; \
        " """ % (batch_date))

    # Nested function: writes one MySQL row per (appsource, appkey) line from Hive
    def dau_data_proc(dau_data):
        dau_data = set(dau_data)
        dau_data = list(dau_data)
        dau_list = []
        for dd_list in dau_data:
            ddl = re.split('\t', dd_list.replace('\n', ''))
            dau_list.append(ddl)
        for x in dau_list:
            print x[0], x[1], x[2], x[3]
            appsource = x[0]
            appkey = x[1]
            ytd_dau = x[2]
            before_ytd_dau = x[3]
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
                insert into bi_final_remain_static(data_date,appsource,appkey,ytd_dau,before_ytd_dau,etl_time) \
                select '%s','%s','%s','%s','%s','%s'; \
                " """ % (batch_date, appsource, appkey, ytd_dau, before_ytd_dau, etl_time))

    # Per (appkey, appsource): distinct users on batch_date and distinct users on all earlier days
    final_remain_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        with ytd_dau as(select appkey,appsource,count(distinct identifier) ytd_dau from bi_all_access_log where pt_day='%s' group by appkey,appsource), \
        before_ytd_dau as (select appkey,appsource,count(distinct identifier) before_ytd_dau from bi_all_access_log where pt_day<'%s' group by appkey,appsource) \
        select a1.appsource,a1.appkey,a1.ytd_dau,a2.before_ytd_dau \
        from ytd_dau a1 \
        left join before_ytd_dau a2 on a1.appkey=a2.appkey and a1.appsource=a2.appsource; "\
        """ % (batch_date, batch_date)).readlines()
    dau_data_proc(final_remain_data)


if __name__ == '__main__':
    for batch_date in sys.argv[1:]:
        print batch_date
        user_dau_proc(batch_date)
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print "Current time:", now_time
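Because of the LEFT JOIN, before_ytd_dau has no value for an (appkey, appsource) pair that never appeared before batch_date, and the Hive CLI prints such a value as NULL. The script passes that text to MySQL unchanged. A sketch of converting the fields to integers first, assuming a missing count should be stored as 0 (that choice is an assumption, not something the original specifies); to_count and parse_final_remain are hypothetical names, and the code is Python 3:

```python
def to_count(field):
    """Convert one numeric field of Hive CLI output to int; NULL (no join match) becomes 0."""
    return 0 if field in ("NULL", "") else int(field)


def parse_final_remain(line):
    """Split an 'appsource<TAB>appkey<TAB>ytd_dau<TAB>before_ytd_dau' line into typed values."""
    appsource, appkey, ytd_dau, before_ytd_dau = line.rstrip("\n").split("\t")
    return appsource, appkey, to_count(ytd_dau), to_count(before_ytd_dau)


print(parse_final_remain("ios\tk1\t120\t3400\n"))
print(parse_final_remain("android\tk_new\t15\tNULL\n"))
```

Converting to int also fails loudly on unexpected output (for example a log line leaking into stdout) instead of silently inserting it.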
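6. Running the jobs in batch
The parameterized scripts loop over sys.argv[1:], so a whole month can be processed by listing every date on one command line; nohup and the trailing & keep the job running in the background after you log out: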
nohup python hive_remain_user.py 2016-10-01 2016-10-02 2016-10-03 2016-10-04 2016-10-05 2016-10-06 2016-10-07 2016-10-08 2016-10-09 2016-10-10 2016-10-11 2016-10-12 2016-10-13 2016-10-14 2016-10-15 2016-10-16 2016-10-17 2016-10-18 2016-10-19 2016-10-20 2016-10-21 2016-10-22 2016-10-23 2016-10-24 2016-10-25 2016-10-26 2016-10-27 2016-10-28 2016-10-29 2016-10-30 2016-10-31 &
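Typing 31 dates by hand is error-prone. A small generator can produce the list instead; gen_dates.py is a hypothetical file name, and the code is Python 3:

```python
import datetime
import sys


def date_range(start, end):
    """Return every date from start to end inclusive as YYYY-MM-DD strings."""
    first = datetime.datetime.strptime(start, "%Y-%m-%d").date()
    last = datetime.datetime.strptime(end, "%Y-%m-%d").date()
    days = (last - first).days
    return [(first + datetime.timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days + 1)]


if __name__ == "__main__":
    # Usage: python gen_dates.py 2016-10-01 2016-10-31
    if len(sys.argv) == 3:
        print(" ".join(date_range(sys.argv[1], sys.argv[2])))
```

With it, the command above becomes nohup python hive_remain_user.py $(python gen_dates.py 2016-10-01 2016-10-31) &, and month lengths and leap years are handled by datetime.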