您的位置:首页 > 编程语言 > Python开发

使用Python脚本从Hive中取数据计算后加载到Mysql示例

2016-12-21 11:05 746 查看
由于没有在服务器上安装Python第三方库的权限,本文通过Python的os模块调用hive和mysql命令行客户端来完成数据的读取和写入。
重点关注和学习:
Python通过os模块调用命令行客户端,向数据库发送语句并接收查询结果的方式;

外部参数传送到Python并使用;

Python进行数据分组汇总;

利用Python的set对象进行数据的去重及交、并、差的操作;

Python的set、list、str之间的相互转换;

Python中函数中定义函数,及函数的调用、函数参数的使用;

“if __name__ == '__main__':”主程序入口的使用;

Python时间的打印,计算程序的执行时间;

等等。
1、新用户一月内每日留存的计算(无外部参数传入,内部定义)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os

warnings.filterwarnings("ignore")

# Calendar anchors relative to the run date.  Only `today` is read below.
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Cohort day: users first seen on this date are the "new users" whose return
# visits are counted.  Hard-coded to 52 days before the run date.
batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("当前时间是:" + " " + now_time)


def _hive_lines(sql_text):
    """Run *sql_text* through the Hive CLI and return its stdout as a list of lines.

    Raises RuntimeError when the CLI exits with a non-zero status, so that a
    failed query is not mistaken for an empty result.
    """
    pipe = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text))
    try:
        lines = pipe.readlines()
    finally:
        # close() returns None on success and the exit status otherwise.
        exit_status = pipe.close()
    if exit_status is not None:
        raise RuntimeError("hive exited with status %s for: %s" % (exit_status, sql_text))
    return lines


remain_static = {}

# Rows of batch_date whose (appsource, appkey, identifier) has no match on any
# earlier pt_day, i.e. first-time users of that day.
newuser_sql = (
    "select a1.appsource,a1.appkey,a1.identifier from ( "
    "select appsource,appkey,identifier "
    "from bi_all_access_log "
    "where pt_day = '%s' ) a1 "
    "left join "
    "(select appsource,appkey,identifier "
    "from bi_all_access_log "
    "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
    "where a2.identifier is null "
    ";"
) % (batch_date, batch_date)

# The query has no DISTINCT, so repeated log rows for one user would each
# produce a line.  Count the set of lines so newuser_cnt is measured the same
# way as the per-day retention counts below (which are set sizes).
newuser_set = set(_hive_lines(newuser_sql))
remain_static['newuser_cnt'] = len(newuser_set)

for i in range(2, 31):
    sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
    print(sql_text)
    day_users = set(_hive_lines(sql_text))
    # Retained users: new users of batch_date that also appear i days later.
    remain_static['day%sremain_cnt' % (i)] = len(newuser_set & day_users)

# Column names double as the keys of remain_static, so the column list and the
# value list are generated together and cannot drift apart.
count_columns = ['newuser_cnt'] + ['day%sremain_cnt' % (i) for i in range(2, 31)]
column_list = ','.join(['data_date'] + count_columns)
value_list = ','.join(
    ["'%s'" % (batch_date)] + ["'%s'" % (remain_static[name]) for name in count_columns])

# Replace any previous row for this batch date.
# NOTE(review): the password is passed on the command line and is visible in
# the process list; consider a client option file.
mysql_sql = (
    "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; "
    "insert into bi_remain_user_static(%s) "
    "select %s; "
) % (batch_date, column_list, value_list)
mysql_status = os.system(
    """/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "%s" """ % (mysql_sql))
if mysql_status != 0:
    raise RuntimeError("mysql exited with status %s" % (mysql_status))

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("当前时间是:" + " " + now_time)

2、新用户一月内每日留存的计算(外部参数传入)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import sys

warnings.filterwarnings("ignore")

# Calendar anchors relative to the run date.  The code shown below in this
# script does not read them: batch dates come from the command line rather
# than from the fixed offset that is left disabled here.
today = datetime.date.today()
yesterday, tomorrow = (today + datetime.timedelta(days=step) for step in (-1, 1))

# batch_date = today - datetime.timedelta(days=52)

# Start-of-run timestamp.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

def _canonical_date(batch_date):
    """Return *batch_date* as a 'YYYY-MM-DD' string.

    The value is spliced into shell command lines and SQL text, so anything
    that does not parse as a plain calendar date is rejected.

    Raises:
        ValueError: if str(batch_date) is not a '%Y-%m-%d' date.
    """
    parsed = datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')
    return parsed.date().isoformat()


def _hive_lines(sql_text):
    """Run *sql_text* through the Hive CLI and return its stdout as a list of lines.

    Raises RuntimeError when the CLI exits with a non-zero status, so that a
    failed query is not mistaken for an empty result.
    """
    pipe = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text))
    try:
        lines = pipe.readlines()
    finally:
        # close() returns None on success and the exit status otherwise.
        exit_status = pipe.close()
    if exit_status is not None:
        raise RuntimeError("hive exited with status %s for: %s" % (exit_status, sql_text))
    return lines


def _mysql_exec(sql_text):
    """Run *sql_text* through the mysql command-line client.

    Raises RuntimeError when the client exits with a non-zero status.
    """
    # NOTE(review): the password is passed on the command line and is visible
    # in the process list; consider a client option file.
    status = os.system(
        """/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "%s" """ % (sql_text))
    if status != 0:
        raise RuntimeError("mysql exited with status %s" % (status))


def user_remain_proc(batch_date):
    """Compute day-2..day-30 retention for the new users of *batch_date* and store it.

    New users are the (appsource, appkey, identifier) rows of batch_date in
    bi_all_access_log that have no match on any earlier pt_day.  For each
    offset i in 2..30 the number of those users that also appear on
    date_add(batch_date, i) is counted.  The row for batch_date in
    funnyai_data.bi_remain_user_static is then deleted and re-inserted.

    Args:
        batch_date: cohort day, a 'YYYY-MM-DD' string (or anything whose str()
            is one, such as a datetime.date).

    Raises:
        ValueError: if batch_date is not a valid 'YYYY-MM-DD' date.
        RuntimeError: if a hive or mysql command exits non-zero.  Hive is
            queried before MySQL is touched, so a failed query leaves the
            stored row unchanged.
    """
    batch_date = _canonical_date(batch_date)
    remain_static = {}

    newuser_sql = (
        "select a1.appsource,a1.appkey,a1.identifier from ( "
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' ) a1 "
        "left join "
        "(select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
        "where a2.identifier is null "
        ";"
    ) % (batch_date, batch_date)

    # The query has no DISTINCT, so repeated log rows for one user would each
    # produce a line.  Count the set of lines so newuser_cnt is measured the
    # same way as the per-day retention counts (which are set sizes).
    newuser_set = set(_hive_lines(newuser_sql))
    remain_static['newuser_cnt'] = len(newuser_set)

    for i in range(2, 31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
        print(sql_text)
        day_users = set(_hive_lines(sql_text))
        remain_static['day%sremain_cnt' % (i)] = len(newuser_set & day_users)

    # Column names double as the keys of remain_static, so the column list and
    # the value list are generated together and cannot drift apart.
    count_columns = ['newuser_cnt'] + ['day%sremain_cnt' % (i) for i in range(2, 31)]
    column_list = ','.join(['data_date'] + count_columns)
    value_list = ','.join(
        ["'%s'" % (batch_date)] + ["'%s'" % (remain_static[name]) for name in count_columns])

    _mysql_exec(
        "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; "
        "insert into bi_remain_user_static(%s) "
        "select %s; " % (batch_date, column_list, value_list))

if __name__ == '__main__':
    # Every command-line argument is handled as one batch date, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End-of-run timestamp.
# NOTE(review): the source listing lost its indentation; this is kept at
# module level, after the guard -- confirm against the original script.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

3、新用户一月内每日留存的计算(带入平台、渠道维度)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

warnings.filterwarnings("ignore")

# Calendar anchors relative to the run date.  The code shown below in this
# script does not read them: batch dates come from the command line rather
# than from the fixed offset that is left disabled here.
today = datetime.date.today()
yesterday, tomorrow = (today + datetime.timedelta(days=step) for step in (-1, 1))

# batch_date = today - datetime.timedelta(days=52)

# Start-of-run timestamp.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

def _canonical_date(batch_date):
    """Return *batch_date* as a 'YYYY-MM-DD' string.

    The value is spliced into shell command lines and SQL text, so anything
    that does not parse as a plain calendar date is rejected.

    Raises:
        ValueError: if str(batch_date) is not a '%Y-%m-%d' date.
    """
    parsed = datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')
    return parsed.date().isoformat()


def _hive_lines(sql_text):
    """Run *sql_text* through the Hive CLI and return its stdout as a list of lines.

    Raises RuntimeError when the CLI exits with a non-zero status, so that a
    failed query is not mistaken for an empty result.
    """
    pipe = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text))
    try:
        lines = pipe.readlines()
    finally:
        # close() returns None on success and the exit status otherwise.
        exit_status = pipe.close()
    if exit_status is not None:
        raise RuntimeError("hive exited with status %s for: %s" % (exit_status, sql_text))
    return lines


def _sql_quote(value):
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes and single quotes are backslash-escaped.
    NOTE(review): assumes the server runs without NO_BACKSLASH_ESCAPES -- confirm.
    """
    text = str(value)
    return "'%s'" % (text.replace('\\', '\\\\').replace("'", "\\'"))


def _mysql_exec(sql_text):
    """Run *sql_text* through the mysql command-line client.

    The client is started from an argument list, without a shell, because the
    SQL carries field values read from the access log.

    Raises RuntimeError when the client exits with a non-zero status.
    """
    # Local import: the script's import block does not provide subprocess.
    import subprocess
    # NOTE(review): the password is passed on the command line and is visible
    # in the process list; consider a client option file.
    status = subprocess.call([
        '/usr/bin/mysql', '-hwebserverip', '-P6603', '-uwebserveruser',
        '-pwebserverpasswd', '-e', sql_text])
    if status != 0:
        raise RuntimeError("mysql exited with status %s" % (status))


def _count_by_source_and_key(lines):
    """Group tab-separated *lines* by their first two fields and count each group.

    Returns a list of ((appsource, appkey), count) tuples ordered by key.
    """
    rows = [line.rstrip('\n').split('\t') for line in lines]
    group_key = itemgetter(0, 1)
    # groupby only merges adjacent items, so sort by the same key first.
    rows.sort(key=group_key)
    return [(key, sum(1 for _ in members)) for key, members in groupby(rows, key=group_key)]


def user_remain_proc(batch_date):
    """Compute day-0..day-30 retention per (appsource, appkey) for *batch_date* and store it.

    New users are the (appsource, appkey, identifier) rows of batch_date in
    bi_all_access_log that have no match on any earlier pt_day.  For each
    offset i in 0..30, the new users that also appear on
    date_add(batch_date, i) are counted per (appsource, appkey) and stored in
    funnyai_data.bi_user_remain_static with data_type 'day(i)remain_cnt'.

    All Hive queries run first; only then are the existing rows for
    batch_date deleted and the new ones inserted, so a failed query leaves
    the stored rows unchanged.

    Args:
        batch_date: cohort day, a 'YYYY-MM-DD' string (or anything whose str()
            is one, such as a datetime.date).

    Raises:
        ValueError: if batch_date is not a valid 'YYYY-MM-DD' date.
        RuntimeError: if a hive or mysql command exits non-zero.
    """
    batch_date = _canonical_date(batch_date)

    newuser_sql = (
        "select a1.appsource,a1.appkey,a1.identifier from ( "
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' ) a1 "
        "left join "
        "(select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
        "where a2.identifier is null "
        ";"
    ) % (batch_date, batch_date)
    # Built once; every day's user set is intersected with it.
    newuser_set = set(_hive_lines(newuser_sql))

    pending_rows = []
    for i in range(31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s) ;" % (batch_date, i)
        print(sql_text)
        retained = newuser_set & set(_hive_lines(sql_text))
        data_type = "day(%s)remain_cnt" % (i)
        for (appsource, appkey), data_cnt in _count_by_source_and_key(retained):
            pending_rows.append((appsource, appkey, data_type, data_cnt))

    _mysql_exec(
        "use funnyai_data; "
        "delete from bi_user_remain_static where data_date=%s; " % (_sql_quote(batch_date)))

    for appsource, appkey, data_type, data_cnt in pending_rows:
        print("%s %s %s" % (appsource, appkey, data_cnt))
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        values = ','.join(
            _sql_quote(v)
            for v in (batch_date, appsource, appkey, data_type, data_cnt, etl_time))
        _mysql_exec(
            "use funnyai_data; "
            "insert into bi_user_remain_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) "
            "select %s; " % (values))

if __name__ == '__main__':
    # Every command-line argument is handled as one batch date, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End-of-run timestamp.
# NOTE(review): the source listing lost its indentation; this is kept at
# module level, after the guard -- confirm against the original script.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

4、DAU的计算(带入平台、渠道维度)
/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

warnings.filterwarnings("ignore")

# Calendar anchors relative to the run date.  The code shown below in this
# script does not read them: batch dates come from the command line rather
# than from the fixed offset that is left disabled here.
today = datetime.date.today()
yesterday, tomorrow = (today + datetime.timedelta(days=step) for step in (-1, 1))

# batch_date = today - datetime.timedelta(days=52)

# Start-of-run timestamp.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

def _canonical_date(batch_date):
    """Return *batch_date* as a 'YYYY-MM-DD' string.

    The value is spliced into shell command lines and SQL text, so anything
    that does not parse as a plain calendar date is rejected.

    Raises:
        ValueError: if str(batch_date) is not a '%Y-%m-%d' date.
    """
    parsed = datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')
    return parsed.date().isoformat()


def _hive_lines(sql_text):
    """Run *sql_text* through the Hive CLI and return its stdout as a list of lines.

    Raises RuntimeError when the CLI exits with a non-zero status, so that a
    failed query is not mistaken for an empty result.
    """
    pipe = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text))
    try:
        lines = pipe.readlines()
    finally:
        # close() returns None on success and the exit status otherwise.
        exit_status = pipe.close()
    if exit_status is not None:
        raise RuntimeError("hive exited with status %s for: %s" % (exit_status, sql_text))
    return lines


def _sql_quote(value):
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes and single quotes are backslash-escaped.
    NOTE(review): assumes the server runs without NO_BACKSLASH_ESCAPES -- confirm.
    """
    text = str(value)
    return "'%s'" % (text.replace('\\', '\\\\').replace("'", "\\'"))


def _mysql_exec(sql_text):
    """Run *sql_text* through the mysql command-line client.

    The client is started from an argument list, without a shell, because the
    SQL carries field values read from the access log.

    Raises RuntimeError when the client exits with a non-zero status.
    """
    # Local import: the script's import block does not provide subprocess.
    import subprocess
    # NOTE(review): the password is passed on the command line and is visible
    # in the process list; consider a client option file.
    status = subprocess.call([
        '/usr/bin/mysql', '-hwebserverip', '-P6603', '-uwebserveruser',
        '-pwebserverpasswd', '-e', sql_text])
    if status != 0:
        raise RuntimeError("mysql exited with status %s" % (status))


def user_dau_proc(batch_date):
    """Compute DAU per (appsource, appkey) for *batch_date* and store it.

    Reads the (appsource, appkey, identifier) rows of batch_date from
    bi_all_access_log, counts distinct rows per (appsource, appkey), deletes
    the existing rows for batch_date from funnyai_data.bi_user_dau_static and
    inserts one row per group with data_type 'dau'.

    Hive is queried before MySQL is touched, so a failed query leaves the
    stored rows unchanged.

    Args:
        batch_date: day to compute, a 'YYYY-MM-DD' string (or anything whose
            str() is one, such as a datetime.date).

    Raises:
        ValueError: if batch_date is not a valid 'YYYY-MM-DD' date.
        RuntimeError: if a hive or mysql command exits non-zero.
    """
    batch_date = _canonical_date(batch_date)

    def dau_data_proc(dau_data, data_type):
        """Insert one row per (appsource, appkey) holding the distinct-line count of *dau_data*.

        dau_data is a list of tab-separated lines; data_type is stored as-is
        in the data_type column.
        """
        # set() drops repeated lines, so each group counts distinct identifiers.
        rows = [line.rstrip('\n').split('\t') for line in set(dau_data)]
        group_key = itemgetter(0, 1)
        # groupby only merges adjacent items, so sort by the same key first.
        rows.sort(key=group_key)

        for (appsource, appkey), members in groupby(rows, key=group_key):
            data_cnt = sum(1 for _ in members)
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            values = ','.join(
                _sql_quote(v)
                for v in (batch_date, appsource, appkey, data_type, data_cnt, etl_time))
            _mysql_exec(
                "use funnyai_data; "
                "insert into bi_user_dau_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) "
                "select %s; " % (values))

    ytd_dau_data = _hive_lines(
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' " % (batch_date))

    _mysql_exec(
        "use funnyai_data; "
        "delete from bi_user_dau_static where data_date=%s; " % (_sql_quote(batch_date)))

    dau_data_proc(ytd_dau_data, 'dau')
    # The original listing also carried a disabled second pass: the same query
    # with pt_day < batch_date, stored with data_type 'before-dau'.

if __name__ == '__main__':
    # Every command-line argument is handled as one batch date, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_dau_proc(batch_date)

# End-of-run timestamp.
# NOTE(review): the source listing lost its indentation; this is kept at
# module level, after the guard -- confirm against the original script.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

5、终极留存的计算(带入平台、渠道维度)
/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys

warnings.filterwarnings("ignore")

# Calendar anchors relative to the run date.  The code shown below in this
# script does not read them: batch dates come from the command line rather
# than from the fixed offset that is left disabled here.
today = datetime.date.today()
yesterday, tomorrow = (today + datetime.timedelta(days=step) for step in (-1, 1))

# batch_date = today - datetime.timedelta(days=52)

# Start-of-run timestamp.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

def _canonical_date(batch_date):
    """Return *batch_date* as a 'YYYY-MM-DD' string.

    The value is spliced into shell command lines and SQL text, so anything
    that does not parse as a plain calendar date is rejected.

    Raises:
        ValueError: if str(batch_date) is not a '%Y-%m-%d' date.
    """
    parsed = datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')
    return parsed.date().isoformat()


def _hive_lines(sql_text):
    """Run *sql_text* through the Hive CLI and return its stdout as a list of lines.

    Raises RuntimeError when the CLI exits with a non-zero status, so that a
    failed query is not mistaken for an empty result.
    """
    pipe = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text))
    try:
        lines = pipe.readlines()
    finally:
        # close() returns None on success and the exit status otherwise.
        exit_status = pipe.close()
    if exit_status is not None:
        raise RuntimeError("hive exited with status %s for: %s" % (exit_status, sql_text))
    return lines


def _sql_quote(value):
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes and single quotes are backslash-escaped.
    NOTE(review): assumes the server runs without NO_BACKSLASH_ESCAPES -- confirm.
    """
    text = str(value)
    return "'%s'" % (text.replace('\\', '\\\\').replace("'", "\\'"))


def _mysql_exec(sql_text):
    """Run *sql_text* through the mysql command-line client.

    The client is started from an argument list, without a shell, because the
    SQL carries field values read from the access log.

    Raises RuntimeError when the client exits with a non-zero status.
    """
    # Local import: the script's import block does not provide subprocess.
    import subprocess
    # NOTE(review): the password is passed on the command line and is visible
    # in the process list; consider a client option file.
    status = subprocess.call([
        '/usr/bin/mysql', '-hwebserverip', '-P6603', '-uwebserveruser',
        '-pwebserverpasswd', '-e', sql_text])
    if status != 0:
        raise RuntimeError("mysql exited with status %s" % (status))


def user_dau_proc(batch_date):
    """Store, per (appsource, appkey), the distinct-user count of *batch_date* and of all earlier days.

    One Hive query returns appsource, appkey, ytd_dau (distinct identifiers on
    batch_date) and before_ytd_dau (distinct identifiers on pt_day <
    batch_date).  The existing rows for batch_date in
    funnyai_data.bi_final_remain_static are deleted and one row per result
    line is inserted.

    Hive is queried before MySQL is touched, so a failed query leaves the
    stored rows unchanged.

    Args:
        batch_date: day to compute, a 'YYYY-MM-DD' string (or anything whose
            str() is one, such as a datetime.date).

    Raises:
        ValueError: if batch_date is not a valid 'YYYY-MM-DD' date.
        RuntimeError: if a hive or mysql command exits non-zero.
    """
    batch_date = _canonical_date(batch_date)

    def dau_data_proc(dau_data):
        """Insert one bi_final_remain_static row per distinct line of *dau_data*.

        Each line holds four tab-separated fields:
        appsource, appkey, ytd_dau, before_ytd_dau.
        """
        for line in set(dau_data):
            x = line.rstrip('\n').split('\t')
            print("%s %s %s %s" % (x[0], x[1], x[2], x[3]))
            appsource = x[0]
            appkey = x[1]
            ytd_dau = x[2]
            before_ytd_dau = x[3]
            # The LEFT JOIN yields NULL when the (appkey, appsource) pair has
            # no rows before batch_date; the Hive CLI prints that as the text
            # "NULL".  No earlier users means a count of zero.
            if before_ytd_dau == 'NULL':
                before_ytd_dau = '0'
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            values = ','.join(
                _sql_quote(v)
                for v in (batch_date, appsource, appkey, ytd_dau, before_ytd_dau, etl_time))
            _mysql_exec(
                "use funnyai_data; "
                "insert into bi_final_remain_static(data_date,appsource,appkey,ytd_dau,before_ytd_dau,etl_time) "
                "select %s; " % (values))

    final_remain_sql = (
        "with ytd_dau as(select appkey,appsource,count(distinct identifier) ytd_dau from bi_all_access_log where pt_day='%s' group by appkey,appsource), "
        "before_ytd_dau as (select appkey,appsource,count(distinct identifier) before_ytd_dau from bi_all_access_log where pt_day<'%s' group by appkey,appsource) "
        "select a1.appsource,a1.appkey,a1.ytd_dau,a2.before_ytd_dau "
        "from ytd_dau a1 "
        "left join before_ytd_dau a2 on a1.appkey=a2.appkey and a1.appsource=a2.appsource; "
    ) % (batch_date, batch_date)
    final_remain_data = _hive_lines(final_remain_sql)

    _mysql_exec(
        "use funnyai_data; "
        "delete from bi_final_remain_static where data_date=%s; " % (_sql_quote(batch_date)))

    dau_data_proc(final_remain_data)

if __name__ == '__main__':
    # Every command-line argument is handled as one batch date, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_dau_proc(batch_date)

# End-of-run timestamp.
# NOTE(review): the source listing lost its indentation; this is kept at
# module level, after the guard -- confirm against the original script.
now_time = time.strftime('%Y-%m-%d %X')
print("当前时间是:" + " " + now_time)

6、数据批量调度的方式
nohup python hive_remain_user.py 2016-10-01 2016-10-02 2016-10-03 2016-10-04 2016-10-05 2016-10-06 2016-10-07 2016-10-08 2016-10-09 2016-10-10 2016-10-11 2016-10-12 2016-10-13 2016-10-14 2016-10-15 2016-10-16 2016-10-17 2016-10-18 2016-10-19 2016-10-20 2016-10-21 2016-10-22 2016-10-23 2016-10-24 2016-10-25 2016-10-26 2016-10-27 2016-10-28 2016-10-29 2016-10-30 2016-10-31 &
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: