您的位置:首页 > 编程语言 > Python开发

Python进行主播拉新相关数据统计的脚本

2017-10-26 15:36 579 查看
脚本中主要关注点:
1、MySQL批量数据插入

2、日期范围内的列表清单

3、更多的是蕴含在内的业务逻辑
/Users/nisj/PycharmProjects/BiDataProc/Demand/hadoopStat/anchorBringnew.py
# -*- coding=utf-8 -*-
import datetime
import time
import os
import warnings
import sys
import re
# Python 2 only: reload() re-exposes sys.setdefaultencoding so hive CLI
# output containing Chinese nicknames can be handled without per-string
# decoding.
reload(sys)
sys.setdefaultencoding('utf8')

# Silence noisy library warnings during the batch run.
warnings.filterwarnings("ignore")

# Calendar yesterday as 'YYYY-MM-DD'.
# NOTE(review): apparently unused at module level — each helper below
# recomputes its own value; confirm before removing.
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')

def getDay1BeforeAndYesterday(runDay):
    """Return (calendar-yesterday, runDay minus one day) as 'YYYY-MM-DD' strings.

    The first element is relative to *today's* date — it selects the freshest
    snapshot partition (pt_day) of the dimension tables — while the second is
    relative to runDay and is used for day-over-day comparisons.
    """
    calendar_yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    prior_day = (datetime.datetime.strptime(runDay, '%Y-%m-%d') - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    return calendar_yesterday, prior_day

def dateRange(beginDate, endDate):
    """List every day from beginDate to endDate (inclusive) as 'YYYY-MM-DD'.

    Returns an empty list when beginDate sorts after endDate (the bound is
    checked by lexicographic string comparison, which is safe for this
    zero-padded ISO format).
    """
    start = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    days = []
    offset = 0
    while True:
        stamp = (start + datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
        if stamp > endDate:
            break
        days.append(stamp)
        offset += 1
    return days

def anchorBringnewDetail(runDay):
    # Rebuild the Hive partition pt_day=runDay of hs_anchor_bringnew_detail:
    # fans whose FIRST subscription date is runDay, joined to the anchor
    # profile snapshot (pt_day = actual calendar yesterday, the freshest
    # full-dimension partition) and to that day's live minutes per room.
    # Drops + re-adds the partition first so reruns are idempotent.
    # Shells out to the hive CLI; no return value, errors only surface on
    # stderr of the child process.
    yesterday=getDay1BeforeAndYesterday(runDay)[0]
    os.system("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
alter table hs_anchor_bringnew_detail drop if exists partition(pt_day='{runDay}'); \
alter table hs_anchor_bringnew_detail add partition(pt_day='{runDay}'); \
with tab_user_frist_subscriber as ( \
select room_id,fans_uid,state,first_subscriber_date \
from (select room_id,uid fans_uid,state,substr(created_time,1,10) first_subscriber_date,row_number()over(partition by uid order by created_time asc) rk from oss_room_subscriber_roomid where pt_day='{yesterday}') x \
where rk=1 and first_subscriber_date='{runDay}'), \
tab_user_info as ( \
select a2.nickname,a1.id room_id,a2.uid anchor_uid,a2.last_login_time,a1.is_profession \
from oss_room_v2 a1 \
left join oss_chushou_user_profile a2 on a1.creator_uid=a2.uid \
where a1.pt_day='{yesterday}' and a2.pt_day='{yesterday}'), \
tab_live_info as( \
select a1.room_id,sum(live_minute) anthor_live_time \
from (select room_id,hour(time_interval)*60+minute(time_interval)+second(time_interval)/60 live_minute from ( \
select room_id,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval \
from honeycomb_all_live_history_status \
where pt_day='{runDay}' \
) x) a1 \
group by a1.room_id \
) \
insert into table hs_anchor_bringnew_detail partition (pt_day='{runDay}') \
select a1.first_subscriber_date calc_date,a1.room_id,a2.anchor_uid,a2.nickname,count(distinct a1.fans_uid) fans_add_cnt,a3.anthor_live_time anthor_live_time \
from tab_user_frist_subscriber a1 \
left join tab_user_info a2 on a1.room_id=a2.room_id \
left join tab_live_info a3 on a1.room_id=a3.room_id \
group by a1.first_subscriber_date,a1.room_id,a2.anchor_uid,a2.nickname,a3.anthor_live_time \
order by a1.first_subscriber_date,fans_add_cnt desc \
; \
" """.format(runDay=runDay, yesterday=yesterday));

def anchorBringnewDetail2Mysql(runDay):
    # Load runDay's anchor-bringnew rows from Hive into MySQL
    # jellyfish_hadoop_stat.anchor_bringnew_detail:
    #   1) query Hive, self-joining runDay against day1Before to compute
    #      day-over-day deltas of fan adds and live minutes;
    #   2) delete any existing MySQL rows for runDay (rerun-safe rollback);
    #   3) re-insert via multi-value INSERTs, flushed every 800 rows.
    # NOTE(review): the WHERE condition on a2.calc_date turns the LEFT JOIN
    # into an effective inner join — rooms with no row on day1Before are
    # dropped from the output; confirm that is intended.
    day1Before=getDay1BeforeAndYesterday(runDay)[1]
    # Hive stdout: one tab-separated record per line.
    anchorBringnews=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
select a1.calc_date,row_number()over(order by a1.fans_add_cnt desc) fans_add_rank,a1.room_id,a1.anchor_uid,a1.nickname,a1.fans_add_cnt,case when a1.anthor_live_time is null then 0 else a1.anthor_live_time end,a1.fans_add_cnt - case when a2.fans_add_cnt is null then 0 else a2.fans_add_cnt end fans_add_changes,case when a1.anthor_live_time is null then 0 else a1.anthor_live_time end - case when a2.anthor_live_time is null then 0 else a2.anthor_live_time end live_long_changes \
from hs_anchor_bringnew_detail a1 \
left join hs_anchor_bringnew_detail a2 on a1.room_id=a2.room_id \
where a1.calc_date='{runDay}' and a2.calc_date='{day1Before}' \
; \
" """.format(runDay=runDay, day1Before=day1Before)).readlines();

    # Parse each output line into a list of fields.
    anchorBringnew_list = []
    for anchorBringnewList in anchorBringnews:
        anchorBringnew = re.split('\t', anchorBringnewList.replace('\n', ''))
        anchorBringnew_list.append(anchorBringnew)

    # data rollback: clear runDay's rows so a rerun does not duplicate them.
    os.system("""source /etc/profile; \
/usr/bin/mysql  -hMysqlHost -P6605 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use jellyfish_hadoop_stat; \
delete from jellyfish_hadoop_stat.anchor_bringnew_detail where calc_date='{runDay}' \
" """.format(runDay=runDay))

    i = 0
    insert_sql_text = "insert into jellyfish_hadoop_stat.anchor_bringnew_detail(calc_date,fans_add_rank,room_id,anchor_uid,nickname,fans_add_cnt,anthor_live_time,fans_add_changes,live_long_changes,etl_time) values "
    for anchorBringnew in anchorBringnew_list:
        calc_date=anchorBringnew[0]
        fans_add_rank=anchorBringnew[1]
        room_id=anchorBringnew[2]
        anchor_uid=anchorBringnew[3]
        # Strip quote/backtick/newline characters that would break the SQL
        # string assembled below (it is handed to mysql through a shell
        # command, so unescaped quotes would corrupt the statement).
        nickname=str(anchorBringnew[4]).replace('\n', '').replace('`', '').replace('\'', '').replace('"', '')
        fans_add_cnt=anchorBringnew[5]
        anthor_live_time=anchorBringnew[6]
        fans_add_changes=anchorBringnew[7]
        live_long_changes=anchorBringnew[8]
        # Batch-run wall-clock timestamp stored in the etl_time column.
        etl_time=time.strftime('%Y-%m-%d %X', time.localtime())

        i += 1

        insert_sql_text = insert_sql_text + "('{calc_date}',{fans_add_rank},{room_id},{anchor_uid},'{nickname}',{fans_add_cnt},{anthor_live_time},{fans_add_changes},{live_long_changes},'{etl_time}'),".format(calc_date=calc_date, fans_add_rank=fans_add_rank, room_id=room_id, anchor_uid=anchor_uid, nickname=nickname, fans_add_cnt=fans_add_cnt, anthor_live_time=anthor_live_time, fans_add_changes=fans_add_changes, live_long_changes=live_long_changes, etl_time=etl_time)

        # Flush a full batch of 800 value tuples as one INSERT statement
        # (trailing comma swapped for the terminating semicolon).
        if (i % 800 == 0):
            insert_sql_text = insert_sql_text[0:-1] + ";"
            os.system("""source /etc/profile; \
/usr/bin/mysql  -hMysqlHost -P6605 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use jellyfish_hadoop_stat; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

            # Start a fresh statement for the next batch.
            insert_sql_text = "insert into jellyfish_hadoop_stat.anchor_bringnew_detail(calc_date,fans_add_rank,room_id,anchor_uid,nickname,fans_add_cnt,anthor_live_time,fans_add_changes,live_long_changes,etl_time) values "

    # Flush the trailing partial batch.
    # NOTE(review): if the result set is empty, or its size is an exact
    # multiple of 800, this sends "... values;" — an invalid statement that
    # mysql will reject (os.system merely prints the error and continues).
    insert_sql_text = insert_sql_text[0:-1] + ";"
    os.system("""source /etc/profile; \
/usr/bin/mysql  -hMysqlHost -P6605 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use jellyfish_hadoop_stat; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

# os.system("""source /etc/profile; \
#             /usr/bin/mysql  -hMysqlHost -P6605 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "insert into jellyfish_hadoop_stat.anchor_bringnew_detail(calc_date,fans_add_rank,room_id,anchor_uid,nickname,fans_add_cnt,anthor_live_time,fans_add_changes,live_long_changes,etl_time) \
#             values('{calc_date}',{fans_add_rank},{room_id},{anchor_uid},'{nickname}',{fans_add_cnt},{anthor_live_time},{fans_add_changes},{live_long_changes},'{etl_time}'); \
#             " """.format(calc_date=calc_date, fans_add_rank=fans_add_rank, room_id=room_id, anchor_uid=anchor_uid, nickname=nickname, fans_add_cnt=fans_add_cnt, anthor_live_time=anthor_live_time, fans_add_changes=fans_add_changes, live_long_changes=live_long_changes, etl_time=etl_time));

# Batch Test
# anchorBringnewDetail(runDay='{runDay}')
# anchorBringnewDetail2Mysql(runDay='2017-10-02')

# Backfill driver: for every day in the inclusive range, rebuild the Hive
# detail partition, then reload that day's rows into MySQL.
for runDay in dateRange(beginDate='2017-09-30', endDate='2017-10-09'):
    anchorBringnewDetail(runDay=runDay)
    anchorBringnewDetail2Mysql(runDay=runDay)

附:建表语句及最后的展现查询语句
Hive目标明细表建表
-- Hive detail table: one row per (day, room) with the new-fan count and
-- live minutes; partitioned by pt_day so a single day can be rebuilt
-- idempotently.
drop table if exists hs_anchor_bringnew_detail;
CREATE TABLE hs_anchor_bringnew_detail(
calc_date DATE,
room_id BIGINT,
anchor_uid BIGINT,
nickname string,
fans_add_cnt int,
anthor_live_time DECIMAL(38,10))
PARTITIONED BY (
pt_day string)
;
MySQL目标明细表建表
-- MySQL target table loaded by anchorBringnewDetail2Mysql; the unique key
-- on (calc_date, room_id) is the business primary key and guards against
-- duplicate loads for the same day/room.
drop table if exists anchor_bringnew_detail;
CREATE TABLE anchor_bringnew_detail (
calc_date date DEFAULT NULL COMMENT '统计日期',
fans_add_rank int(11) DEFAULT NULL COMMENT '排名',
room_id int(11) DEFAULT NULL COMMENT '房间号',
anchor_uid bigint(20) DEFAULT NULL COMMENT '主播UID',
nickname varchar(200) DEFAULT NULL COMMENT '主播昵称',
fans_add_cnt int(11) DEFAULT NULL COMMENT '自带粉丝增加数',
anthor_live_time decimal(38,10) DEFAULT NULL COMMENT '直播时长',
fans_add_changes int(11) DEFAULT NULL COMMENT '粉丝增加与昨天相比变化量',
live_long_changes decimal(38,10) DEFAULT NULL COMMENT '直播时长与昨天相比变化量',
etl_time datetime DEFAULT CURRENT_TIMESTAMP COMMENT '数据跑批时间',
UNIQUE KEY idx_prikey (calc_date,room_id) USING BTREE COMMENT '业务主键索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
汇总查询语句
-- Daily roll-up: anchor count, total live minutes, total/new-fan deltas,
-- and average fan adds per anchor, one row per calc_date.
select calc_date,count(room_id) anchor_cnt,sum(anthor_live_time) anthor_live_time,sum(fans_add_cnt) fans_add_cnt,sum(fans_add_changes) fans_add_changes,sum(fans_add_cnt)/count(room_id) fans_add_per
from anchor_bringnew_detail
-- where calc_date='2017-10-02'
group by calc_date
;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: