Python进行主播收入统计的脚本
2017-11-30 11:26
330 查看
关注点:
1、指定日期上一月份的获取
2、取两月份间的所有月份清单
3、mysql小表数据在hive上的插入装载
4、Hive汇总数据向mysql插入
1、Hive临时表及mysql目标表的准备
Hive临时表:
1、指定日期上一月份的获取
2、取两月份间的所有月份清单
3、mysql小表数据在hive上的插入装载
4、Hive汇总数据向mysql插入
1、Hive临时表及mysql目标表的准备
Hive临时表:
drop table if exists xxx_anchor_bringnew_detail_min;
CREATE TABLE xxx_anchor_bringnew_detail_min(
  room_id BIGINT,
  anchor_uid BIGINT,
  nickname string,
  fans_add_bymonth INT);

drop table if exists xxx_salary_record_min;
CREATE TABLE xxx_salary_record_min(
  room_id BIGINT,
  amount_bymonth DECIMAL(38,10));

CREATE TABLE xxx_invite_anchor_min(
  uid bigint,
  room_id bigint);

Mysql目标表:
drop table if exists anchor_income_static;
CREATE TABLE anchor_income_static (
  calc_month varchar(7) DEFAULT 'xxxx-xx' COMMENT '统计月份',
  room_id int(11) DEFAULT 0 COMMENT '房间号',
  uid int(11) DEFAULT 0 COMMENT '主播UID',
  nickname varchar(200) DEFAULT 'null' COMMENT '主播昵称',
  pullnew_cnt bigint(20) DEFAULT 0 COMMENT '月拉新数',
  pullnew_value bigint(20) DEFAULT 0 COMMENT '拉新价值',
  actual_income decimal(38,10) DEFAULT 0 COMMENT '实际收入',
  due_income decimal(38,10) DEFAULT 0 COMMENT '应得收入',
  profitloss_bymonth decimal(38,10) DEFAULT 0 COMMENT '盈亏',
  gift_income decimal(38,10) DEFAULT 0 COMMENT '礼物收入',
  etl_time datetime DEFAULT CURRENT_TIMESTAMP COMMENT '数据跑批时间',
  UNIQUE KEY idx_prikey (calc_month,room_id) USING BTREE COMMENT '业务主键索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2、python代码
# -*- coding=utf-8 -*-
"""Monthly anchor income statistics.

Pipeline, for the month preceding the run day:

1. Copy three small MySQL result sets into Hive scratch tables
   (``xxx_invite_anchor_min``, ``xxx_anchor_bringnew_detail_min``,
   ``xxx_salary_record_min``).
2. Build two Hive summary tables (``xxx_due_salary``, ``xxx_gift_income``).
3. Join everything in Hive and write the result into the MySQL table
   ``jellyfish_hadoop_stat.anchor_income_static``.

All external commands are run through ``/bin/sh`` after sourcing
``/etc/profile``; the SQL text is handed over as a separate argv element so
that the shell never re-parses data values.
"""
import datetime
import time
import os
import warnings
import sys
import re
import subprocess

# Python 2 only: force utf8 as the default str<->unicode codec.  Python 3 has
# neither the ``reload`` builtin nor ``sys.setdefaultencoding``.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf8')

warnings.filterwarnings("ignore")

yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
today = (datetime.date.today() - datetime.timedelta(days=0)).strftime('%Y-%m-%d')

HIVE_BIN = '/usr/lib/hive-current/bin/hive'
MYSQL_BIN = '/usr/bin/mysql'
# NOTE(review): host/user/password/port are the placeholders used by the
# original listing.  A password on the command line is visible in the process
# list; consider an option file (--defaults-extra-file) instead.
MYSQL_HOST_ARG = '-hMysqlHost'
MYSQL_USER_ARG = '-uMysqlUser'
MYSQL_PASS_ARG = '-pMysqlPass'
STAT_DB = 'jellyfish_hadoop_stat'
STAT_DB_PORT = 'MysqlPort'
SERVER_DB = 'jellyfish_server'
SERVER_DB_PORT = '50506'

# Rows per generated INSERT statement.
HIVE_INSERT_BATCH = 8888888
MYSQL_INSERT_BATCH = 1000

# Characters removed from every field read back from MySQL.
_QUOTE_CHARS = '\n`\'"'
# Characters removed from nicknames before they are placed in a SQL literal.
_NICKNAME_CHARS = _QUOTE_CHARS + '\\'


def _run(argv, capture=False):
    """Run ``argv`` via ``/bin/sh`` after sourcing ``/etc/profile``.

    The command is passed as positional parameters and expanded with
    ``"$@"``, so its arguments (including SQL that carries data values) are
    never interpreted by the shell.

    Returns the exit status, or -- when ``capture`` is true -- the list of
    non-empty stdout lines without their trailing newline.
    """
    shell_cmd = ['/bin/sh', '-c', 'source /etc/profile; "$@"', 'sh'] + list(argv)
    if not capture:
        return subprocess.call(shell_cmd)
    proc = subprocess.Popen(shell_cmd, stdout=subprocess.PIPE)
    output = proc.communicate()[0]
    if not isinstance(output, str):
        # Python 3 hands back bytes; Python 2 already gives a byte str.
        output = output.decode('utf-8', 'replace')
    # Split on '\n' only: str.splitlines() would also break on Unicode line
    # separators that may legitimately occur inside a nickname.
    return [line for line in output.split('\n') if line]


def _mysql(database, sql, port=STAT_DB_PORT, capture=False):
    """Execute ``sql`` in ``database`` with the mysql command line client.

    With ``capture`` the client is started with ``-N`` (no column names) and
    the output lines are returned.
    """
    argv = [MYSQL_BIN, MYSQL_HOST_ARG, '-P' + port, MYSQL_USER_ARG,
            MYSQL_PASS_ARG, '--default-character-set=utf8']
    if capture:
        argv.append('-N')
    argv += ['-e', 'use {db}; {sql}'.format(db=database, sql=sql)]
    return _run(argv, capture=capture)


def _hive(sql, capture=False):
    """Execute ``sql`` with ``hive -e``; return output lines when capturing."""
    return _run([HIVE_BIN, '-e', sql], capture=capture)


def _strip_chars(text, chars):
    """Return ``text`` with every character listed in ``chars`` removed."""
    for ch in chars:
        text = text.replace(ch, '')
    return text


def _parse_rows(lines, strip=_QUOTE_CHARS):
    """Split tab separated output lines into lists of fields.

    Characters in ``strip`` are removed from each line first; lines that end
    up empty are skipped.
    """
    rows = []
    for line in lines:
        line = _strip_chars(line, strip)
        if line:
            rows.append(line.split('\t'))
    return rows


def _room_predicate(room_ids):
    """Return a SQL predicate restricting ``room_id`` to ``room_ids``.

    ``room_ids`` is the comma separated string from ``getFilterRoomid``.  An
    empty list would render as ``room_id in ()``, which is a syntax error, so
    it is replaced by a predicate that matches no row.
    """
    if not room_ids:
        return '1=0'
    return 'room_id in ({ids})'.format(ids=room_ids)


def _batched_insert(prefix, value_tuples, batch_size, execute):
    """Send ``prefix + "(..),(..);"`` statements of at most ``batch_size`` rows.

    Nothing is executed for an empty input, and no trailing empty statement
    is produced when the row count is an exact multiple of ``batch_size``.
    """
    value_tuples = list(value_tuples)
    for start in range(0, len(value_tuples), batch_size):
        chunk = value_tuples[start:start + batch_size]
        execute(prefix + ','.join(chunk) + ';')


def getLastMonth(runDay):
    """Return the month before ``runDay`` ('YYYY-MM-DD') as 'YYYY-MM'."""
    runDayTime = datetime.datetime.strptime(runDay, "%Y-%m-%d")
    # The day before the 1st of the run month always lies in the prior month.
    firstOfMonth = datetime.date(runDayTime.year, runDayTime.month, 1)
    return (firstOfMonth - datetime.timedelta(days=1)).strftime('%Y-%m')


def monthRange(beginMonth, endMonth):
    """Return the sorted list of 'YYYY-MM' labels from beginMonth to endMonth.

    Both ends are inclusive; the result is empty when ``beginMonth`` is later
    than ``endMonth``.  The upper bound is compared as a string.
    """
    cursor = datetime.datetime.strptime(beginMonth, "%Y-%m")
    months = []
    label = cursor.strftime("%Y-%m")
    while label <= endMonth:
        months.append(label)
        # Advance one calendar month; the day is always 1, so replace() is safe.
        if cursor.month == 12:
            cursor = cursor.replace(year=cursor.year + 1, month=1)
        else:
            cursor = cursor.replace(month=cursor.month + 1)
        label = cursor.strftime("%Y-%m")
    return months


def getFilterRoomid():
    """Return all ``invite_anchor.room_id`` values as one comma separated string.

    The ids appear in reverse query order; the string is empty when the table
    has no rows.
    """
    rows = _parse_rows(_mysql(STAT_DB, 'select room_id from invite_anchor;', capture=True))
    return ','.join(row[0] for row in reversed(rows))


def mysqlMiniData2hive_invite_anchor():
    """Reload Hive table ``xxx_invite_anchor_min`` from MySQL ``invite_anchor``."""
    rows = _parse_rows(_mysql(STAT_DB, 'select uid,room_id from invite_anchor;', capture=True))
    _hive('truncate table xxx_invite_anchor_min;')
    values = ["({uid},{room_id})".format(uid=row[0], room_id=row[1]) for row in rows]
    _batched_insert("insert into xxx_invite_anchor_min(uid,room_id) values ",
                    values, HIVE_INSERT_BATCH, _hive)


def mysqlMiniData2hive_anchor_bringnew_detail(lastMonth):
    """Reload ``xxx_anchor_bringnew_detail_min`` with per-room fan gains.

    ``lastMonth`` is the 'YYYY-MM' month whose ``anchor_bringnew_detail`` rows
    are summed per (room_id, anchor_uid, nickname).
    """
    sql = ("select room_id,anchor_uid,nickname,sum(fans_add_cnt) fans_add_bymonth "
           "from anchor_bringnew_detail "
           "where substr(calc_date,1,7)='{lastMonth}' "
           "and {roomFilter} "
           "group by room_id,anchor_uid,nickname;").format(
               lastMonth=lastMonth, roomFilter=_room_predicate(getFilterRoomid()))
    rows = _parse_rows(_mysql(STAT_DB, sql, capture=True))
    _hive('truncate table xxx_anchor_bringnew_detail_min;')
    values = []
    for row in rows:
        values.append("({room_id},{anchor_uid},'{nickname}',{fans_add_bymonth})".format(
            room_id=row[0],
            anchor_uid=row[1],
            # A backslash could escape the closing quote of the SQL literal.
            nickname=_strip_chars(row[2], _NICKNAME_CHARS),
            fans_add_bymonth=row[3]))
    _batched_insert(
        "insert into xxx_anchor_bringnew_detail_min(room_id,anchor_uid,nickname,fans_add_bymonth) values ",
        values, HIVE_INSERT_BATCH, _hive)


def mysqlMiniData2hive_salary_record(lastMonth):
    """Reload ``xxx_salary_record_min`` with the signed salary sum per room.

    ``lastMonth`` is the 'YYYY-MM' month selected from ``salary_record``; the
    listed ``type`` codes are summed with a negative sign.
    """
    sql = ("select room_id, "
           "sum(case when type in(6,15,12100,12101,12102,12103,12104,12105,12106,12107,"
           "12300,12301,12302,12303,12304,12305,12306,12307) then amount*-1 "
           "else amount end) amount_bymonth "
           "from salary_record "
           "where type not in(3,4) and state=0 and substr(created_time,1,7)='{lastMonth}' "
           "and {roomFilter} "
           "group by room_id "
           "order by amount_bymonth desc;").format(
               lastMonth=lastMonth, roomFilter=_room_predicate(getFilterRoomid()))
    rows = _parse_rows(_mysql(SERVER_DB, sql, port=SERVER_DB_PORT, capture=True))
    _hive('truncate table xxx_salary_record_min;')
    values = ["({room_id},{amount_bymonth})".format(room_id=row[0], amount_bymonth=row[1])
              for row in rows]
    _batched_insert("insert into xxx_salary_record_min(room_id,amount_bymonth) values ",
                    values, HIVE_INSERT_BATCH, _hive)


def anchorIncomeStaticHiveCalc(lastMonth):
    """Rebuild Hive tables ``xxx_due_salary`` and ``xxx_gift_income``.

    ``lastMonth`` is the 'YYYY-MM' partition (``pt_month``) both summaries
    are computed for.
    """
    # One MySQL round trip serves both statements.
    roomFilter = _room_predicate(getFilterRoomid())
    _hive(("drop table if exists xxx_due_salary; "
           "create table xxx_due_salary as "
           "select room_id,sum(case "
           "when rank between 1 and 5 then 100000/25 "
           "when rank between 5+1 and 10 then 50000/25 "
           "when rank between 10+1 and 20 then 25000/25 "
           "when rank between 20+1 and 30 then 20000/25 "
           "when rank between 30+1 and 50 then 15000/25 "
           "when rank between 50+1 and 80 then 12000/25 "
           "when rank between 80+1 and 100 then 10000/25 "
           "when rank between 100+1 and 150 then 6000/25 "
           "when rank between 150+1 and 300 then 4000/25 "
           "when rank between 300+1 and 800 then 2000/25 "
           "when rank between 800+1 and 2000 then 1000/25 "
           "else 0 end) salary_bymonth "
           "from (select room_id,rank,row_number()over(partition by room_id order by rank asc) rk "
           "from data_chushou_room_category_rank "
           "where pt_month='{lastMonth}' and category_id=0 and last_time=pt_day "
           "and {roomFilter}) x "
           "where rk<=25 "
           "group by room_id;").format(lastMonth=lastMonth, roomFilter=roomFilter))
    _hive(("drop table if exists xxx_gift_income; "
           "create table xxx_gift_income as "
           "select room_id,sum(gift_point)/1000 gift_amount "
           "from honeycomb_all_gift_record "
           "where pt_month='{lastMonth}' "
           "and {roomFilter} "
           "group by room_id;").format(lastMonth=lastMonth, roomFilter=roomFilter))


def anchorIncomeStaticResult2Mysql(lastMonth):
    """Join the Hive tables and store the result in ``anchor_income_static``.

    Existing MySQL rows for ``lastMonth`` are deleted first, then the new
    rows are inserted in batches of ``MYSQL_INSERT_BATCH``.
    """
    profileDay = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    # NOTE(review): the pt_day filter on the left-joined a6 drops anchors with
    # no profile row for that day -- confirm this is intended.
    selectSql = (
        "select '{lastMonth}' calc_month,a1.room_id,a1.uid,a6.nickname,"
        "coalesce(a2.fans_add_bymonth,0) pullNew_cnt,"
        "coalesce(a2.fans_add_bymonth,0)*5 pullNew_value,"
        "coalesce(a3.amount_bymonth,0) actual_income,"
        "coalesce(a4.salary_bymonth,0) due_income, "
        "coalesce(a2.fans_add_bymonth,0)*5-(coalesce(a3.amount_bymonth,0)-coalesce(a4.salary_bymonth,0)) profitloss_bymonth,"
        "coalesce(a5.gift_amount,0) gift_income "
        "from xxx_invite_anchor_min a1 "
        "left join xxx_anchor_bringnew_detail_min a2 on a1.room_id=a2.room_id "
        "left join xxx_salary_record_min a3 on a1.room_id=a3.room_id "
        "left join xxx_due_salary a4 on a1.room_id=a4.room_id "
        "left join xxx_gift_income a5 on a1.room_id=a5.room_id "
        "left join oss_chushou_user_profile a6 on a1.uid=a6.uid "
        "where a6.pt_day='{yesterday}' "
        ";").format(lastMonth=lastMonth, yesterday=profileDay)
    # Only the line break is removed here; the nickname is cleaned below.
    resultRows = _parse_rows(_hive(selectSql, capture=True), strip='')

    # data rollback: make the load repeatable for the same month
    _mysql(STAT_DB,
           "delete from jellyfish_hadoop_stat.anchor_income_static "
           "where calc_month='{lastMonth}'".format(lastMonth=lastMonth))

    # A single load timestamp for the whole run.
    etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
    values = []
    for row in resultRows:
        values.append(
            "('{calc_month}',{room_id},{uid},'{nickname}',{pullnew_cnt},{pullnew_value},"
            "{actual_income},{due_income},{profitloss_bymonth},{gift_income},'{etl_time}')".format(
                calc_month=row[0],
                room_id=row[1],
                uid=row[2],
                nickname=_strip_chars(str(row[3]), _NICKNAME_CHARS),
                pullnew_cnt=row[4],
                pullnew_value=row[5],
                actual_income=row[6],
                due_income=row[7],
                profitloss_bymonth=row[8],
                gift_income=row[9],
                etl_time=etl_time))
    _batched_insert(
        "insert into jellyfish_hadoop_stat.anchor_income_static(calc_month,room_id,uid,nickname,"
        "pullnew_cnt,pullnew_value,actual_income,due_income,profitloss_bymonth,gift_income,etl_time) values ",
        values, MYSQL_INSERT_BATCH, lambda sql: _mysql(STAT_DB, sql))


# Batch Test
if __name__ == '__main__':
    lastMonth = getLastMonth(runDay=(datetime.date.today() - datetime.timedelta(days=0)).strftime('%Y-%m-%d'))
    mysqlMiniData2hive_invite_anchor()
    mysqlMiniData2hive_anchor_bringnew_detail(lastMonth)
    mysqlMiniData2hive_salary_record(lastMonth)
    anchorIncomeStaticHiveCalc(lastMonth)
    anchorIncomeStaticResult2Mysql(lastMonth)
相关文章推荐
- Python脚本进行主播招募相关数据统计的案例
- Python进行主播拉新相关数据统计的脚本
- python统计日志小脚本
- 使用rdb文件进行redis数据迁移--python脚本
- 在arcgis使用python脚本进行字段计算时是如何解决中文问题的
- 编写Python小程序来统计测试脚本的关键字
- 编写Python小程序来统计测试脚本的关键字
- PyInstaller对python脚本进行代码打包成单个独立的exe可执行文件
- python 实现nginx/apache 日志格式的统计脚本
- 用Python和Shell结合进行词频统计
- python语言学习笔记(三)-----模拟投掷三个骰子,对游戏结果进行统计
- Python:使用Counter进行计数统计及collections模块
- python统计小脚本
- 使用Python进行描述性统计
- python进行中文分词、词性标注、词频统计
- Python 统计代码的行数,Python脚本 统计代码
- Python中使用Counter进行字典创建以及key数量统计
- python把csv数据做成列表、字典类型的数据进行存储脚本(readDataToDic_V2.2)
- 用python进行科学统计及数据挖掘--便捷工具环境搭建
- Python实现对excel文件列表值进行统计的方法