Python进行主播拉新相关数据统计的脚本
2017-10-26 15:36
579 查看
脚本中主要关注点:
1、Mysql批量数据插入
2、日期范围内的列表清单
3、更多的是蕴含在内的业务逻辑
/Users/nisj/PycharmProjects/BiDataProc/Demand/hadoopStat/anchorBringnew.py
以下为脚本源码;建表语句及最后的展现查询语句见文末附录。
# -*- coding=utf-8 -*-
"""Anchor "bring-new" (self-brought new fans) daily statistics ETL.

Pipeline per run day:
  1. anchorBringnewDetail       - rebuild the Hive detail partition.
  2. anchorBringnewDetail2Mysql - pull the day's rows (with day-over-day
     deltas) out of Hive and batch-insert them into MySQL.
"""
import datetime
import os
import re
import sys
import time
import warnings

# Python 2 only: force utf-8 default encoding so non-ASCII nicknames
# survive str() handling.  On Python 3 reload() is not a builtin and str
# is already unicode, so this is skipped (BUG FIX: the unguarded
# reload(sys) crashed under Python 3).
try:
    reload(sys)
    sys.setdefaultencoding('utf8')
except NameError:
    pass

warnings.filterwarnings("ignore")

# Number of rows per multi-row INSERT sent to MySQL.
BATCH_SIZE = 800

yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')


def getDay1BeforeAndYesterday(runDay):
    """Return ('yesterday' relative to the wall clock, day before runDay).

    NOTE(review): the first element deliberately ignores runDay -- it is
    used to pick the latest available snapshot partition (pt_day) of the
    dimension tables; confirm before "fixing" it to depend on runDay.
    """
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    day1Before = (datetime.datetime.strptime(runDay, '%Y-%m-%d') - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    return yesterday, day1Before


def dateRange(beginDate, endDate):
    """Return the inclusive list of 'YYYY-MM-DD' dates from beginDate to endDate.

    Returns an empty list when beginDate > endDate.  Comparison is on the
    ISO string, which orders the same as the dates themselves.
    """
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def _mysql_exec(statement):
    """Run one SQL statement against the jellyfish_hadoop_stat MySQL schema.

    Concatenation is used instead of .format() because the statement may
    contain '{'/'}' from user nicknames (BUG FIX: .format() raised
    KeyError on such rows in the original).
    NOTE(review): credentials on the command line are visible in `ps`;
    consider --defaults-extra-file instead.
    """
    os.system('source /etc/profile; '
              '/usr/bin/mysql -hMysqlHost -P6605 -uMysqlUser -pMysqlPass '
              '--default-character-set=utf8 -e "use jellyfish_hadoop_stat; '
              + statement + '"')


def anchorBringnewDetail(runDay):
    """Rebuild Hive partition pt_day=runDay of hs_anchor_bringnew_detail.

    The partition is dropped and re-added first so reruns are idempotent.
    Sources:
      - oss_room_subscriber_roomid: first-ever subscription per fan uid
        (row_number over created_time, keep rk=1) restricted to runDay;
      - oss_room_v2 / oss_chushou_user_profile: anchor dimension, read
        from the latest snapshot partition (wall-clock yesterday);
      - honeycomb_all_live_history_status: live minutes on runDay.
    """
    yesterday = getDay1BeforeAndYesterday(runDay)[0]
    os.system("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
alter table hs_anchor_bringnew_detail drop if exists partition(pt_day='{runDay}'); \
alter table hs_anchor_bringnew_detail add partition(pt_day='{runDay}'); \
with tab_user_frist_subscriber as ( \
select room_id,fans_uid,state,first_subscriber_date \
from (select room_id,uid fans_uid,state,substr(created_time,1,10) first_subscriber_date,row_number()over(partition by uid order by created_time asc) rk from oss_room_subscriber_roomid where pt_day='{yesterday}') x \
where rk=1 and first_subscriber_date='{runDay}'), \
tab_user_info as ( \
select a2.nickname,a1.id room_id,a2.uid anchor_uid,a2.last_login_time,a1.is_profession \
from oss_room_v2 a1 \
left join oss_chushou_user_profile a2 on a1.creator_uid=a2.uid \
where a1.pt_day='{yesterday}' and a2.pt_day='{yesterday}'), \
tab_live_info as( \
select a1.room_id,sum(live_minute) anthor_live_time \
from (select room_id,hour(time_interval)*60+minute(time_interval)+second(time_interval)/60 live_minute from ( \
select room_id,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval \
from honeycomb_all_live_history_status \
where pt_day='{runDay}' \
) x) a1 \
group by a1.room_id \
) \
insert into table hs_anchor_bringnew_detail partition (pt_day='{runDay}') \
select a1.first_subscriber_date calc_date,a1.room_id,a2.anchor_uid,a2.nickname,count(distinct a1.fans_uid) fans_add_cnt,a3.anthor_live_time anthor_live_time \
from tab_user_frist_subscriber a1 \
left join tab_user_info a2 on a1.room_id=a2.room_id \
left join tab_live_info a3 on a1.room_id=a3.room_id \
group by a1.first_subscriber_date,a1.room_id,a2.anchor_uid,a2.nickname,a3.anthor_live_time \
order by a1.first_subscriber_date,fans_add_cnt desc \
; \
" """.format(runDay=runDay, yesterday=yesterday))


def anchorBringnewDetail2Mysql(runDay):
    """Export runDay rows from Hive into MySQL anchor_bringnew_detail.

    Joins runDay against the previous day to compute fan-count and
    live-time deltas, deletes any previous load for runDay (idempotent
    rerun), then inserts in batches of BATCH_SIZE rows.

    BUG FIX: the previous-day filter used to sit in the WHERE clause
    (a2.calc_date='{day1Before}'), which silently turned the LEFT JOIN
    into an INNER JOIN and dropped anchors with no previous-day row --
    the is-null CASE expressions show a true left join was intended.
    The filter now lives in the ON clause.
    """
    day1Before = getDay1BeforeAndYesterday(runDay)[1]
    anchorBringnews = os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
select a1.calc_date,row_number()over(order by a1.fans_add_cnt desc) fans_add_rank,a1.room_id,a1.anchor_uid,a1.nickname,a1.fans_add_cnt,case when a1.anthor_live_time is null then 0 else a1.anthor_live_time end,a1.fans_add_cnt - case when a2.fans_add_cnt is null then 0 else a2.fans_add_cnt end fans_add_changes,case when a1.anthor_live_time is null then 0 else a1.anthor_live_time end - case when a2.anthor_live_time is null then 0 else a2.anthor_live_time end live_long_changes \
from hs_anchor_bringnew_detail a1 \
left join hs_anchor_bringnew_detail a2 on a1.room_id=a2.room_id and a2.calc_date='{day1Before}' \
where a1.calc_date='{runDay}' \
; \
" """.format(runDay=runDay, day1Before=day1Before)).readlines()

    rows = [re.split('\t', line.replace('\n', '')) for line in anchorBringnews]

    # Data rollback: clear any earlier load for this day before inserting.
    _mysql_exec("delete from jellyfish_hadoop_stat.anchor_bringnew_detail where calc_date='{runDay}'".format(runDay=runDay))

    insert_prefix = ("insert into jellyfish_hadoop_stat.anchor_bringnew_detail"
                     "(calc_date,fans_add_rank,room_id,anchor_uid,nickname,fans_add_cnt,"
                     "anthor_live_time,fans_add_changes,live_long_changes,etl_time) values ")
    values = []
    for row in rows:
        # Strip characters that would break the shell-quoted SQL literal
        # (backslash added for the same reason).
        nickname = str(row[4]).replace('\n', '').replace('`', '').replace('\'', '').replace('"', '').replace('\\', '')
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        values.append(
            "('{calc_date}',{fans_add_rank},{room_id},{anchor_uid},'{nickname}',{fans_add_cnt},{anthor_live_time},{fans_add_changes},{live_long_changes},'{etl_time}')".format(
                calc_date=row[0], fans_add_rank=row[1], room_id=row[2], anchor_uid=row[3],
                nickname=nickname, fans_add_cnt=row[5], anthor_live_time=row[6],
                fans_add_changes=row[7], live_long_changes=row[8], etl_time=etl_time))
        if len(values) == BATCH_SIZE:
            _mysql_exec(insert_prefix + ",".join(values) + ";")
            values = []
    # BUG FIX: the original unconditionally executed the trailing statement,
    # which became the malformed "... values;" whenever the row count was
    # zero or an exact multiple of the batch size.  Flush only if pending.
    if values:
        _mysql_exec(insert_prefix + ",".join(values) + ";")


if __name__ == '__main__':
    # Backfill loop (previously executed unconditionally at import time,
    # which made the module unimportable without triggering the ETL).
    for runDay in dateRange(beginDate='2017-09-30', endDate='2017-10-09'):
        anchorBringnewDetail(runDay=runDay)
        anchorBringnewDetail2Mysql(runDay=runDay)
附:建表语句及最后的展现查询语句
Hive目标明细表建表
-- Hive target detail table: one row per (calc_date, room_id), written
-- daily into the pt_day partition by anchorBringnewDetail().
drop table if exists hs_anchor_bringnew_detail;
CREATE TABLE hs_anchor_bringnew_detail(
 calc_date DATE,                  -- statistics date
 room_id BIGINT,
 anchor_uid BIGINT,
 nickname string,
 fans_add_cnt int,                -- distinct first-time fans added that day
 anthor_live_time DECIMAL(38,10)  -- live minutes ("anthor" sic: matches ETL column name)
) PARTITIONED BY (
 pt_day string)
;
-- MySQL target detail table DDL follows.
-- MySQL target detail table: loaded daily by anchorBringnewDetail2Mysql()
-- with delete-then-insert reruns; (calc_date, room_id) is enforced unique.
drop table if exists anchor_bringnew_detail;
CREATE TABLE anchor_bringnew_detail (
 calc_date date DEFAULT NULL COMMENT '统计日期',
 fans_add_rank int(11) DEFAULT NULL COMMENT '排名',
 room_id int(11) DEFAULT NULL COMMENT '房间号',
 anchor_uid bigint(20) DEFAULT NULL COMMENT '主播UID',
 nickname varchar(200) DEFAULT NULL COMMENT '主播昵称',
 fans_add_cnt int(11) DEFAULT NULL COMMENT '自带粉丝增加数',
 anthor_live_time decimal(38,10) DEFAULT NULL COMMENT '直播时长',
 fans_add_changes int(11) DEFAULT NULL COMMENT '粉丝增加与昨天相比变化量',
 live_long_changes decimal(38,10) DEFAULT NULL COMMENT '直播时长与昨天相比变化量',
 etl_time datetime DEFAULT CURRENT_TIMESTAMP COMMENT '数据跑批时间',
 UNIQUE KEY idx_prikey (calc_date,room_id) USING BTREE COMMENT '业务主键索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
-- Summary/display query follows.
-- Display query: per-day anchor count, total live minutes, total new
-- fans, total day-over-day fan delta, and average new fans per anchor.
select calc_date,
       count(room_id) anchor_cnt,
       sum(anthor_live_time) anthor_live_time,
       sum(fans_add_cnt) fans_add_cnt,
       sum(fans_add_changes) fans_add_changes,
       sum(fans_add_cnt)/count(room_id) fans_add_per
from anchor_bringnew_detail
-- where calc_date='2017-10-02'
group by calc_date
;
相关文章推荐
- Python脚本进行主播招募相关数据统计的案例
- Python进行主播收入统计的脚本
- python进行数据分析------相关分析
- 通过Python根据汉语水平词汇与汉字等级大纲进行数据统计
- Python数据统计脚本
- Mysql分表数据通过Python进行汇总统计
- 用python对文本格式的数据进行统计处理
- python数据统计脚本实例mysql,redis
- Python从阿里云Oss拉数据写入Hive表并进行相关处理
- Python数据分析模块 | pandas做数据分析(三):统计相关函数
- 利用Python进行数据分析_python3实现_pandas入门_相关系数与协方差
- Python中的数据类型转换举例及脚本统计服务器内存实例
- python把csv数据做成列表、字典类型的数据进行存储脚本(readDataToDic_V2.2)
- 利用Python进行数据导入、变化、统计和假设检验等基本数据分析
- 利用 Python 进行数据分析(九)pandas 汇总统计和计算
- 利用Python进行数据分析(9) pandas基础: 汇总统计和计算
- Python从数据库取数,对时间进行处理,统计数据汇总后画图
- 用python进行科学统计及数据挖掘--便捷工具环境搭建
- Python连麦相关信息统计的脚本