
Counting words in batches of files under a given directory with Python: the concurrent version

2014-10-09 19:54
In the article 《python实现指定目录下批量文件的单词计数:串行版本》 (the serial version), the overall approach was: A. obtain all matching files under the given directory in one pass -> B. read all lines of all those files in one pass -> C. parse the word counts from all the lines -> D. sort by occurrence count and print the top N. Steps A, B, C and D ran strictly one after another.

This article implements the concurrent version. Its main idea is: A. obtain one matching file at a time -> B. read all lines of that single file -> C. count the words in that single file -> D. aggregate all the word counts, sort them and print the top N. Steps A, B and C run concurrently; if D could sort incrementally, it could later be made concurrent as well.
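To make that wiring concrete before the full listing, here is a minimal sketch of my own (not from the original article) of the producer-consumer pattern the design implies: each stage is a thread, and stages hand work to each other through a Queue. The sketch uses a None sentinel to signal the end of work, whereas the real code below relies on a Queue.get timeout instead; the file names are placeholders.

# Minimal producer-consumer sketch (illustration only; the real stages below
# are FileObtainer, WordReading and WordAnalyzing).
import threading, Queue

def produce(qOut):
    for item in ['a.java', 'b.java']:   # stage A: emit work items (placeholder names)
        qOut.put(item)
    qOut.put(None)                      # sentinel: no more work

def consume(qIn):
    while True:
        item = qIn.get()                # stages B/C: pick up items as they arrive
        if item is None:
            break
        print 'processing', item

q = Queue.Queue()
t1 = threading.Thread(target=produce, args=(q,))
t2 = threading.Thread(target=consume, args=(q,))
t1.start(); t2.start()
t1.join(); t2.join()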

I. Converting to threads

First, the serial version is converted to threads: the ordinary classes become thread classes, and the value passing between them is replaced by message passing through queues. The code is as follows:

#-------------------------------------------------------------------------------
# Name:        wordstat_threading.py
# Purpose:     statistic words in java files of given directory by threading
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import time
import logging
import threading, Queue

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

def initlog(filename):
    # use a named logger per file; otherwise both handlers end up on the shared
    # root logger and error.log/info.log receive each other's messages
    logger = logging.getLogger(filename)
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

timeoutInSecs = 0.05

class FileObtainer(threading.Thread):

    def __init__(self, dirpath, qOut, threadID, fileFilterFunc=None):
        threading.Thread.__init__(self)
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc
        self.qOut = qOut
        self.threadID = threadID
        infolog.info('FileObtainer Initialized')

    def obtainFile(self, path):
        fileOrDirs = os.listdir(path)
        if len(fileOrDirs) == 0:
            return

        for name in fileOrDirs:
            fullPath = path + '/' + name
            if os.path.isfile(fullPath):
                if self.fileFilterFunc is None:
                    self.qOut.put(fullPath)
                elif self.fileFilterFunc(fullPath):
                    self.qOut.put(fullPath)
            elif os.path.isdir(fullPath):
                self.obtainFile(fullPath)

    def run(self):
        print threading.currentThread()
        starttime = time.time()
        self.obtainFile(self.dirpath)
        endtime = time.time()
        print 'ObtainFile cost: ', (endtime-starttime)*1000, 'ms'

class WordReading(threading.Thread):

    def __init__(self, qIn, qOut, threadID):
        threading.Thread.__init__(self)
        self.qIn = qIn
        self.qOut = qOut
        self.threadID = threadID
        infolog.info('WordReading Initialized')

    def readFileInternal(self):
        lines = []
        try:
            filename = self.qIn.get(True, timeoutInSecs)
            #print filename
        except Queue.Empty, emp:
            errlog.error('In WordReading:' + str(emp))
            return None

        try:
            f = open(filename, 'r')
            lines = f.readlines()
            infolog.info('[successful read file %s]\n' % filename)
            f.close()
        except IOError, err:
            errorInfo = 'file %s Not found \n' % filename
            errlog.error(errorInfo)
        return lines

    def run(self):
        print threading.currentThread()
        starttime = time.time()
        while True:
            lines = self.readFileInternal()
            if lines is None:
                break
            self.qOut.put(lines)
        endtime = time.time()
        print 'WordReading cost: ', (endtime-starttime)*1000, 'ms'

class WordAnalyzing(threading.Thread):
    '''
    return Map<Word, count>  the occurrence times of each word
    '''
    wordRegex = re.compile("[\w]+")

    def __init__(self, qIn, threadID):
        threading.Thread.__init__(self)
        self.qIn = qIn
        self.threadID = threadID
        self.result = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        print threading.currentThread()
        starttime = time.time()
        lines = []
        while True:
            try:
                lines = self.qIn.get(True, timeoutInSecs)
            except Queue.Empty, emp:
                errlog.error('In WordAnalyzing:' + str(emp))
                break

            linesContent = ''.join(lines)
            matches = WordAnalyzing.wordRegex.findall(linesContent)
            if matches:
                for word in matches:
                    if self.result.get(word) is None:
                        self.result[word] = 0
                    self.result[word] += 1

        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000, 'ms'

    def obtainResult(self):
        return self.result

class PostProcessing(object):

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":

    dirpath = "c:\\Users\\qin.shuq\\Desktop\\region_master\\src"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)

    qFile = Queue.Queue()
    qLines = Queue.Queue()

    fileObtainer = FileObtainer(dirpath, qFile, "Thread-FileObtainer", lambda f: f.endswith('.java'))
    wr = WordReading(qFile, qLines, "Thread-WordReading")
    wa = WordAnalyzing(qLines, "Thread-WordAnalyzing")

    fileObtainer.start()
    wr.start()
    wa.start()

    wa.join()

    starttime = time.time()
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)
    endtime = time.time()
    print 'PostProcessing cost: ', (endtime-starttime)*1000, 'ms'

    print 'exit the program.'


Timing measurements:

$ time python wordstat_serial.py
ObtainFile cost:  92.0000076294 ms
WordReading cost:  504.00018692 ms
WordAnalyzing cost:  349.999904633 ms
PostProcessing cost:  16.0000324249 ms
real    0m1.100s
user    0m0.000s
sys     0m0.046s

$ time python wordstat_threading.py
ObtainFile cost:  402.99987793 ms
WordReading cost:  1477.99992561 ms
WordAnalyzing analyze cost:  1528.00011635 ms
PostProcessing cost:  16.0000324249 ms

real    0m1.690s
user    0m0.000s
sys     0m0.046s


Judging from these measurements, the concurrent version is actually slower than the serial one. The main reasons are that file reading is still single-threaded, and that message passing between the queues is blocking, which costs some time. In addition, the concurrent version does not yet take advantage of multiple cores, which is another point to improve later.

Note that WordAnalyzing and WordReading take almost the same amount of time, which shows that the two do run concurrently. PostProcessing takes a negligible amount of time, so it is not optimized for now. The next step is to optimize ObtainFile and WordReading.

II. Optimizing with multiple threads and multiple processes

1. Since Queue.put takes a certain amount of time (about 1 ms on average), putting a large number of file names one by one wastes a lot of time. The improved version therefore puts lists of files instead, reducing the number of put calls (a small sketch of this batching idea follows the list);

2. WordReading uses multiple threads to read the many files;

3. WordAnalyzing uses multiple processes to do the word counting.
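As a rough illustration of point 1, the following micro-benchmark is my own sketch, not from the article; the file names and the count of 10,000 are arbitrary placeholders, and the absolute numbers will differ from the ~1 ms figure above, which was measured inside the full program. It compares putting the names one at a time with a single batched put of the whole list:

# Micro-benchmark sketch: per-item puts vs. one batched put (numbers vary by machine).
import time
import Queue

names = ['File%d.java' % i for i in range(10000)]   # hypothetical file names

q1 = Queue.Queue()
start = time.time()
for name in names:
    q1.put(name)                 # one put per file name
print 'per-item puts:', (time.time() - start) * 1000, 'ms'

q2 = Queue.Queue()
start = time.time()
q2.put(names)                    # a single put for the whole list
print 'batched put:  ', (time.time() - start) * 1000, 'ms'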

After these optimizations, WordReading and WordAnalyzing take roughly the same time as in the serial version. The bottleneck is now in FileObtainer. I have measured the os.walk call and the for loop, but the measured time is always far smaller than the reported ObtainFile cost, and I have not yet found where the time actually goes.
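One way to narrow this down is to time the pieces of ObtainFile in isolation. The sketch below is mine, not from the article; dirpath simply reuses the directory from the main program and can be any directory you want to measure. It first times the bare os.walk traversal, then the traversal together with filtering and the queue puts:

# Timing sketch: isolate os.walk from the filter + put work.
import os
import time
import Queue

dirpath = "E:\\workspace\\java\\javastudy\\src"   # same directory as the main program below

start = time.time()
fileCount = 0
for path, dirs, filenames in os.walk(dirpath):    # walk only, no queue work
    fileCount += len(filenames)
print 'walk only:', (time.time() - start) * 1000, 'ms,', fileCount, 'files'

q = Queue.Queue()
start = time.time()
for path, dirs, filenames in os.walk(dirpath):    # walk + filter + batched put
    files = [path + '/' + name for name in filenames if name.endswith('.java')]
    if files:
        q.put_nowait(files)
print 'walk + filter + put:', (time.time() - start) * 1000, 'ms'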

#-------------------------------------------------------------------------------
# Name:        wordstat_threading_improved.py
# Purpose:     statistic words in java files of given directory by threading
#              improved
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import time
import logging
import threading, Queue
from multiprocessing import Process, Pool, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

def initlog(filename):
    # use a named logger per file; otherwise both handlers end up on the shared
    # root logger and error.log/info.log receive each other's messages
    logger = logging.getLogger(filename)
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

timeoutInSecs = 0.1

class FileObtainer(threading.Thread):

    def __init__(self, dirpath, qOut, threadID, fileFilterFunc=None):
        threading.Thread.__init__(self)
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc
        self.qOut = qOut
        self.threadID = threadID
        infolog.info('FileObtainer Initialized')

    def run(self):
        print threading.currentThread()
        starttime = time.time()

        for path, dirs, filenames in os.walk(self.dirpath):
            if len(filenames) > 0:
                files = []
                for filename in filenames:
                    start = time.time()
                    fullPath = path + '/' + filename
                    files.append(fullPath)
                    end = time.time()

                if self.fileFilterFunc is None:
                    self.qOut.put_nowait(files)
                else:
                    fileterFiles = filter(self.fileFilterFunc, files)
                    if len(fileterFiles) > 0:
                        self.qOut.put_nowait(fileterFiles)

        endtime = time.time()
        print 'ObtainFile cost: ', (endtime-starttime)*1000, 'ms'

def readFile(filename, qOut):
    lines = []   # default so the put below still works if open() fails
    try:
        f = open(filename, 'r')
        lines = f.readlines()
        infolog.info('[successful read file %s]\n' % filename)
        f.close()
    except IOError, err:
        errorInfo = 'file %s Not found \n' % filename
        errlog.error(errorInfo)
    qOut.put(lines)

class WordReading(threading.Thread):

    def __init__(self, qIn, qOut, threadID):
        threading.Thread.__init__(self)
        self.qIn = qIn
        self.qOut = qOut
        self.threadID = threadID
        self.threads = []
        infolog.info('WordReading Initialized')

    def readFileInternal(self):
        try:
            filelist = self.qIn.get(True, timeoutInSecs)
            for filename in filelist:
                t = threading.Thread(target=readFile, args=(filename, self.qOut), name=self.threadID+'-'+filename)
                t.start()
                self.threads.append(t)
            return []
        except Queue.Empty, emp:
            errlog.error('In WordReading:' + str(emp))
            return None

    def run(self):
        print threading.currentThread()
        starttime = time.time()
        while True:
            lines = self.readFileInternal()
            if lines is None:
                break

        for t in self.threads:
            t.join()

        endtime = time.time()
        print 'WordReading cost: ', (endtime-starttime)*1000, 'ms'

def processLines(lines):
    result = {}
    linesContent = ''.join(lines)
    matches = WordAnalyzing.wordRegex.findall(linesContent)
    if matches:
        for word in matches:
            if result.get(word) is None:
                result[word] = 0
            result[word] += 1
    return result

def mergeToSrcMap(srcMap, destMap):
    for key, value in destMap.iteritems():
        if srcMap.get(key):
            srcMap[key] = srcMap.get(key) + destMap.get(key)
        else:
            srcMap[key] = destMap.get(key)
    return srcMap

class WordAnalyzing(threading.Thread):
    '''
    return Map<Word, count>  the occurrence times of each word
    '''
    wordRegex = re.compile("[\w]+")

    def __init__(self, qIn, threadID):
        threading.Thread.__init__(self)
        self.qIn = qIn
        self.threadID = threadID
        self.resultMap = {}
        self.pool = Pool(cpu_count())
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        print threading.currentThread()
        starttime = time.time()
        lines = []
        futureResult = []
        while True:
            try:
                lines = self.qIn.get(True, timeoutInSecs)
                futureResult.append(self.pool.apply_async(processLines, args=(lines,)))
            except Queue.Empty, emp:
                errlog.error('In WordAnalyzing:' + str(emp))
                break

        self.pool.close()
        self.pool.join()

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000, 'ms'

    def obtainResult(self):
        #print len(self.resultMap)
        return self.resultMap

class PostProcessing(object):

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":

    dirpath = "E:\\workspace\\java\\javastudy\\src"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)

    qFile = Queue.Queue()
    qLines = Queue.Queue()

    fileObtainer = FileObtainer(dirpath, qFile, "Thread-FileObtainer", lambda f: f.endswith('.java'))
    wr = WordReading(qFile, qLines, "Thread-WordReading")
    wa = WordAnalyzing(qLines, "Thread-WordAnalyzing")

    fileObtainer.start()
    wr.start()
    wa.start()

    wa.join()

    starttime = time.time()
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)
    endtime = time.time()
    print 'PostProcessing cost: ', (endtime-starttime)*1000, 'ms'

    print 'exit the program.'


[To be continued]