您的位置:首页 > 编程语言 > Python开发

Python根据AccessLog统计对应Url的点击量

2017-04-20 20:53 323 查看
1、场景说明
AccessLog已经从阿里Oss上拉取到Hdfs上了,数据是按机器名、日期、小时存放的。每天有上千个文件,500G左右的大小。

单个文件命名如下所示:
/tmp/oss_access/2017-04-16/sz-98-72_localhost_access_log.2017-04-16.08.txt
/tmp/oss_access/2017-04-16/sz-jf-server-tmp-40_localhost_access_log.2017-04-16.21.txt
/tmp/oss_access/2017-04-16/sz-jf-server-tmp-40_localhost_access_log.2017-04-16.15.txt

2、处理日期小时的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/DateHourCalc.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import threading

warnings.filterwarnings("ignore")

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate through endDate, inclusive.

    Args:
        beginDate: First day, as a "%Y-%m-%d" string (e.g. "2017-04-14").
        endDate: Last day, as a "%Y-%m-%d" string.

    Returns:
        A list of zero-padded "%Y-%m-%d" strings in ascending order. The list
        is empty when endDate is earlier than beginDate.

    Raises:
        ValueError: If either argument does not match "%Y-%m-%d".
    """
    day_format = "%Y-%m-%d"
    current = datetime.datetime.strptime(beginDate, day_format)
    last = datetime.datetime.strptime(endDate, day_format)
    one_day = datetime.timedelta(days=1)

    dates = []
    # Compare parsed datetimes rather than the raw strings: a string
    # comparison is only correct for zero-padded input, and "2017-4-1"
    # would otherwise keep the loop running far past the requested end.
    while current <= last:
        dates.append(current.strftime(day_format))
        current += one_day
    return dates

# Serializes dateHourRange across threads; the accompanying notes state that
# the date helper has to be locked when it is scheduled from several threads.
c = threading.RLock()


def dateHourRange(beginDateHour, endDateHour):
    """Return every hour from beginDateHour through endDateHour, inclusive.

    Args:
        beginDateHour: First hour, as a "%Y-%m-%d.%H" string
            (e.g. "2017-04-14.03").
        endDateHour: Last hour, as a "%Y-%m-%d.%H" string.

    Returns:
        A list of zero-padded "%Y-%m-%d.%H" strings in ascending order. The
        list is empty when endDateHour is earlier than beginDateHour.

    Raises:
        ValueError: If either argument does not match "%Y-%m-%d.%H".
    """
    hour_format = "%Y-%m-%d.%H"
    with c:
        current = datetime.datetime.strptime(beginDateHour, hour_format)
        last = datetime.datetime.strptime(endDateHour, hour_format)
        one_hour = datetime.timedelta(hours=1)

        dateHours = []
        # Compare parsed datetimes rather than the raw strings: a string
        # comparison is only correct for zero-padded input.
        while current <= last:
            dateHours.append(current.strftime(hour_format))
            current += one_hour
        return dateHours

# print dateHourRange(beginDateHour='2017-04-14.03', endDateHour='2017-04-17.11')

3、以小时为单位汇总处理统计的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/hitsStatic_byHour.py
# -*- coding=utf-8 -*-
import os
from DateHourCalc import *

warnings.filterwarnings("ignore")

def hitsStatic_byHour(label, url, beginDateHour, endDateHour):
    """Count access-log lines containing url, hour by hour, and save the total.

    For every hour returned by dateHourRange, all files matching
    /tmp/oss_access/<date>/*_localhost_access_log.<date>.<hour>.txt are
    streamed with `hadoop dfs -cat` and the lines containing url are counted.
    The grand total is written as a single line to a text file in the current
    working directory, named "<label>-<url>:<begin>~<end>.txt" with every "/"
    of the url replaced by "#".

    Args:
        label: Free-form campaign name used in the result line and file name.
        url: Text to look for; matched as a literal substring of each line.
        beginDateHour: First hour, as a "%Y-%m-%d.%H" string.
        endDateHour: Last hour (inclusive), as a "%Y-%m-%d.%H" string.
    """
    # Imported locally because the module header does not import subprocess.
    import subprocess

    record_num = 0
    for dateHour in dateHourRange(beginDateHour=beginDateHour, endDateHour=endDateHour):
        date = dateHour[0:10]
        log_glob = "/tmp/oss_access/{date}/*_localhost_access_log.{dateHour}.txt".format(
            date=date, dateHour=dateHour)

        # Argument lists instead of a shell string: the url is never parsed by
        # a shell, and the glob reaches hadoop unexpanded.
        cat_proc = subprocess.Popen(
            ["hadoop", "dfs", "-cat", log_glob], stdout=subprocess.PIPE)
        # -F: treat the url literally (a "." must not match any character);
        # -c: print the number of matching lines, replacing "| wc -l".
        grep_proc = subprocess.Popen(
            ["grep", "-F", "-c", "-e", url],
            stdin=cat_proc.stdout,
            stdout=subprocess.PIPE,
            universal_newlines=True)
        # Drop our copy of the pipe so that grep is its only reader.
        cat_proc.stdout.close()
        counted = grep_proc.communicate()[0].strip()
        cat_proc.wait()

        # An empty answer (e.g. grep could not be started properly) counts as
        # zero hits instead of raising.
        record_num += int(counted) if counted else 0

    safe_url = url.replace('/', '#')
    result_line = "{label}-{url}:{beginDateHour}~{endDateHour}hits_number_is:{record_num}".format(
        label=label, url=safe_url, beginDateHour=beginDateHour,
        endDateHour=endDateHour, record_num=record_num)
    result_path = "{label}-{url}:{beginDateHour}~{endDateHour}.txt".format(
        label=label, url=safe_url, beginDateHour=beginDateHour,
        endDateHour=endDateHour)
    # Written directly rather than through `echo ... > file`, so characters
    # such as "?" or "&" in the url are not interpreted by a shell.
    with open(result_path, "w") as result_file:
        result_file.write(result_line + "\n")

4、多线程调度的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsCalc/BatchThread.py
# -*- coding=utf-8 -*-
import threadpool
import time
from hitsStatic_byHour import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

def _build_batch_parameters():
    """Expand the per-campaign URL groups into one argument entry per URL.

    Returns:
        A list of ([label, url, beginDateHour, endDateHour], None) tuples, in
        campaign order and, within a campaign, in URL order.
    """
    # (label, first hour, last hour, urls): every URL of a campaign is counted
    # over the same time window.
    campaigns = (
        ('weijiaLive', '2017-04-15.09', '2017-04-17.09', (
            '/information/729.htm',
            '/m/information/729.htm',
            '/room/1030.htm?_s=zx',
        )),
        ('KPLzhanBao', '2017-04-15.11', '2017-04-17.11', (
            '/information/732.htm',
            '/m/information/732.htm',
            '/gamezone/video/play/1578286.htm?_s=zx',
            '/gamezone/video/play/1579020.htm?_s=zx',
            '/gamezone/video/play/1579883.htm?_s=zx',
            '/gamezone/video/play/1580903.htm?_s=zx',
        )),
        ('KuPaoChunjiShei', '2017-04-13.19', '2017-04-15.19', (
            '/information/726.htm',
            '/m/information/726.htm',
            '/room/1036.htm?_s=zx',
        )),
        ('ZheTianJiFaBuHui', '2017-04-14.10', '2017-04-16.10', (
            '/information/728.htm',
            '/m/information/728.htm',
            '/room/2115.htm?_s=zx',
        )),
        ('WangZhePaiWei', '2017-04-14.11', '2017-04-17.11', (
            '/information/727.htm',
            '/m/information/727.htm',
            '/gamezone/pvp.htm?_s=zx',
            '/room/26031018.htm?_s=zx',
            '/room/4914467.htm?_s=zx',
            '/room/12612445.htm?_s=zx',
            '/room/3016002.htm?_s=zx',
            '/room/2815598.htm?_s=zx',
            '/room/11473986.htm?_s=zx',
        )),
        ('DaHeTiaoZhanShei', '2017-04-14.20', '2017-04-17.20', (
            '/information/731.htm',
            '/m/information/731.htm',
            '/room/87503.htm?_s=zx',
            '/room/4541796.htm?_s=zx',
            '/room/2407710.htm?_s=zx',
            '/room/8807270.htm?_s=zx',
            '/room/35189.htm?_s=zx',
            '/room/30952343.htm?_s=zx',
        )),
    )

    parameters = []
    for label, begin_hour, end_hour, urls in campaigns:
        for url in urls:
            # The second tuple member stays None for every entry; presumably it
            # is the "no keyword arguments" slot of the work request.
            parameters.append(([label, url, begin_hour, end_hour], None))
    return parameters


# One entry per (campaign, URL) pair; this is the list handed to
# threadpool.makeRequests by the scheduling code.
batch_Parameter_list = _build_batch_parameters()

requests = []
request_hitsStatic = threadpool.makeRequests(hitsStatic_byHour, batch_Parameter_list)
requests.extend(request_hitsStatic)
main_pool = threadpool.ThreadPool(8)
[main_pool.putRequest(req) for req in requests]

while True:
try:
time.sleep(30)
main_pool.poll()
except KeyboardInterrupt:
print("**** Interrupted!")
break
except threadpool.NoResultsPending:
break

if main_pool.dismissedWorkers:
print("Joining all dismissed worker threads...")
main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

5、附:按文件逐个统计及串行总控处理的代码
5.1、按文件统计后汇总的代码
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_byFile.py
# -*- coding=utf-8 -*-
import os
from DateHourCalc import *

warnings.filterwarnings("ignore")

def _count_hits_in_file(oss_file_path, url):
    """Return how many lines of one HDFS file contain url as a literal substring."""
    # Imported locally because the module header does not import subprocess.
    import subprocess

    cat_proc = subprocess.Popen(
        ["hadoop", "dfs", "-cat", oss_file_path], stdout=subprocess.PIPE)
    # -F: treat the url literally (a "." must not match any character);
    # -c: print the number of matching lines, replacing "| wc -l".
    grep_proc = subprocess.Popen(
        ["grep", "-F", "-c", "-e", url],
        stdin=cat_proc.stdout,
        stdout=subprocess.PIPE,
        universal_newlines=True)
    # Drop our copy of the pipe so that grep is its only reader.
    cat_proc.stdout.close()
    counted = grep_proc.communicate()[0].strip()
    cat_proc.wait()
    # An empty answer counts as zero hits instead of raising.
    return int(counted) if counted else 0


def hitsStatic(url, beginDateHour, endDateHour):
    """Count access-log lines containing url, one HDFS file at a time.

    For every hour returned by dateHourRange, the files matching
    /tmp/oss_access/<date>/*_localhost_access_log.<date>.<hour>.txt are listed
    with `hadoop dfs -ls`, the hits of each listed file are counted
    separately, and the grand total is printed.

    Args:
        url: Text to look for; matched as a literal substring of each line.
        beginDateHour: First hour, as a "%Y-%m-%d.%H" string.
        endDateHour: Last hour (inclusive), as a "%Y-%m-%d.%H" string.
    """
    # Imported locally because the module header does not import subprocess.
    import subprocess

    record_num = 0
    for dateHour in dateHourRange(beginDateHour=beginDateHour, endDateHour=endDateHour):
        date = dateHour[0:10]
        log_glob = "/tmp/oss_access/%s/*_localhost_access_log.%s.txt" % (date, dateHour)
        ls_proc = subprocess.Popen(
            ["hadoop", "dfs", "-ls", log_glob],
            stdout=subprocess.PIPE,
            universal_newlines=True)
        listing = ls_proc.communicate()[0]

        for entry in listing.splitlines():
            fields = entry.split()
            # The path is the eighth whitespace-separated column (awk's $8).
            # Lines with fewer columns carry no path and are skipped rather
            # than being turned into a `hadoop dfs -cat` of an empty name.
            if len(fields) < 8:
                continue
            record_num += _count_hits_in_file(fields[7], url)

    # NOTE(review): the original indentation was lost; printing the total once,
    # after all hours have been processed, is assumed.
    print(record_num)


if __name__ == "__main__":
    hitsStatic(url="/information/729.htm", beginDateHour='2017-04-14.03', endDateHour='2017-04-14.06')

5.2、串行总控
/Users/nisj/PycharmProjects/BiDataProc/hitsStatic/hitsStatic_Ctl.py
# -*- coding=utf-8 -*-
from hitsStatic_byHour import *
from BatchThread import batch_Parameter_list

warnings.filterwarnings("ignore")

# Serial driver: process the configured jobs one after another, in list order.
for batch_Parameter in batch_Parameter_list:
    # Each entry is ([label, url, beginDateHour, endDateHour], None); only the
    # positional-argument list is needed here.
    label, url, beginDateHour, endDateHour = batch_Parameter[0]
    # Call hitsStatic_byHour, the function this script imports. The name
    # hitsStatic is not provided by hitsStatic_byHour, and the by-file
    # hitsStatic accepts no label argument.
    hitsStatic_byHour(label=label, url=url, beginDateHour=beginDateHour, endDateHour=endDateHour)

6、总结说明
具体是按文件统计还是按小时统计汇总,要根据实际的数据量来决定。我这里在生产环境使用的时候,采用的是按小时的方式进行并行调度。

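串行总控时,执行
from BatchThread import batch_Parameter_list
会把 BatchThread.py 中没有放在 if __name__ == "__main__": 保护块里的顶层代码全部执行一遍(包括线程池调度)。因此最好将 BatchThread.py 中与 batch_Parameter_list 不相关的内容注释掉,或放入该保护块中,否则批量任务会在导入时被额外执行一次。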
日期函数在多线程并行调度时,考虑到线程安全问题,需要加锁处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: