您的位置:首页 > 编程语言 > Python开发

【Python】进程和线程

2017-04-13 19:20 309 查看
多进程

多线程

ThreadLocal

进程vs线程

分布式进程

Top

学习廖老师的py官网的笔记

多任务的实现方式有三种方式:

1、多进程

2、多线程

3、多进程+多线程(这种比较复杂,实际很少采用)

【多进程】

1、在mac中创建子进程使用Python封装的fork()系统调用。

import os

pid = os.fork()


pid



2、在windows上的实现。

使用multiprocessing模块的Process类。

为了观察,还加了一小段代码~

# -*- coding: utf-8 -*-

from multiprocessing import Process
import os

# 这里是子进程要执行的代码
def run_proc(name):
print('Run child process %s (pid=%s)...' % (name, os.getpid()))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
greeting = input('hi~')
print(greeting)
p = Process(target=run_proc, args=('MyTest',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')


然后就观察到:





结束:

$python test.py
Parent process 1388.
hi~ hi!
hi!
Child process will start.
Run child process MyTest (pid=12680)...
Child process end.


join用于进程间的同步,子进程结束后继续往下执行。

3、有时候需要创建大量的进程,(比如应对多用户访问)这个时候就需要使用“进程池”。

#-*- coding: utf-8 -*-

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3) # teme.sleep(sec)\random.random生成一个0<=x<1的数
end = time.time()
print('Task %s run %.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,)) # 初始化子进程:这里的意思应该是将任务分配给进程并添加参数
print('Waiting all process Done!.  ..')
p.close() # 关闭进程池:调用之后就不能再添加新进程了
p.join() # 父进程等待:等待所有子进程执行完毕
print('ALL Done.')


Parent process 8936
Waiting all process Done!.  ..
run task 0 (5608)...
run task 1 (11904)...
run task 2 (8212)...
run task 3 (7492)...
Task 0 run 0.86 seconds.
run task 4 (5608)...
Task 4 run 0.85 seconds.
Task 1 run 1.81 seconds.
Task 2 run 2.53 seconds.
Task 3 run 2.88 seconds.
ALL Done.


在join之后宏观来说各个进程应该是并发的,但是task4却在task0执行完之后才开始run。

原因是pool中设置了最大运行并行的进程个数。

p = Pool(4) # 这里设置的是可以同时并发的数量
for i in range(8): # 真正添加的进程数量


run一下:

Parent process 7592
Waiting all process Done!.  ..
run task 0 (4596)...
run task 1 (12712)...
run task 2 (9460)...
run task 3 (7308)...
Task 0 run 1.20 seconds.
run task 4 (4596)... # 因为已经有 4 个进程在并行了,因此 task 4需要等待一个进程结束才开始运行,后面的task也是同理,结束一个,加入一个。
Task 3 run 1.23 seconds.
run task 5 (7308)...
Task 4 run 1.26 seconds.
run task 6 (4596)...
Task 1 run 2.46 seconds.
run task 7 (12712)...
Task 2 run 2.63 seconds.
Task 5 run 1.63 seconds.
Task 6 run 0.95 seconds.
Task 7 run 1.92 seconds.
ALL Done.


p = Pool() # 不添加参数就默认为电脑的核数


4、子进程。

”很多时候,子进程并不是自身,而是一个外部进程。“这里看不懂。

#-*- coding: utf-8 -*-
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)


D:\labs>python test.py
$ nslookup www.python.org
DNS request timed out.
timeout was 2 seconds.
服务器:  UnKnown
Address:  218.85.157.99

非权威应答:
DNS request timed out.
timeout was 2 seconds.
名称:    python.map.fastly.net
Address:  151.101.72.223
Aliases:  www.python.org

Exit code: 0


稍微修改一下,改成编译当前文件夹下的一个.cpp文件:

#-*- coding: utf-8 -*-
import subprocess

print('$ g++ prog1.cpp -std=c++11')
r = subprocess.call(['g++', 'prog1.cpp', 'prog1.cpp'])
print('Exit code:', r)


效果和直接在命令行下输入命令是一样的。(.bat)

如果子进程还需要输入,则可以通过
communicate()
方法输入。

#-*- coding: utf-8 -*-
import subprocess

print('subprocess $ python')
p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'a = 1997\nprint(a)\n')
print('output:' + output.decode('utf-8'))
print('Exit code:', p.returncode)


D:\labs>python test.py
subprocess $ python
output:1997

Exit code: 0


5、进程间的通信。

#-*- coding: utf-8 -*-
# 进程间的通信,queue实现示例。
from multiprocessing import Process, Queue
import os, time, random

# 这段代码由写数据的进程执行
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 这段代码由读数据的进程执行
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__ == '__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))

pw.start()
pr.start()
pw.join()
pr.terminate() # pr默认无限循环,这里一旦pw结束就关掉pr


可以把含有read方法的进程看作服务器,把含有write看作客户端程序。。。q当作存放socket的队列。。

$ python test.py
Process to write: 27524
Put A to queue...
Process to read: 25572
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


【多线程】

1、Python中的线程模块有_thread和threading。_thread是threading的底层,通常使用threading就可以了。

2、线程的接口和进程的接口差不多。

任何进程默认启动一个线程。这个默认的线程叫做:MainThread

#-*- coding: utf-8 -*-

import time, threading

print('current_thread is %s' % threading.current_thread().name)


$ python test.py
current_thread is MainThread


MainThread又可以创建新的线程,方式和创建进程类似:

#-*- coding: utf-8 -*-

import time, threading

def f():
print('current_thread is %s' % threading.current_thread().name)

print('current_thread is %s' % threading.current_thread().name)
t = threading.Thread(target=f, name='>_<')
t.start();
t.join();
print('current_thread is %s' % threading.current_thread().name)


$ python test.py
current_thread is MainThread

D:\labs>python test.py
current_thread is MainThread
current_thread is >_<
current_thread is MainThread


默认的线程名为
Thread-1
Thread-2


3、多线程和多进程的不同在于,多进程中的每一个进程对程序中的变量都有“副本”,各自互不影响。

而多线程中的各个线程却“共享”变量。

尝试一下,线程的情况:

#-*- coding: utf-8 -*-

import time, threading

def f():
global data # 需要先声明一下
data = data + 5
print(data)

data = 30

t = threading.Thread(target=f, name='>_<')
t2 = threading.Thread(target=f, name='( >_ <)')
t.start();
t2.start();
t.join();
t2.join();


最后输出的是40.

4、Lock.

#-*- coding: utf-8 -*-

import threading, time

lock1 = threading.Lock() # 创建锁

def task1():
lock1.acquire() # 获取锁
try:
for i in range(10):
print('execute task1...')
finally:
lock1.release() # 释放锁

def task2():
lock1.acquire() # 获取锁
try:
for i in range(10):
print('execute task2...')
finally:
lock1.release() # 释放锁

t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()


5、GIL锁。

历史遗留问题,可以通过执行多个py程序来跑满多核cpu。(或者)

【ThreadLocal】

参考:http://python.jobbole.com/86150/

有时候使用局部变量不太方便,所以就引入了threadlocal变量。这个变量实质是一个全局的dict,但是却提供储存局部数据的功能。

怎么创建、访问threadlocal储存的局部数据呢?

#-*- coding: utf-8 -*-

import threading, time

global_data = threading.local() # 创建

# 每个线程都可以通过 global_data.num 获得自己独有的数据,并且每个线程读取到的 global_data 都不同
def show():
print(threading.current_thread().getName(), global_data.num)

def thread_cal():
global_data.num = 0
for i in range(1000):
global_data.num += 1
show()

threads = []
for i in range(10):
threads.append(threading.Thread(target=thread_cal))
threads[i].start()
for i in range(10):
threads[i].join()


【进程vs线程】

相对而言,多进程稳定,但是消耗资源大,多线程不稳定,但是消耗资源小。

【分布式进程】

参考代码来源

# -*- coding: utf-8 -*-

import queue

from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
pass

def return_task_queue():
return task_queue

def return_result_queue():
return result_queue

def test():
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)

manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

manager.start()

task = manager.get_task_queue()
result = manager.get_result_queue()

for i in range(10):
task.put(i)

for i in range(10):
print(result.get())

manager.shutdown()

if __name__  == '__main__':
test()


自己写的死活运行不出来。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: