Python根据AccessLog统计对应Url的点击量
2017-04-20 20:53
323 查看
1、场景说明
AccessLog已经从阿里Oss上拉取到Hdfs上了,数据是按机器名、日期、小时存放的。每天有上千个文件,500G左右的大小。
单个文件命名如下所示:
2、处理日期小时的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/DateHourCalc.py
3、以小时为单位汇总处理统计的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/hitsStatic_byHour.py
4、多线程调度的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/BatchThread.py
5、附:每个文件一统计及串行总控处理的代码
5.1、按文件统计后汇总的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_byFile.py
5.2、串行总控
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_Ctl.py
6、总结说明
具体是用按file还是每小时统计汇总,是要根据那就的数据量来的。我这里在生产使用的时候,是用按小时的方式进行并行调度的。
串行总控时,最好将引用的【
】与batch_Parameter_list不相关的内容注掉,否则可能会有问题。
日期函数并行多线程调度的时候,考虑其兼容性,要加锁解决。
AccessLog已经从阿里Oss上拉取到Hdfs上了,数据是按机器名、日期、小时存放的。每天有上千个文件,500G左右的大小。
单个文件命名如下所示:
/tmp/oss_access/2017-04-16/sz-98-72_localhost_access_log.2017-04-16.08.txt /tmp/oss_access/2017-04-16/sz-jf-server-tmp-40_localhost_access_log.2017-04-16.21.txt /tmp/oss_access/2017-04-16/sz-jf-server-tmp-40_localhost_access_log.2017-04-16.15.txt
2、处理日期小时的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/DateHourCalc.py
# -*- coding=utf-8 -*- import warnings import datetime import threading warnings.filterwarnings("ignore") def dateRange(beginDate, endDate): dates = [] dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d") date = beginDate[:] while date <= endDate: dates.append(date) dt = dt + datetime.timedelta(1) date = dt.strftime("%Y-%m-%d") return dates c = threading.RLock() def dateHourRange(beginDateHour, endDateHour): with c: dateHours = [] dt = datetime.datetime.strptime(beginDateHour, "%Y-%m-%d.%H") dateHour = beginDateHour[:] while dateHour <= endDateHour: dateHours.append(dateHour) dt = dt + datetime.timedelta(hours=1) dateHour = dt.strftime("%Y-%m-%d.%H") return dateHours # print dateHourRange(beginDateHour='2017-04-14.03', endDateHour='2017-04-17.11')
3、以小时为单位汇总处理统计的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/hitsStatic_byHour.py
# -*- coding=utf-8 -*- import os from DateHourCalc import * warnings.filterwarnings("ignore") def hitsStatic_byHour(label, url, beginDateHour, endDateHour): record_num = 0 for dateHour in dateHourRange(beginDateHour=beginDateHour, endDateHour=endDateHour): date = dateHour[0:10] hit_num = os.popen("""hadoop dfs -cat /tmp/oss_access/{date}/*_localhost_access_log.{dateHour}.txt |grep "{url}" |wc -l""".format(date=date, dateHour=dateHour, url=url)).readlines(); record_num = record_num + int(hit_num[0]) # xx = """hadoop dfs -cat /tmp/oss_access/{date}/*_localhost_access_log.{dateHour}.txt |grep "{url}" |wc -l""".format(date=date, dateHour=dateHour, url=url); # print xx os.system("""echo {label}-{url}:{beginDateHour}~{endDateHour}hits_number_is:{record_num} > {label}-{url}:{beginDateHour}~{endDateHour}.txt""".format(label=label, url=url.replace('/', '#'), beginDateHour=beginDateHour, endDateHour=endDateHour, record_num=record_num));
4、多线程调度的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/BatchThread.py
# -*- coding=utf-8 -*- import threadpool import time from hitsStatic_byHour import * warnings.filterwarnings("ignore") today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) tomorrow = today + datetime.timedelta(days=1) now_time = time.strftime('%Y-%m-%d %X', time.localtime()) print "当前时间是:",now_time batch_Parameter_list=[(['weijiaLive','/information/729.htm','2017-04-15.09','2017-04-17.09'],None), (['weijiaLive','/m/information/729.htm','2017-04-15.09','2017-04-17.09'],None), (['weijiaLive','/room/1030.htm?_s=zx','2017-04-15.09','2017-04-17.09'],None), (['KPLzhanBao','/information/732.htm','2017-04-15.11','2017-04-17.11'],None), (['KPLzhanBao','/m/information/732.htm','2017-04-15.11','2017-04-17.11'],None), (['KPLzhanBao','/gamezone/video/play/1578286.htm?_s=zx','2017-04-15.11','2017-04-17.11'],None), (['KPLzhanBao','/gamezone/video/play/1579020.htm?_s=zx','2017-04-15.11','2017-04-17.11'],None), (['KPLzhanBao','/gamezone/video/play/1579883.htm?_s=zx','2017-04-15.11','2017-04-17.11'],None), (['KPLzhanBao','/gamezone/video/play/1580903.htm?_s=zx','2017-04-15.11','2017-04-17.11'],None), (['KuPaoChunjiShei','/information/726.htm','2017-04-13.19','2017-04-15.19'],None), (['KuPaoChunjiShei','/m/information/726.htm','2017-04-13.19','2017-04-15.19'],None), (['KuPaoChunjiShei','/room/1036.htm?_s=zx','2017-04-13.19','2017-04-15.19'],None), (['ZheTianJiFaBuHui','/information/728.htm','2017-04-14.10','2017-04-16.10'],None), (['ZheTianJiFaBuHui','/m/information/728.htm','2017-04-14.10','2017-04-16.10'],None), (['ZheTianJiFaBuHui','/room/2115.htm?_s=zx','2017-04-14.10','2017-04-16.10'],None), (['WangZhePaiWei','/information/727.htm','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/m/information/727.htm','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/gamezone/pvp.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/26031018.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/4914467.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/12612445.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/3016002.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/2815598.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['WangZhePaiWei','/room/11473986.htm?_s=zx','2017-04-14.11','2017-04-17.11'],None), (['DaHeTiaoZhanShei','/information/731.htm','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/m/information/731.htm','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/87503.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/4541796.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/2407710.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/8807270.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/35189.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None), (['DaHeTiaoZhanShei','/room/30952343.htm?_s=zx','2017-04-14.20','2017-04-17.20'],None)] requests = [] request_hitsStatic = threadpool.makeRequests(hitsStatic_byHour, batch_Parameter_list) requests.extend(request_hitsStatic) main_pool = threadpool.ThreadPool(8) [main_pool.putRequest(req) for req in requests] while True: try: time.sleep(30) main_pool.poll() except KeyboardInterrupt: print("**** Interrupted!") break except threadpool.NoResultsPending: break if main_pool.dismissedWorkers: print("Joining all dismissed worker threads...") main_pool.joinAllDismissedWorkers() now_time = time.strftime('%Y-%m-%d %X', time.localtime()) print "当前时间是:",now_time
5、附:每个文件一统计及串行总控处理的代码
5.1、按文件统计后汇总的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_byFile.py
# -*- coding=utf-8 -*- import os from DateHourCalc import * warnings.filterwarnings("ignore") def hitsStatic(url, beginDateHour, endDateHour): record_num = 0 for dateHour in dateHourRange(beginDateHour=beginDateHour, endDateHour=endDateHour): date = dateHour[0:10] oss_file_path_list = os.popen("""hadoop dfs -ls /tmp/oss_access/%s/*_localhost_access_log.%s.txt |awk '{print $8}' """ % (date, dateHour)).readlines(); for oss_file_path in oss_file_path_list: oss_file_path = oss_file_path[:-1] hit_num = os.popen("""hadoop dfs -cat {oss_file_path} |grep "{url}" |wc -l""".format(oss_file_path=oss_file_path, url=url)).readlines(); record_num = record_num + int(hit_num[0]) print record_num hitsStatic(url="/information/729.htm", beginDateHour='2017-04-14.03', endDateHour='2017-04-14.06')
5.2、串行总控
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_Ctl.py
# -*- coding=utf-8 -*- from hitsStatic_byHour import * from BatchThread import batch_Parameter_list warnings.filterwarnings("ignore") for batch_Parameter in batch_Parameter_list: label = batch_Parameter[0][0] url = batch_Parameter[0][1] beginDateHour = batch_Parameter[0][2] endDateHour = batch_Parameter[0][3] # print label,url,beginDateHour,endDateHour hitsStatic(label=label, url=url, beginDateHour=beginDateHour, endDateHour=endDateHour)
6、总结说明
具体是用按file还是每小时统计汇总,是要根据那就的数据量来的。我这里在生产使用的时候,是用按小时的方式进行并行调度的。
串行总控时,最好将引用的【
from BatchThread import batch_Parameter_list
】与batch_Parameter_list不相关的内容注掉,否则可能会有问题。
日期函数并行多线程调度的时候,考虑其兼容性,要加锁解决。
相关文章推荐
- Python根据AccessLog统计对应Url的点击量2
- 用Shell根据AcessLog统计对应的点击量
- Python的MapReduce调用及多输入文件的使用(统计url的点击量)
- python根据url获取网页内容
- 使用libcurl,根据url下载对应html页面
- Convert URL to image with Python and OpenCV(根据URL下载图片)
- 根据多个索引高效删除python list中对应位置的元素
- python——根据输入显示对应的数字图形
- 用python怎样爬网页呢?其实就是根据URL来获取它的网页信息!
- 用Python将统计数据不存在的记录按维度对应指标补齐
- app根据URL跳转到AppStore相对应的软件
- 用Python编写MapReduce代码与调用-统计accessLog中链接的点击量
- python 根据中文构造url的方法
- 一个简易的python脚本统计nginx日志里的url及大小
- python分析nginx日志根据共性url屏蔽ip
- 用Python将统计数据不存在的记录按维度对应指标补齐(续:日数据情形)
- python 脚本统计squid日志中的IP访问数和URL访问数量
- Java 根据url下载图片 并 保存到对应的本地的新建文件夹中
- syfomy自动根据url中的id找到对应的记录
- python根据url地址下载小文件