您的位置:首页 > 编程语言 > Python开发

python多线程、多进程

2015-06-16 09:35 645 查看
  最近在项目中有一个需求是没五分钟爬虫抓取一批网上数据,然后实时的将数据更新到mysql和redis中存储,在存储数据时,需要对数据做一些标准化的操作,甚至还需要根据历史数据对新数据某些缺失字段进行融合操作,往往在数据量比较大时,会出现五分钟内无法处理完parser产生的数据,导致数据的堆积。

数据处理逻辑中,其实是可以根据某些特性将全量数据划分为独立的规模较小的数据子集合,每个子集互相独立,可独立进行处理,在更新数据时,不需要用到其他子集的数据。基于该特性以及多核服务器的特性,多线程(多进程)就成为一个加快数据更新速度的较好方案。

我们的整个数据更新服务都是使用python实现的。

在实现对数据进行并行处理时,首先想到的是利用多线程,将全量数据进行划分后,对每个子集数据启动一个子线程单独处理,待所有子线程都处理完各自的子集数据时,父进程在进行剩余的操作。

1、使用threading实现多线程

python中实现多线程可以使用thread或者是threading,官方文档是推荐threading模块实现的。threading实现多线程很简单。

        整个实现的逻辑如下所示,其中具体的操作使用注释代替,主要是描述多线程逻辑。

<span style="font-size:14px;"><span style="font-family:KaiTi_GB2312;font-size:14px;">import time, threading

def sub_func(sub_data,result):
print 'begin deal data...'
/**** do something*****/
result = /**deal with data***/
print 'finish thread'

if __name__ == '__main__':
data_list = [data1,data2]
result_list = [result_1,result_2]

threads = []
for idx in range(len(data_list)):
threads.append(threading.Thread(target=sub_func,args = (data_list[idx],result_list[idx]))

for cand_thread in threads:
cand_thread.start()

for cand_thread in thread:
cand_thread.join()

/***do other things to finsh th updaing***/
 </span><span style="font-family: KaiTi_GB2312;font-size:14px;">do_other()     </span></span>


在上面的多线程代码中,启动每个子线程处理单独的数据子集,父进程阻塞,等待所有子线程结束,然后继续执行do_other函数,执行剩余的操作。

        完成上述代码后,分别测试单线程和多线程之间的性能差异,发现一个比较奇怪的现象,同样大小的数据,单线程消耗的时间总是比多线程要少,而且使用多线程的时候,服务器的CPU只有其中一个的使用率会超过100%,其他CPU使用率非常低。这说明多线程代码根本没用充分地利用多服务器,而是在同一个服务器上实现了多线程。

       上网google了一下(当然需要自备梯子~~),才发现原来python的多线程和C/C++的多线性是不一样的,根本原因在于Python中的Global Interpreter Lock。

在Python 多线程当中,存在一个叫Global Interpreter Lock(GIL)的东西,直译就是全局解释器锁。它的作用在于让同一时刻只能有一个线程对于python对象进行操作。Python已经提供了各种机制让我们进行多线程同步,为什么又要整这个GIL呢?这是因为程序员控制的同步是对各个程序中可见的变量,而GIL同步的是解释器后台的不可见变量,比如为了进行垃圾回收而维护的引用计数。如果没有GIL,有可能出现由于线程切换导致的对同一个对象释放两次的情况。

因此,任何一个CPython线程如果要执行,就必须先获取这个GIL。后果?就是在CPython中,本质上几乎是没有线程并行的,不论你开多少个线程,同一时刻只有获取GIL的那个线程能够执行。

当然也可以避免这个问题,那就需要使用一些其他的包了,否者如果是存的python代码,使用CPython,那么这个问题很难避免。

由于不想引入其他的包,因此想到使用多进程实现,因为每一个进程是独立拥有自己的GIL的,这就避免了上面描述的问题,从而可以实现多核处理器的使用。

2、python多进程

在多进程中,和线程不同的时,每个子进程拥有自己独立的数据空间,这样就不能想多线程一样让每个每个子进程将运行结果返回给父进程。这就需要实现进程间的通信。

python中进程间通信通常使用两类:Queue和Pipe。Queue和Pipe的使用可直接查看官方文档:https://docs.python.org/2/library/multiprocessing.html。

我选择了较简单地queue来实现进程间通信,注意这里用的是multiprocessing.Queue,而不是Queue,multiprocessing.Queue是线程/进程安全的。

在实现中,每个子进程将运行的结果put到queue中,父进程从queue中get结果进程后续处理。一切看起来都perfect。

Talk
is cheap, show me the code~~

<span style="font-size:14px;">import multiprocessing

def sub_func(sub_data,process_queue):
print 'begin deal data...'
/**** do something*****/
result = /**deal with data***/
process_queue.put(result)
print 'finish thread'

if __name__ == '__main__':
data_list = [data1,data2]
result_list = [result_1,result_2]

process_queue = multiprocessing.Queue();
process_list = []
for idx in range(len(data_list)):
cand_process = multiprocessing.Process(target=sub_func,args = (data_list[idx],process_queue))
cand_process.start()
process_list.append(cand_process)

for cand_process in process_list:
cand_process.join()

while False == process_queue.empty():
cand_result = process_queue.get()
do_other(cand_result)
</span>


事情看起来似乎马上就over了,就快要下班回去陪女票了。接下来运行代码,比较一下性能,见证奇迹的时刻就要到了,

可是,尼玛运行了10分钟了,开了三个子进程,加上父进程,四个进程没有一个结束的,每个进程的CPU使用率都接近0。我擦 ,一定是我打开的方式不对,各种看文档,没发现错误啊,文档就是那么写的,示例代码也是这样啊,,,,

抓狂,看来今晚陪女票的计划泡汤了,改为加完班回去跪主板吧!!!

官方文档没找到解决方案,好吧,上万能的google,输入python multiprocessing queue deadlock,我擦,奇迹出现了,我不是一个人在战斗,这是什么节奏。

看了一下网上大家遇到的问题,queue出现deadlock都是因为对queue存在get、set操作,当get操作出现queue为空时,queue阻塞,此时queue的set操作也会阻塞,从而造成死锁。可是我的代码里不是这样的逻辑,在对queue进行get之前,调用join保证所有子进程都结束了。可是仍然出现了死锁,加上各种log,从log中发现,调用第一次join后,父进程就死锁了,这是为什么呢。

查看join函数的文档:

join([timeout])

Block the calling thread until the process whose join() method
is called terminates or until the optional timeout occurs.

If timeout is None then there is no timeout.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

也就是说,当调用join的时候,必须保持子进程当前没有结束,否者子进程会出现死锁。

解决方法:

使用不断查询queue的返回结果数量,来判断子进程是否都结束了。当子进程都结束,在处理接下来的逻辑:

<span style="font-size:14px;">import multiprocessing

def sub_func(sub_data,process_queue):
print 'begin deal data...'
/**** do something*****/
result = /**deal with data***/
process_queue.put(result)
print 'finish thread'

if __name__ == '__main__':
data_list = [data1,data2]
result_list = [result_1,result_2]

process_queue = multiprocessing.Queue();
process_list = []
for idx in range(len(data_list)):
cand_process = multiprocessing.Process(target=sub_func,args = (data_list[idx],process_queue))
cand_process.start()
process_list.append(cand_process)

result_received = []
while len(result_received) < process_count:
result_received.append(process_queue.get())
sleep(3)

for cand_result in result_received:
do_other(cand_data)</span>

总感觉还有更加优美的处理方法,等抽空研究了在更新~~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: