您的位置:首页 > 编程语言 > Python开发

Python进行主播收入统计的脚本

2017-11-30 11:26 330 查看
关注点:
1、指定日期上一月份的获取

2、取两月份间的所有月份清单

3、mysql小表数据在hive上的插入装载

4、Hive汇总数据向mysql插入

1、Hive临时表及mysql目标表的准备
Hive临时表:
-- Hive staging tables. The Python job below truncates and reloads each of
-- them on every run, so this DDL only has to be executed once; every CREATE
-- is preceded by a DROP so the script can be re-run safely.

-- New-fan totals per room/anchor/nickname for the month being calculated.
drop table if exists xxx_anchor_bringnew_detail_min;
CREATE TABLE xxx_anchor_bringnew_detail_min(
room_id BIGINT,
anchor_uid BIGINT,
nickname string,
fans_add_bymonth INT)
;
-- Net salary amount per room for the month being calculated.
drop table if exists xxx_salary_record_min;
CREATE TABLE xxx_salary_record_min(
room_id BIGINT,
amount_bymonth DECIMAL(38,10))
;
-- (uid, room_id) pairs copied from the MySQL invite_anchor table.
-- The DROP was missing here, so re-running the DDL failed on this table.
drop table if exists xxx_invite_anchor_min;
CREATE TABLE xxx_invite_anchor_min(
uid bigint,
room_id bigint)
;
Mysql目标表:
-- MySQL result table: one row per (calc_month, room_id), written by the
-- Python job's final step. The column COMMENT strings are in Chinese; the
-- English meaning of each is given in the trailing comments.
drop table if exists anchor_income_static;
CREATE TABLE anchor_income_static (
calc_month varchar(7) DEFAULT 'xxxx-xx' COMMENT '统计月份',  -- statistics month, 'YYYY-MM'
room_id int(11) DEFAULT 0 COMMENT '房间号',  -- room number
uid int(11) DEFAULT 0 COMMENT '主播UID',  -- anchor UID
nickname varchar(200) DEFAULT 'null' COMMENT '主播昵称',  -- anchor nickname (default is the literal string 'null')
pullnew_cnt bigint(20) DEFAULT 0 COMMENT '月拉新数',  -- new users brought in during the month
pullnew_value bigint(20) DEFAULT 0 COMMENT '拉新价值',  -- value of those new users
actual_income decimal(38,10) DEFAULT 0 COMMENT '实际收入',  -- actual income
due_income decimal(38,10) DEFAULT 0 COMMENT '应得收入',  -- income due
profitloss_bymonth decimal(38,10) DEFAULT 0 COMMENT '盈亏',  -- profit / loss
gift_income  decimal(38,10) DEFAULT 0 COMMENT '礼物收入',  -- gift income
etl_time datetime DEFAULT CURRENT_TIMESTAMP COMMENT '数据跑批时间',  -- time the batch wrote the row
-- Business key: at most one row per month and room.
UNIQUE KEY idx_prikey (calc_month,room_id) USING BTREE COMMENT '业务主键索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2、python代码
# -*- coding=utf-8 -*-
import datetime
import time
import os
import warnings
import sys
import re
# Python 2 only: ``reload`` is a builtin there, and ``sys`` has to be reloaded
# before ``sys.setdefaultencoding`` can be called to switch the process-wide
# default string encoding to UTF-8. Neither line works on Python 3.
reload(sys)
sys.setdefaultencoding('utf8')

# Suppress every warning for the rest of the run.
warnings.filterwarnings("ignore")

# 'YYYY-MM-DD' strings for the previous and the current calendar day, fixed
# once at import time.
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
today = (datetime.date.today() - datetime.timedelta(days=0)).strftime('%Y-%m-%d')

def getLastMonth(runDay):
    """Return the calendar month before ``runDay`` as a ``'YYYY-MM'`` string.

    Args:
        runDay: date string in ``'YYYY-MM-DD'`` form.

    Raises:
        ValueError: if ``runDay`` does not match ``'%Y-%m-%d'``.
    """
    parsedDay = datetime.datetime.strptime(runDay, "%Y-%m-%d")
    firstOfMonth = datetime.date(parsedDay.year, parsedDay.month, 1)
    # The day before the 1st always falls in the previous month, which also
    # handles the January -> December year rollover.
    previousMonthEnd = firstOfMonth - datetime.timedelta(days=1)
    return previousMonthEnd.strftime('%Y-%m')

def monthRange(beginMonth, endMonth):
    """Return every month from ``beginMonth`` to ``endMonth``, both inclusive.

    Args:
        beginMonth: first month, ``'YYYY-MM'``.
        endMonth: last month, ``'YYYY-MM'``.

    Returns:
        Ascending list of zero-padded ``'YYYY-MM'`` strings; empty when
        ``beginMonth`` is later than ``endMonth``.

    Raises:
        ValueError: if either argument does not match ``'%Y-%m'``.
    """
    begin = datetime.datetime.strptime(beginMonth, "%Y-%m")
    end = datetime.datetime.strptime(endMonth, "%Y-%m")

    # Work on a single "months since year 0" counter so the loop advances one
    # month per step (the old code advanced one *day* per step and relied on a
    # set to drop the duplicates) and so the bound is compared numerically
    # rather than as text, which was wrong for input that is not zero-padded.
    firstIndex = begin.year * 12 + (begin.month - 1)
    lastIndex = end.year * 12 + (end.month - 1)

    monthList = []
    for monthIndex in range(firstIndex, lastIndex + 1):
        year, zeroBasedMonth = divmod(monthIndex, 12)
        monthList.append("{0:04d}-{1:02d}".format(year, zeroBasedMonth + 1))
    return monthList

def getFilterRoomid():
    """Return the ``room_id`` values of MySQL ``invite_anchor`` as one string.

    The ``mysql`` client is run through the shell and its output is read line
    by line. Newlines, backticks and single/double quotes are removed from
    each line and the first tab-separated field is kept.

    Returns:
        The ids joined with ``','`` in the reverse of the order the client
        printed them, or ``''`` when there is no output.
    """
    mysqlCommand = (
        'source /etc/profile; '
        '/usr/bin/mysql  -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass --default-character-set=utf8 -N -e "use jellyfish_hadoop_stat; '
        'select room_id from invite_anchor; '
        '" '
    )
    outputLines = os.popen(mysqlCommand).readlines()

    roomIds = []
    for outputLine in outputLines:
        cleaned = outputLine.replace('\n', '').replace('`', '').replace('\'', '').replace('"', '')
        roomIds.append(re.split('\t', cleaned)[0])

    # Last line first, matching the order callers have always received.
    return ','.join(reversed(roomIds))

def mysqlMiniData2hive_invite_anchor():
    """Reload Hive table ``xxx_invite_anchor_min`` from MySQL ``invite_anchor``.

    Everything runs through shell commands, in this order:

    1. read the ``uid,room_id`` rows with the ``mysql`` client
       (tab-separated output);
    2. truncate the Hive table;
    3. re-insert the rows with ``insert ... values`` statements holding at
       most ``batchSize`` rows each.

    No insert is issued when there is nothing left to insert. The previous
    code always sent a final statement, which degenerated to the malformed
    ``insert ... values;`` when MySQL returned no rows or when the row count
    was an exact multiple of the batch size.

    Raises:
        IndexError: if an output line has fewer than two tab-separated fields.
    """
    batchSize = 8888888
    insertPrefix = "insert into xxx_invite_anchor_min(uid,room_id) values "
    hiveCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        '{hiveSql} '
        '" '
    )

    def runHive(hiveSql):
        # The SQL text is embedded in a double-quoted shell argument.
        os.system(hiveCommand.format(hiveSql=hiveSql))

    mysqlPipe = os.popen(
        'source /etc/profile; '
        '/usr/bin/mysql  -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass --default-character-set=utf8 -N -e "use jellyfish_hadoop_stat; '
        'select uid,room_id from invite_anchor; '
        '" '
    )
    try:
        miniData = mysqlPipe.readlines()
    finally:
        # Close the pipe explicitly instead of leaving it to the garbage collector.
        mysqlPipe.close()

    # Drop newlines and the quoting characters that would break the
    # double-quoted shell argument, then split into fields.
    miniDataList = [
        miniDataRow.replace('\n', '').replace('`', '').replace('\'', '').replace('"', '').split('\t')
        for miniDataRow in miniData
    ]

    runHive("truncate table xxx_invite_anchor_min;")

    pendingValues = []
    for miniDataVal in miniDataList:
        pendingValues.append("({uid},{room_id})".format(uid=miniDataVal[0], room_id=miniDataVal[1]))
        if len(pendingValues) == batchSize:
            runHive(insertPrefix + ",".join(pendingValues) + ";")
            pendingValues = []

    # Flush the remainder only if there is one.
    if pendingValues:
        runHive(insertPrefix + ",".join(pendingValues) + ";")

def mysqlMiniData2hive_anchor_bringnew_detail(lastMonth):
    """Reload Hive table ``xxx_anchor_bringnew_detail_min`` for one month.

    Reads, from MySQL ``anchor_bringnew_detail``, ``sum(fans_add_cnt)`` grouped
    by ``room_id, anchor_uid, nickname`` for rows whose ``calc_date`` starts
    with ``lastMonth`` and whose ``room_id`` is in the list returned by
    ``getFilterRoomid()``. The Hive table is then truncated and refilled with
    ``insert ... values`` statements of at most ``batchSize`` rows each.

    No insert is issued when there is nothing left to insert. The previous
    code always sent a final statement, which degenerated to the malformed
    ``insert ... values;`` when MySQL returned no rows or when the row count
    was an exact multiple of the batch size.

    Args:
        lastMonth: month to load, ``'YYYY-MM'``.

    Raises:
        IndexError: if an output line has fewer than four tab-separated fields.
    """
    batchSize = 8888888
    insertPrefix = "insert into xxx_anchor_bringnew_detail_min(room_id,anchor_uid,nickname,fans_add_bymonth) values "
    hiveCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        '{hiveSql} '
        '" '
    )

    def runHive(hiveSql):
        # The SQL text is embedded in a double-quoted shell argument.
        os.system(hiveCommand.format(hiveSql=hiveSql))

    mysqlCommand = (
        'source /etc/profile; '
        '/usr/bin/mysql  -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass --default-character-set=utf8 -N -e "use jellyfish_hadoop_stat; '
        'select room_id,anchor_uid,nickname,sum(fans_add_cnt) fans_add_bymonth '
        'from anchor_bringnew_detail '
        "where substr(calc_date,1,7)='{lastMonth}' "
        'and room_id in ({roomIdFilter}) '
        'group by room_id,anchor_uid,nickname; '
        '" '
    ).format(lastMonth=lastMonth, roomIdFilter=getFilterRoomid())

    mysqlPipe = os.popen(mysqlCommand)
    try:
        miniData = mysqlPipe.readlines()
    finally:
        # Close the pipe explicitly instead of leaving it to the garbage collector.
        mysqlPipe.close()

    # Drop newlines and the quoting characters that would break the
    # double-quoted shell argument, then split into fields.
    # NOTE(review): nickname is free text. Backticks and quotes are removed
    # here, but other characters the shell or SQL treat specially inside this
    # quoting (for example ``$`` and a trailing backslash) are passed through.
    miniDataList = [
        miniDataRow.replace('\n', '').replace('`', '').replace('\'', '').replace('"', '').split('\t')
        for miniDataRow in miniData
    ]

    runHive("truncate table xxx_anchor_bringnew_detail_min;")

    pendingValues = []
    for miniDataVal in miniDataList:
        pendingValues.append(
            "({room_id},{anchor_uid},'{nickname}',{fans_add_bymonth})".format(
                room_id=miniDataVal[0],
                anchor_uid=miniDataVal[1],
                nickname=miniDataVal[2],
                fans_add_bymonth=miniDataVal[3],
            )
        )
        if len(pendingValues) == batchSize:
            runHive(insertPrefix + ",".join(pendingValues) + ";")
            pendingValues = []

    # Flush the remainder only if there is one.
    if pendingValues:
        runHive(insertPrefix + ",".join(pendingValues) + ";")

def mysqlMiniData2hive_salary_record(lastMonth):
    """Reload Hive table ``xxx_salary_record_min`` for one month.

    Reads a per-room amount from MySQL ``jellyfish_server.salary_record``:
    rows with ``type`` not in (3, 4), ``state = 0``, ``created_time`` starting
    with ``lastMonth`` and ``room_id`` in the list from ``getFilterRoomid()``.
    Amounts of the types listed in the query's CASE are negated before
    summing. The Hive table is then truncated and refilled with
    ``insert ... values`` statements of at most ``batchSize`` rows each.

    No insert is issued when there is nothing left to insert. The previous
    code always sent a final statement, which degenerated to the malformed
    ``insert ... values;`` when MySQL returned no rows or when the row count
    was an exact multiple of the batch size.

    Args:
        lastMonth: month to load, ``'YYYY-MM'``.

    Raises:
        IndexError: if an output line has fewer than two tab-separated fields.
    """
    batchSize = 8888888
    insertPrefix = "insert into xxx_salary_record_min(room_id,amount_bymonth) values "
    hiveCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        '{hiveSql} '
        '" '
    )

    def runHive(hiveSql):
        # The SQL text is embedded in a double-quoted shell argument.
        os.system(hiveCommand.format(hiveSql=hiveSql))

    mysqlCommand = (
        'source /etc/profile; '
        '/usr/bin/mysql  -hMysqlHost -P50506 -uMysqlUser -pMysqlPass --default-character-set=utf8 -N -e "use jellyfish_server; '
        'select room_id, '
        'sum(case when type in(6,15,12100,12101,12102,12103,12104,12105,12106,12107,12300,12301,12302,12303,12304,12305,12306,12307) then amount*-1 '
        'else amount end) amount_bymonth '
        'from salary_record '
        "where type not in(3,4) and state=0 and substr(created_time,1,7)='{lastMonth}' "
        'and room_id in ({roomIdFilter}) '
        'group by room_id '
        'order by amount_bymonth desc; '
        '" '
    ).format(lastMonth=lastMonth, roomIdFilter=getFilterRoomid())

    mysqlPipe = os.popen(mysqlCommand)
    try:
        miniData = mysqlPipe.readlines()
    finally:
        # Close the pipe explicitly instead of leaving it to the garbage collector.
        mysqlPipe.close()

    # Drop newlines and the quoting characters that would break the
    # double-quoted shell argument, then split into fields.
    miniDataList = [
        miniDataRow.replace('\n', '').replace('`', '').replace('\'', '').replace('"', '').split('\t')
        for miniDataRow in miniData
    ]

    runHive("truncate table xxx_salary_record_min;")

    pendingValues = []
    for miniDataVal in miniDataList:
        pendingValues.append(
            "({room_id},{amount_bymonth})".format(room_id=miniDataVal[0], amount_bymonth=miniDataVal[1])
        )
        if len(pendingValues) == batchSize:
            runHive(insertPrefix + ",".join(pendingValues) + ";")
            pendingValues = []

    # Flush the remainder only if there is one.
    if pendingValues:
        runHive(insertPrefix + ",".join(pendingValues) + ";")

def anchorIncomeStaticHiveCalc(lastMonth):
    """Rebuild the Hive tables ``xxx_due_salary`` and ``xxx_gift_income``.

    ``xxx_due_salary``: from ``data_chushou_room_category_rank`` rows with
    ``pt_month = lastMonth``, ``category_id = 0`` and ``last_time = pt_day``,
    keep each room's 25 lowest ``rank`` rows and sum a rank-tiered amount
    (the tiers are spelled out in the CASE expression) into
    ``salary_bymonth``.

    ``xxx_gift_income``: ``sum(gift_point)/1000`` per room from
    ``honeycomb_all_gift_record`` for ``pt_month = lastMonth``.

    Both queries are restricted to the rooms returned by
    ``getFilterRoomid()``.

    Args:
        lastMonth: month to calculate, ``'YYYY-MM'``.
    """
    # Fetch the room list once: this saves a second MySQL round trip and
    # guarantees both tables are built from the same set of rooms.
    roomIdFilter = getFilterRoomid()

    dueSalaryCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        'drop table if exists xxx_due_salary; '
        'create table xxx_due_salary as '
        'select room_id,sum(case '
        'when rank between 1 and 5 then 100000/25 '
        'when rank between 5+1 and 10 then 50000/25 '
        'when rank between 10+1 and 20 then 25000/25 '
        'when rank between 20+1 and 30 then 20000/25 '
        'when rank between 30+1 and 50 then 15000/25 '
        'when rank between 50+1 and 80 then 12000/25 '
        'when rank between 80+1 and 100 then 10000/25 '
        'when rank between 100+1 and 150 then 6000/25 '
        'when rank between 150+1 and 300 then 4000/25 '
        'when rank between 300+1 and 800 then 2000/25 '
        'when rank between 800+1 and 2000 then 1000/25 '
        'else 0 end) salary_bymonth '
        'from (select room_id,rank,row_number()over(partition by room_id order by rank asc) rk '
        'from data_chushou_room_category_rank '
        "where pt_month='{lastMonth}' and category_id=0 and last_time=pt_day "
        'and room_id in ({roomIdFilter})) x '
        'where rk<=25 '
        'group by room_id; '
        '" '
    )
    os.system(dueSalaryCommand.format(lastMonth=lastMonth, roomIdFilter=roomIdFilter))

    giftIncomeCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        'drop table if exists xxx_gift_income; '
        'create table xxx_gift_income as '
        'select room_id,sum(gift_point)/1000 gift_amount '
        'from honeycomb_all_gift_record '
        "where pt_month='{lastMonth}' "
        'and room_id in ({roomIdFilter}) '
        'group by room_id; '
        '" '
    )
    os.system(giftIncomeCommand.format(lastMonth=lastMonth, roomIdFilter=roomIdFilter))

def anchorIncomeStaticResult2Mysql(lastMonth):
    """Compute the monthly result in Hive and store it in MySQL.

    The Hive query starts from ``xxx_invite_anchor_min`` and left-joins the
    staging tables plus ``oss_chushou_user_profile`` (partition
    ``pt_day`` = yesterday) to produce, per room: new-user count, new-user
    value (count * 5), actual income, due income, profit/loss
    (``value - (actual - due)``) and gift income.

    Existing MySQL rows for ``lastMonth`` in ``anchor_income_static`` are
    deleted first so the step can be re-run, then the rows are inserted in
    statements of at most ``batchSize`` rows.

    No insert is issued when there is nothing left to insert. The previous
    code always sent a final statement, which degenerated to the malformed
    ``insert ... values;`` when Hive returned no rows or when the row count
    was an exact multiple of the batch size.

    Args:
        lastMonth: month to publish, ``'YYYY-MM'``.

    Raises:
        IndexError: if a Hive output line has fewer than ten tab-separated
            fields.
    """
    batchSize = 1000
    insertPrefix = "insert into jellyfish_hadoop_stat.anchor_income_static(calc_month,room_id,uid,nickname,pullnew_cnt,pullnew_value,actual_income,due_income,profitloss_bymonth,gift_income,etl_time) values "
    rowTemplate = "('{calc_month}',{room_id},{uid},'{nickname}',{pullnew_cnt},{pullnew_value},{actual_income},{due_income},{profitloss_bymonth},{gift_income},'{etl_time}')"
    mysqlCommand = (
        'source /etc/profile; '
        '/usr/bin/mysql  -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use jellyfish_hadoop_stat; '
        '{sqlText} '
        '" '
    )

    def runMysql(sqlText):
        # The SQL text is embedded in a double-quoted shell argument.
        os.system(mysqlCommand.format(sqlText=sqlText))

    # NOTE(review): the WHERE clause filters on a6.pt_day, a column of a
    # left-joined table, so anchors with no profile row for that day are
    # dropped from the result. Confirm this is intended.
    hiveCommand = (
        'source /etc/profile; '
        '/usr/lib/hive-current/bin/hive -e " '
        "select '{lastMonth}' calc_month,a1.room_id,a1.uid,a6.nickname,coalesce(a2.fans_add_bymonth,0) pullNew_cnt,coalesce(a2.fans_add_bymonth,0)*5 pullNew_value,coalesce(a3.amount_bymonth,0) actual_income,coalesce(a4.salary_bymonth,0) due_income, "
        'coalesce(a2.fans_add_bymonth,0)*5-(coalesce(a3.amount_bymonth,0)-coalesce(a4.salary_bymonth,0)) profitloss_bymonth,coalesce(a5.gift_amount,0) gift_income '
        'from xxx_invite_anchor_min a1 '
        'left join xxx_anchor_bringnew_detail_min a2 on a1.room_id=a2.room_id '
        'left join xxx_salary_record_min a3 on a1.room_id=a3.room_id '
        'left join xxx_due_salary a4 on a1.room_id=a4.room_id '
        'left join xxx_gift_income a5 on a1.room_id=a5.room_id '
        'left join oss_chushou_user_profile a6 on a1.uid=a6.uid '
        "where a6.pt_day='{yesterday}' "
        '; '
        '" '
    ).format(
        lastMonth=lastMonth,
        yesterday=(datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d'),
    )

    hivePipe = os.popen(hiveCommand)
    try:
        anchorIncomeStaticResults = hivePipe.readlines()
    finally:
        # Close the pipe explicitly instead of leaving it to the garbage collector.
        hivePipe.close()

    anchorIncomeStaticResult_list = [
        resultLine.replace('\n', '').split('\t') for resultLine in anchorIncomeStaticResults
    ]

    # data rollback: remove what an earlier run stored for this month
    runMysql("delete from jellyfish_hadoop_stat.anchor_income_static where calc_month='{lastMonth}'".format(lastMonth=lastMonth))

    pendingValues = []
    for anchorIncomeStaticResult in anchorIncomeStaticResult_list:
        # Strip characters that would break the quoting of the SQL literal or
        # of the double-quoted shell argument.
        # NOTE(review): ``$`` is not stripped and is still expanded by the shell.
        nickname = str(anchorIncomeStaticResult[3]).replace('\n', '').replace('`', '').replace('\'', '').replace('"', '').replace('\\', '')
        pendingValues.append(
            rowTemplate.format(
                calc_month=anchorIncomeStaticResult[0],
                room_id=anchorIncomeStaticResult[1],
                uid=anchorIncomeStaticResult[2],
                nickname=nickname,
                pullnew_cnt=anchorIncomeStaticResult[4],
                pullnew_value=anchorIncomeStaticResult[5],
                actual_income=anchorIncomeStaticResult[6],
                due_income=anchorIncomeStaticResult[7],
                profitloss_bymonth=anchorIncomeStaticResult[8],
                gift_income=anchorIncomeStaticResult[9],
                etl_time=time.strftime('%Y-%m-%d %X', time.localtime()),
            )
        )
        if len(pendingValues) == batchSize:
            runMysql(insertPrefix + ",".join(pendingValues) + ";")
            pendingValues = []

    # Flush the remainder only if there is one.
    if pendingValues:
        runMysql(insertPrefix + ",".join(pendingValues) + ";")

# Batch run for the month before today. The order matters: the three staging
# loads and the Hive calculation must finish before the result is published.
lastMonth = getLastMonth(runDay=datetime.date.today().isoformat())
mysqlMiniData2hive_invite_anchor()
mysqlMiniData2hive_anchor_bringnew_detail(lastMonth)
mysqlMiniData2hive_salary_record(lastMonth)
anchorIncomeStaticHiveCalc(lastMonth)
anchorIncomeStaticResult2Mysql(lastMonth)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: