Python模块化开发组织代码程序示例
2017-01-16 19:14
696 查看
样例包含三部分代码：周处理函数部分、业务数据处理部分，以及多线程跑批调度部分。
代码按功能分模块存放，使结构更清晰；各模块之间通过 from ... import 的方式相互引用，实现代码复用。
另外，多线程调度部分有效处理了程序之间的先后依赖，以及多个程序串行/并行跑批的问题，可为今后处理类似问题提供借鉴。
1、周处理函数
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/WeekCalc.py
2、业务处理部分
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/Hive_remain_byWeek_proc.py
另：用于打印 SQL 以便测试的代码（只打印命令，不实际执行）
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/xx.py
3、多线程批调度
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/BatchThread.py
代码按功能分模块存放，使结构更清晰；各模块之间通过 from ... import 的方式相互引用，实现代码复用。
另外，多线程调度部分有效处理了程序之间的先后依赖，以及多个程序串行/并行跑批的问题，可为今后处理类似问题提供借鉴。
1、周处理函数
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/WeekCalc.py
# -*- coding=utf-8 -*-
"""ISO-week helpers shared by the weekly retention batch jobs.

A week is identified by a 'YYYY#W' label: the ISO year, '#', and the ISO week
number without zero padding (e.g. '2015#27').  Sibling modules load this file
with ``from WeekCalc import *`` and also use the ``datetime`` and ``warnings``
names that come along with it, so no ``__all__`` is defined here.
"""
import warnings
import datetime

warnings.filterwarnings("ignore")

# Earliest day / ISO week for which weekly data is processed.
_DEFAULT_BEGIN_DAY = '2015-07-01'
_DEFAULT_FIRST_WEEK = '2015#27'


def _last_week_moment(now=None):
    """Return *now* (default: the current local time) minus 7 days."""
    if now is None:
        now = datetime.datetime.now()
    return now - datetime.timedelta(days=7)


def _format_week(iso_year, iso_week):
    """Build a 'YYYY#W' label from an ISO (year, week) pair."""
    return '%d#%d' % (iso_year, iso_week)


def _parse_week(week):
    """Turn a 'YYYY#W' label into an (int, int) tuple that orders chronologically."""
    year, _, number = week.partition('#')
    return int(year), int(number)


def _iter_days(beginDate, endDate):
    """Yield datetime.date objects from beginDate to endDate inclusive.

    Both bounds are 'YYYY-MM-DD' strings; nothing is yielded when beginDate is
    after endDate.
    """
    day = datetime.datetime.strptime(beginDate, "%Y-%m-%d").date()
    last = datetime.datetime.strptime(endDate, "%Y-%m-%d").date()
    one_day = datetime.timedelta(days=1)
    while day <= last:
        yield day
        day += one_day


def getNowYearWeek(now=None):
    """Return the 'YYYY#W' label of the ISO week containing (now - 7 days).

    In other words the ISO week *before* the one containing *now*.  *now*
    defaults to the current local time.
    """
    iso = _last_week_moment(now).isocalendar()
    return _format_week(iso[0], iso[1])


def dateRange(beginDate, endDate):
    """Return every day from beginDate to endDate inclusive as 'YYYY-MM-DD' strings.

    Both bounds must be 'YYYY-MM-DD'.  Returns [] when beginDate > endDate.
    """
    return [day.strftime("%Y-%m-%d") for day in _iter_days(beginDate, endDate)]


def weekRang(beginDate, endDate):
    """Return the sorted, de-duplicated 'YYYY#W' labels of the ISO weeks touching [beginDate, endDate]."""
    weeks = set(day.isocalendar()[0:2] for day in _iter_days(beginDate, endDate))
    # Sorting (year, week) tuples gives chronological order across year ends.
    return [_format_week(year, week) for year, week in sorted(weeks)]


def currWeekList(his_week, now=None, begin_day=_DEFAULT_BEGIN_DAY):
    """Return the week labels from his_week through last week, oldest first.

    Candidate weeks are those touching begin_day .. (now - 7 days); weeks
    earlier than his_week are dropped.  *now* defaults to the current time.
    """
    end_day = _last_week_moment(now).strftime("%Y-%m-%d")
    start = _parse_week(his_week)
    return [week for week in weekRang(begin_day, end_day) if _parse_week(week) >= start]


def hisRunWeekList(his_week, now=None):
    """Pair his_week with every week from itself through last week.

    Returns ``([curr_week, his_week], None)`` items (positional args, no
    kwargs), oldest curr_week first.  currWeekList already guarantees
    curr_week >= his_week, so no further filtering is needed.
    """
    return [([curr_week, his_week], None) for curr_week in currWeekList(his_week, now)]


def RuningWeekList(now=None, first_week=_DEFAULT_FIRST_WEEK):
    """Pair last week with every historical week from first_week through last week.

    Returns ``([last_week, his_week], None)`` items, oldest his_week first.
    A single reference time feeds both getNowYearWeek and currWeekList, so a
    run that straddles a week boundary cannot pair inconsistent weeks.
    """
    if now is None:
        now = datetime.datetime.now()
    curr_week = getNowYearWeek(now)
    return [([curr_week, his_week], None) for his_week in currWeekList(first_week, now)]


def getWeekFristday(weekflag):
    """Return the Monday (a datetime.date) that starts ISO week *weekflag* ('YYYY#W')."""
    year, week = _parse_week(weekflag)
    # 4 January always lies in ISO week 1, so its Monday anchors the count.
    jan4 = datetime.date(year, 1, 4)
    week1_monday = jan4 - datetime.timedelta(days=jan4.isoweekday() - 1)
    return week1_monday + datetime.timedelta(weeks=week - 1)
2、业务处理部分
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/Hive_remain_byWeek_proc.py
# -*- coding=utf-8 -*-
"""Weekly Hive jobs: new users per ISO week, and their later retention/payments.

Week arguments are 'YYYY#W' labels as produced by WeekCalc.  Retention results
are copied from Hive into MySQL (database funnyai_data).

Example::

    newuser_byweek_proc('2015#46')
    user_remain_payamount_byweek('2016#6', '2015#46')
    user_remain_pay_byweek('2016#6', '2015#46')
"""
import time
import os  # kept: callers star-import this module
import re  # kept: callers star-import this module
import subprocess

from WeekCalc import *

warnings.filterwarnings("ignore")

_HIVE_BIN = '/usr/lib/hive-current/bin/hive'
# NOTE(review): the password on argv is visible in `ps`; an option file
# (--defaults-extra-file) would avoid that.
_MYSQL_CMD = ['/usr/bin/mysql', '-hMysqlHost', '-P6603', '-uhadoop', '-pMysqlPass']

# Registers the RadixChange UDF; it is called as RadixChange(uid,16,10) below
# (presumably a base-16 -> base-10 uid conversion).
_UDF_PRELUDE = (
    "add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;\n"
    "create temporary function RadixChange as 'com.kascend.hadoop.RadixChange';\n"
)

# Hive expression for the ISO-week label 'YYYY#W' of pt_day, matching
# WeekCalc's isocalendar()-based labels.  weekofyear() is the ISO week number
# but year() is the calendar year, so both year-boundary cases need fixing:
# early-January days still in week 52/53 belong to the previous ISO year, and
# late-December days already in week 1 belong to the next one
# (e.g. 2019-12-30 is week '2020#1').
_WEEK_LABEL = (
    "concat(case when weekofyear(pt_day)>=52 and month(pt_day)=1 then year(pt_day)-1"
    " when weekofyear(pt_day)=1 and month(pt_day)=12 then year(pt_day)+1"
    " else year(pt_day) end,'#',weekofyear(pt_day))"
)

_NEWUSER_SQL = """
alter table bi_newuser_byweek drop if exists partition(pt_week='%(week)s');
alter table bi_newuser_byweek add partition(pt_week='%(week)s');
insert into table bi_newuser_byweek partition (pt_week='%(week)s')
select a1.appsource,a1.appkey,a1.identifier,a1.uid from (
  select appsource,appkey,identifier,uid
  from bi_all_access_log
  where %(label)s = '%(week)s'
  group by appsource,appkey,identifier,uid) a1
left join
  (select appsource,appkey,identifier,uid
   from bi_all_access_log
   where pt_day < '%(monday)s') a2
  on a1.appkey=a2.appkey and a1.identifier=a2.identifier
  and a1.appsource=a2.appsource and a1.uid=a2.uid
where a2.identifier is null;
"""

_PAYAMOUNT_SQL = """
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
  from bi_newuser_byweek
  where pt_week = '%(his)s'),
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
  from bi_all_access_log
  where %(label)s = '%(curr)s'
  group by appsource,appkey,identifier,RadixChange(uid,16,10)),
curr_week_pay as (select uid,sum(amount) amount
  from data_chushou_pay_info
  where state=0 and %(label)s = '%(curr)s'
  group by uid)
select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from
  (select a1.appkey,a1.appsource,a1.uid
   from his_new_user a1
   inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
   group by a1.appkey,a1.appsource,a1.uid) b1
left join curr_week_pay b2 on b1.uid=b2.uid
group by b1.appkey,b1.appsource;
"""

_REMAIN_SQL = """
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
  from bi_newuser_byweek
  where pt_week = '%(his)s'),
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
  from bi_all_access_log
  where %(label)s = '%(curr)s'
  group by appsource,appkey,identifier,RadixChange(uid,16,10))
select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount
from his_new_user a1
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
group by a1.appkey,a1.appsource;
"""


def _run_hive(sql, load_profile=False):
    """Run *sql* through the Hive CLI and return its stdout as text.

    The statement text reaches the shell as the positional parameter "$1", so
    quotes inside the SQL need no shell escaping.  With load_profile=True,
    /etc/profile is sourced first (POSIX '.', which also works when /bin/sh is
    not bash).  Raises subprocess.CalledProcessError if Hive exits non-zero.
    """
    script = 'exec %s -e "$1"' % _HIVE_BIN
    if load_profile:
        script = '. /etc/profile; ' + script
    return subprocess.check_output(['/bin/sh', '-c', script, 'hive', sql],
                                   universal_newlines=True)


def _hive_rows(output):
    """Split Hive CLI stdout into tab-separated field lists, skipping empty lines."""
    return [line.split('\t') for line in output.split('\n') if line]


def _sql_literal(value):
    """Render *value* as a MySQL single-quoted string literal, escaping backslashes and quotes."""
    return "'%s'" % str(value).replace('\\', '\\\\').replace("'", "\\'")


def _run_mysql(sql):
    """Feed *sql* to the mysql client on stdin, in database funnyai_data.

    Using stdin avoids shell quoting and command-line length limits for a long
    multi-row INSERT.  Raises subprocess.CalledProcessError on a non-zero exit.
    """
    proc = subprocess.Popen(_MYSQL_CMD, stdin=subprocess.PIPE, universal_newlines=True)
    proc.communicate('use funnyai_data;\n' + sql)
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, _MYSQL_CMD[0])


def _replace_rows(table, columns, his_week, curr_week, rows):
    """Replace the (data_week=his_week, remain_week=curr_week) rows of *table* with *rows*.

    The delete and one multi-row insert run in a single mysql session inside a
    transaction (atomic only if the table is transactional, e.g. InnoDB).
    Every value is escaped, so quotes in Hive output cannot break the SQL.
    """
    statements = [
        'start transaction;',
        'delete from %s where data_week=%s and remain_week=%s;'
        % (table, _sql_literal(his_week), _sql_literal(curr_week)),
    ]
    if rows:
        values = ',\n'.join('(%s)' % ','.join(_sql_literal(v) for v in row) for row in rows)
        statements.append('insert into %s(%s) values\n%s;' % (table, ','.join(columns), values))
    statements.append('commit;')
    _run_mysql('\n'.join(statements) + '\n')


def newuser_byweek_proc(batch_week):
    """(Re)build partition pt_week=batch_week of bi_newuser_byweek.

    Keeps each (appsource, appkey, identifier, uid) seen in bi_all_access_log
    during ISO week batch_week that has no access-log row dated before that
    week's Monday.  Raises subprocess.CalledProcessError if Hive fails.
    """
    sql = _NEWUSER_SQL % {'week': batch_week,
                          'monday': getWeekFristday(batch_week),
                          'label': _WEEK_LABEL}
    _run_hive(sql)


def user_remain_payamount_byweek(curr_week, his_week):
    """Reload bi_user_remain_payamount_byweek for (data_week=his_week, remain_week=curr_week).

    Per (appkey, appsource): the summed state=0 payment amount in curr_week of
    his_week's new users who were also active in curr_week.  Hive runs first;
    the MySQL rows are replaced only after it succeeded (a Hive failure raises
    subprocess.CalledProcessError and leaves MySQL untouched).
    """
    sql = _UDF_PRELUDE + (_PAYAMOUNT_SQL % {'his': his_week, 'curr': curr_week,
                                            'label': _WEEK_LABEL})
    fields = _hive_rows(_run_hive(sql, load_profile=True))
    etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
    # Hive columns: appkey, appsource, pay_amount.
    rows = [(his_week, f[1], f[0], curr_week, f[2], etl_time) for f in fields]
    _replace_rows('bi_user_remain_payamount_byweek',
                  ('data_week', 'appsource', 'appkey', 'remain_week', 'pay_amount', 'etl_time'),
                  his_week, curr_week, rows)


def user_remain_pay_byweek(curr_week, his_week):
    """Reload bi_user_remain_pay_byweek for (data_week=his_week, remain_week=curr_week).

    Per (appkey, appsource): how many distinct identifiers among his_week's new
    users were active in curr_week (pay_amount is written as 0).  Hive runs
    first; the MySQL rows are replaced only after it succeeded.
    """
    sql = _UDF_PRELUDE + (_REMAIN_SQL % {'his': his_week, 'curr': curr_week,
                                         'label': _WEEK_LABEL})
    fields = _hive_rows(_run_hive(sql, load_profile=True))
    etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
    # Hive columns: appkey, appsource, remain_cnt, pay_amount.
    rows = [(his_week, f[1], f[0], curr_week, f[2], f[3], etl_time) for f in fields]
    _replace_rows('bi_user_remain_pay_byweek',
                  ('data_week', 'appsource', 'appkey', 'remain_week', 'remain_cnt',
                   'pay_amount', 'etl_time'),
                  his_week, curr_week, rows)
另：用于打印 SQL 以便测试的代码（只打印命令，不实际执行）
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/xx.py
# -*- coding=utf-8 -*- import time import os import re from WeekCalc import * warnings.filterwarnings("ignore") def newuser_byweek_proc(batch_week): week1day = getWeekFristday(batch_week) sql_text = """/usr/lib/hive-current/bin/hive -e " \ alter table bi_newuser_byweek drop if exists partition(pt_week='%s'); \ alter table bi_newuser_byweek add partition(pt_week='%s'); \ insert into table bi_newuser_byweek partition (pt_week='%s') \ select a1.appsource,a1.appkey,a1.identifier,a1.uid from ( \ select appsource,appkey,identifier,uid \ from bi_all_access_log \ where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \ group by appsource,appkey,identifier,uid) a1 \ left join \ (select appsource,appkey,identifier,uid \ from bi_all_access_log \ where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid \ where a2.identifier is null \ ;" \ """ % (batch_week, batch_week, batch_week, batch_week, week1day); print sql_text def user_remain_payamount_byweek(curr_week, his_week): sql_text="""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e " \ add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \ create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \ with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \ from bi_newuser_byweek \ where pt_week = '%s' \ ), \ curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \ from bi_all_access_log \ where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \ group by appsource,appkey,identifier,RadixChange(uid,16,10)), \ curr_week_pay as (select uid,sum(amount) amount \ from data_chushou_pay_info \ where state=0 and \ case when weekofyear(pt_day)>=52 and 
month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \ group by uid) \ select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from \ (select a1.appkey,a1.appsource,a1.uid \ from his_new_user a1 \ inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \ group by a1.appkey,a1.appsource,a1.uid) b1 \ left join curr_week_pay b2 on b1.uid=b2.uid \ group by b1.appkey,b1.appsource \ ;" \ """ % (his_week, curr_week, curr_week); print sql_text def user_remain_pay_byweek(curr_week, his_week): sql_text="""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e " \ add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \ create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \ with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \ from bi_newuser_byweek \ where pt_week = '%s' \ ), \ curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \ from bi_all_access_log \ where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \ group by appsource,appkey,identifier,RadixChange(uid,16,10)) \ select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount \ from his_new_user a1 \ inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \ group by a1.appkey,a1.appsource \ ;" \ """ % (his_week, curr_week); print sql_text # Batch Test # curr_week = '2016#6' # his_week = '2015#46' # user_remain_payamount_byweek(curr_week, his_week) # user_remain_pay_byweek(curr_week, his_week) # batch_week = '2015#46' # newuser_byweek_proc(batch_week)
3、多线程批调度
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/BatchThread.py
# -*- coding=utf-8 -*- import threadpool from Hive_remain_byWeek_proc import * # from xx import * warnings.filterwarnings("ignore") today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) tomorrow = today + datetime.timedelta(days=1) now_time = time.strftime('%Y-%m-%d %X', time.localtime()) print "当前时间是:",now_time # 新用户数据先跑出来 last_week = [getNowYearWeek()] request_newuser_byweek_proc = threadpool.makeRequests(newuser_byweek_proc, last_week) frist_pool = threadpool.ThreadPool(8) [frist_pool.putRequest(req) for req in request_newuser_byweek_proc] frist_pool.wait() # 然后再执行用户留存和充值金额数据 if True: batch_week_list = RuningWeekList() requests = [] request_user_remain_payamount_byweek = threadpool.makeRequests(user_remain_payamount_byweek, batch_week_list) request_user_remain_pay_byweek = threadpool.makeRequests(user_remain_pay_byweek, batch_week_list) requests.extend(request_user_remain_payamount_byweek) requests.extend(request_user_remain_pay_byweek) main_pool = threadpool.ThreadPool(8) [main_pool.putRequest(req) for req in requests] if __name__ == '__main__': while True: try: time.sleep(960) main_pool.poll() except KeyboardInterrupt: print("**** Interrupted!") break except threadpool.NoResultsPending: break if main_pool.dismissedWorkers: print("Joining all dismissed worker threads...") main_pool.joinAllDismissedWorkers() now_time = time.strftime('%Y-%m-%d %X', time.localtime()) print "当前时间是:",now_time
相关文章推荐
- Sublime开发python程序的示例代码
- 使用C#调用存储过程,用函数合理组织代码,使程序更加的清晰(示例)
- Bluemix云端数据库服务ClearDB MySQL使用示例———Python开发投票程序
- php程序开发范例宝典--代码模块化
- 使用Python的Twisted框架编写非阻塞程序的代码示例
- [微信小程序]组件化开发,以一个自定义模块框组件当做示例(附完整示例代码和效果图)
- Python实现比较扑克牌大小程序代码示例
- js模块化开发---js大项目代码组织和多人协作的解决之道
- 使用Python的Twisted框架编写非阻塞程序的代码示例
- ArcGIS 二次开发系列之Python版示例代码
- 快速了解Python开发中的cookie及简单代码示例
- [微信小程序]组件化开发,以一个自定义模块框组件当做示例(附完整示例代码和效果图)
- python 示例开发程序
- 使用C#调用存储过程,用函数合理组织代码,使程序更加的清晰(示例)
- 黄聪:python+MySQLdb操作Mysql数据库示例代码程序教程
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- 〔转载〕C Java PHP Perl Python 的程序代码美化工具
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- Windows Forms 实现安全的多线程详解(附带程序代码示例)
- 程序开发小技巧——格式化代码