您的位置:首页 > 编程语言 > Python开发

python多进程笔记4 - 进程间通信总结,Python中线程存在的问题

2017-11-09 15:32 1016 查看
20171108 - thread - day4

=====================================================================

知识点:

1.阻塞与非阻塞:

一直等就是阻塞,干其他事就是非阻塞;

当我们说阻塞与非阻塞的时候,指的是一个进程的行为,相对于我们的部门做好自己的事情,不需要外部干涉

2.异步与同步:

当我们说异步和同步的时候,是指至少两个进程的行为;

这时需要在两个部门之间协调;同步的执行流程相对可控,异步执行流程不可控(不知道京东什么时候来通知我),通常来说效率更高

3.run() 方法 继续调用进程,面向对象方法

4.python的multiprocessing共享内存已经是加锁的

5.僵尸进程导致资源泄露

=====================================================================

1.堆栈(栈 stack)

LIFO(Last In First Out) 后进先出

2.队列总结

queue PriorityQueue      优先队列

multiprocessing.Queue    进程间通信时进程间传递的数据

multiprocessing.Manger.Queue    用以进程池的进程间传递数据

3.进程池总结

进程池中的进程和我们自己创建的进程相比:

进程池中,我们不需要关心分配任务给进程池的策略,而自己创建的进程则需要关心细节;

对于进程池而言,python提供了一些配套措施,比如对于队列的管理:

multiprocessing.Manger.Queue    (实现进程池中进程间的数据通信-变量修改)

4.进程与线程的联系与区别:

区别:

    进程是cpu分配资源的单位;

    线程是cpu执行的基础单元,是轻量级的进程

联系:

    线程是进程的执行单元,一个进程可以有多个线程;

    进程具有的动态含义,是通过线程来体现的

5.变量作用于范围的问题(进程或者线程中的情况,见当日代码文件)

6.Thread模块(线程模块)

7.哈希:对应映射关系,为数据文件打指纹(表示其唯一性),md5的碰撞

os.walk (见当日代码)

8.Python中线程的问题

GIL(Global Interperter Lock) 全局解释锁;

Cpython(C),Jpython(Java),Ironython(.net),Pypy(Python)

并行:真正的多核,多个进程同时运行

并发:虚假的并行,实际上仍然是单核运行

9.Python中线程存在的问题:全局解释锁(GIL,globle interter lock)

在一个python编译环境中,有一把全局解释锁固定解释环境,一次性实际上只有一个进程在运行。

现象:

    写一个死循环并执行,cpu占有率立刻变为接近百分之百

原因:

     由于历史的原因,Cpython(C)天生有一个缺陷:Python的多线程并不是真正的多线程

案例:

     切换虚拟机的核数看代码实验(死循环,占用cpu资源);在真正的并行中它不是真正的并行

进程线程使用总结:

我们在用python(CPython)编程时,什么时候用多线程,什么时候用多进程:

(1)在CPU计算密集情况下,使用多进程;对于Python的每一个进程,Python会单独起一个解释器;

(2)在I/O密集(读写密集,时间片会被挂起,进入阻塞,将时间片给了其他的进程去执行)的情况下,使用多线程;能用进程池使用进程池;通过实践检验到底需要几个核最佳(默认情况下会通过api接口自动获取核的数量来填充这个数)

突破GIL全局解释锁:

    可以尝试用C语言来突破;

    用c重新写一个库(死循环),将其放在与文件(用python写的)相同的路径,在python文件中创建一个线程(只在线程中会出现全局解释锁),通过调用那个自行写的c库来实现使用多个核。

在讲全局解释锁之前,我们需要先明白 线程 进程的概念。 

在进程中有一个或者多个线程。每个进程具有自己的地址空间,内存,数据栈及其它数据。线程有开始、顺序执行和结束部分。线程是cpu调动的,没有自己的资源,所有线程都共享同一进程中的资源。

线程锁: 

CPU执行任务时,在线程之间是进行随机调度的,并且每个线程可能只执行n条代码后就转而执行另外一条线程。由于在一个进程中的多个线程之间是共享资源和数据的,这就容易造成资源抢夺或脏数据,于是就有了锁的概念,限制某一时刻只有一个线程能访问某个指定的数据。

为了解决不同线程同时访问统一资源时,数据保护问题,而产生了GIL. 

python因为GIL的问题广受诟病,因为它在解释器的层面限制了程序在同一时间只有一个线程被CPU实际执行,而不管你的程序里实际开了多少条线程。所以我们经常能发现,python中的多线程编程有时候效率还不如单线程,就是因为这个原因。

所以在python中并不推荐多线程,而是推荐多进程。 

另外 计算密集型推荐多进程,i/o密集型推荐多线程。也是因为防止同一资源占用情况。

进程间通信总结:

(1)在多个进程中传递数据一般而言用队列就可以了,如果要快速高效的话,可以考虑共享内存;

(2)互斥量Lock主要用于在多个进程间加锁;

(3)在多个进程中传递信号可以用信号量Semaphore,事件Event;

(4)互斥量可以看做是特殊的信号量,是一个消费者只有一个物品的信号量;

(5)不同的进程通信方式有不同的优点和缺点,因此,对于不同的应用问题,要根据问题本身情况选择进程间通信的方式;

(6)为了高效:可以考虑:共享内存+信号量

=====================================================================

作业1:

用多线程的方式实现生产者消费者模型;(队列;将进程换为线程;manager队列?)

进程的三态,五态,相互之间怎么切换?

什么是死锁?举例死锁的例子,怎么避免死锁的例子?

进程和线程的区别?

多线程同步与互斥有几种实现方法?是哪些?

python的多进程,多线程分别适合与什么场景下使用?爬虫适合使用多进程还是多线程?

作业2:

网络下载问题:

1.获取文件源地址,网络数据传输readFromInternet;

保存数据到本地,writeToDisk;

实现多线程的下载;

问:需要哪些变量,函数;需要哪些线程间通信的方式

g_buffer[MAXSIZE];

readThread,writeThread;

进程间通信

互斥量:控制银行的存取钱,一个做另一个不能做;一个读,另一个就不能写

推荐用信号量

信号量:阿姨煮咖啡,如果启用两个进程,并将信号量设置为2,那么表示两个进程可以同时进行,一个读信息,一个写信息

作业3:

通过一个进程池实现——文件夹下大量文件拷贝;至少500个文件,有方法证明你的拷贝的文件是ok的(哈希值唯一性证明)

提示:

hash 哈希

import hashlib

digest = hashlib.sha512() #!创建sha512算法

f = open(filename)

while True:

    chunk = f.read(BUFSIZE)

    if not chunk:

        break

    digest.update(chunk.encode('utf8'))#!

f.close()

print(digest_map)#!

复制文件夹:

import os

import shutil

file_dir = "/home/tarena/aid1709/thread_1104_1108/"

os.chdir(file_dir)  #改变工作目录

shutil.copytree("原始","复制进")      # 复制文件夹,且后者文件夹必须不存在

框架:

def init_file():   #最初hash值

def after_file():

def test_file():   #返回一个值 判断是否ok

def oper_file():   #复制操作

while True:

遍历文件:

import hashlib

digest = hashlib.sha512()  # !创建sha512算法

import os

for fpathe, dirs, fs in os.walk('/home/tarena/aid1709/thread_1104_1108/原始'):

    for f in fs:

        print(os.path.join(fpathe,f))   #完整路径

哈希加密:

import hashlib

BUFSIZE = 2048

def compute_digest(filename):

    digest = hashlib.sha512()  # !创建sha512算法

    f = open(filename)

    digest = hashlib.sha512()  # 打印出来的是512位的空间

    while True:

        chunk = f.read(BUFSIZE)

        if not chunk:

            break

        digest.update(chunk.encode('utf8'))

    f.close()

    return filename, digest.hexdigest()  # 以十六禁止的方式得到指纹

digest_map = compute_digest(

    '/home/tarena/aid1709/thread_1104_1108/day4_1108/shiyan.py')

print(digest_map)  # !

实现了对文件夹内文件的遍历加密(程序文件与复制文件不必在一起):

import hashlib

import os

BUFSIZE = 2048

def compute_digest(filename):

    digest = hashlib.sha512()  # !创建sha512算法

    f = open(filename)

    digest = hashlib.sha512()  # 打印出来的是512位的空间

    while True:

        chunk = f.read(BUFSIZE)

        if not chunk:

            break

        digest.update(chunk.encode('utf8'))

    f.close()

    return digest.hexdigest()  # 以十六禁止的方式得到指纹

l = []

for fpathe, dirs, fs in os.walk('/home/tarena/aid1709/thread_1104_1108/原始'):

    for f in fs:

        l.append(os.path.join(fpathe, f))

for i in l:

    print(compute_digest(i))  # !

=====================================================================

第1题:

from threading import Thread

import time

def consumer(input_q):

    while True:

        item = input_q.get()

        if item == None:

            break

        print(item)

def producer(sequence, output_p):

    for i in sequence:

        time.sleep(1)

        output_p.put(i)

if __name__ == '__main__':

    q = Queue()

    con_p = Thread(target=consumer, args=(q,))

    con_p.start()

    sequence = [1, 2, 3, 4, 5]

    producer(sequence, q)

    q.put(None)

第2题:

from multiprocessing import Pool

import time

import os

import hashlib  #哈希算法

import shutil   #对文件操作的模块

def hash_get(file_dir):

    l = []

    z = []

    for fpathe, dirs, fs in os.walk(file_dir):

        for f in fs:

            l.append(os.path.join(fpathe, f))

    for i in l:

        z.append(compute_digest(i))

    return z

def oper_file(dir, init_dir, after_dir):  # 复制操作

    file_dir = dir

    os.chdir(file_dir)  # 改变工作目录

    if (os.path.exists(init_dir)) and (not os.path.exists(after_dir)):

        shutil.copytree(init_dir, after_dir)      # 复制文件夹,且后者文件夹必须不存在

        return

    elif not os.path.exists(init_dir):

        print("原始目录不存在")

        return

    elif os.path.exists(after_dir):

        print("目标目录已存在")

        return

def compute_digest(filename):

    digest = hashlib.sha512()  # !创建sha512算法

    f = open(filename)

    digest = hashlib.sha512()  # 打印出来的是512位的空间

    while True:

        chunk = f.read(BUFSIZE)

        if not chunk:

            break

        digest.update(chunk.encode('utf8'))

    f.close()

    return digest.hexdigest()

if __name__ == '__main__':

    t1 = time.time()

    BUFSIZE = 2048

    pool = Pool(processes=4)

    filedir = "/home/tarena/aid1709/thread_1104_1108/day4_1108"

    initdir = "initdir"

    afterdir = "afterdir"

    pool.apply(oper_file, (filedir, initdir, afterdir))

    hash_init_value = pool.apply(hash_get, (initdir, ))

    hash_after_value = pool.apply(hash_get, (afterdir, ))

    pool.close()

    pool.join()

    if hash_init_value == hash_after_value:

        print('copy successfully')

    else:

        print('copy failed')

    t2 = time.time()
    print('花费时间:%f', t2-t1)

===========================================================================================

相关代码:

#程序1:演示:异步-回调函数

from multiprocessing import Pool

import time

import os

def func(str):

    print ("Pool:", str,os.getpid())

    time.sleep(3)

    print ("Pool:", str, os.getpid(),"end")

    return str+'funcCallback'

def cbFunc(args):

    print('callback',os.getpid(),args)

if __name__ == "__main__":

    pool = Pool(processes = 3)

 

    for i in range(4):

        msg = "apply_async %d" % (i)

        pool.apply_async(func, (msg, ),callback=cbFunc)   

    while True:

        time.sleep(2)

        print('mian pro',os.getpid())

Pool: apply_async 0 5677

Pool: apply_async 1 5676

Pool: apply_async 2 5678

mian pro 5675

Pool: apply_async 1 5676 end

Pool: apply_async 0 5677 end

Pool: apply_async 3 5676

callback 5675 apply_async 1funcCallback

callback 5675 apply_async 0funcCallback

Pool: apply_async 2 5678 end

callback 5675 apply_async 2funcCallback

mian pro 5675

mian pro 5675

Pool: apply_async 3 5676 end

callback 5675 apply_async 3funcCallback

mian pro 5675

mian pro 5675

mian pro 5675

...

总结:

我们看到,

callback 3321 apply_async 0funcCallback

mian pro 3321

这两个id号码是相同的,而且我们知道 mian pro 3321 一定是主进程,所以

callback 3321 apply_async 0funcCallback 也一定是运行在主进程中。

那么问题来了,主进程,我们写的是死循环:

    while True:

        time.sleep(2)

        print('mian pro',os.getpid())

应该一直运行才对啊?!中间怎么会被打断去做了其他事情呢:

callback 3321 apply_async 0funcCallback

实际这就是异步的一个实例,当子进程去做睡眠的时候:

def func(str):

    print ("Pool:", str,os.getpid())

    time.sleep(3)

他们告诉主进程,“我们在睡觉,用不到时间片,你拿去用吧”,

于是主进程被通知,快去用时间片执行回调吧!于是正在        

print('mian pro',os.getpid())  (写作业的我)被打断了,去做另外一件事情

callback 3321 apply_async 2funcCallback

callback 3321 apply_async 1funcCallback

callback 3321 apply_async 0funcCallback

(去拿书),这就叫异步。

注意:

异步会造成某个进程的死循环任务被打破,去做另外一件事情。

#============================================

#程序2:演示:进程池中子进程之间的通信-进程池中创建交互队列

from multiprocessing import Manager,Pool

def read(q):

    for i in range(q.qsize()):

        print('read from manager queue: %s ' % (q.get_nowait()))

def write(q):

    for i in 'AID1709':

        q.put(i)

if __name__ == '__main__':

    #在进程池中创建一个交互数据的队列

    q = Manager().Queue()#代理对象的代理队列

    p = Pool(2)

    p.apply(write,(q,))  #给进程池分配任务 函数入口 和 参数 每个子进程的都有一个共享队列

    p.apply(read,(q,))   #给进程池分配任务

    p.close()

    p.join()

    print('main end')

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

read from manager queue: A 

read from manager queue: I 

read from manager queue: D 

read from manager queue: 1 

read from manager queue: 7 

read from manager queue: 0 

read from manager queue: 9 

main end

# ============================================

# 程序3:演示:线程-计时器

from threading import Thread

import time

def gettime(interval):

    while True:

        time.sleep(interval)

        print('child thread %s' % time.ctime())

if __name__ == '__main__':

    t = Thread(target = gettime,args = (1,))

    t.start()

 #尝试使用方法 daemon join

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

child thread Wed Nov  8 13:12:27 2017

child thread Wed Nov  8 13:12:28 2017

child thread Wed Nov  8 13:12:29 2017

child thread Wed Nov  8 13:12:30 2017

child thread Wed Nov  8 13:12:31 2017

child thread Wed Nov  8 13:12:32 2017

    #chang shi daemon join

#---------------------------

# 演示:线程-全局变量(将process改为thread)  

#注意:线程中,默认和其他线程(主进程)共享全局变量!

# 同一个进程中的不同线程

# from threading import Thread

# import time

# g_num = 100

# def getTime(interval):

#     global g_num

#     while True:

#         g_num += 100

#         time.sleep(interval)

#         print("in child num is %d"%g_num)

# if __name__ == '__main__':

#     p = Thread(target=getTime, args=(2,))

#     p.start()

#     while True:

#         g_num += 1

#         print("Current num is %d"%g_num)

#         time.sleep(1)

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

Current num is 201

Current num is 202

in child num is 202

Current num is 303

Current num is 304

in child num is 304

Current num is 405

Current num is 406

in child num is 406

Current num is 507

Current num is 508

in child num is 508

#============================================

# 程序4:演示:线程- 线程中可以有自己的变量,互不干扰

#注意变量的作用域,不同线程在函数内部可以有自己的变量

from threading import Thread

import time

g_num = 100

def f1():

    # time.sleep(1)

    global g_num

    g_num = 100

    for i in range(5):

        g_num += 1

    print('in f1 g_num id %d' % g_num)

def f2():

    time.sleep(1)

    # global g_num

    # g_num = 100

    print('in f2 g_num id %d' % g_num)

if __name__ == '__main__':

    #创建两个线程

    t1 = Thread(target = f1)

    t1.start() 

    t2 = Thread(target = f2)

    t2.start()

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

in f1 g_num id 105

in f2 g_num id 105

--------------------------------------------------------

# 程序4:演示:线程- 线程中可以有自己的变量,互不干扰

#注意变量的作用域,不同线程在函数内部可以有自己的变量

from threading import Thread

import time

# g_num = 100

def f1():

    # global g_num

    g_num = 100

    for i in range(5):

        g_num += 1

    print('in f1 g_num id %d' % g_num)

def f2():

    # time.sleep(1)

    # global g_num

    g_num = 100

    print('in f2 g_num id %d' % g_num)

if __name__ == '__main__':

    #创建两个线程

    t1 = Thread(target = f1)

    t1.start() 

    t2 = Thread(target = f2)

    t2.start()

    

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

in f1 g_num id 105

in f2 g_num id 100

-----------------------------------------------------------

# 程序4:演示:线程- 线程中可以有自己的变量,互不干扰

#注意变量的作用域,不同线程在函数内部可以有自己的变量

from threading import Thread

import time

g_num = 100

def f1():

    time.sleep(1)

    global g_num

    g_num = 100

    for i in range(5):

        g_num += 1

    print('in f1 g_num id %d' % g_num)

def f2():

    # time.sleep(1)

    # global g_num

    # g_num = 100

    print('in f2 g_num id %d' % g_num)

if __name__ == '__main__':

    #创建两个线程

    t1 = Thread(target = f1)

    t1.start() 

    t2 = Thread(target = f2)

    t2.start()

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 shiyan.py 

in f2 g_num id 100

in f1 g_num id 105

# ============================================

#对文件进行哈希加密

import multiprocessing

import hashlib

import os

BUFSIZE = 2048

def compute_digest(filename):

    try:

        f = open(filename)

    except IOError:

        return None

    digest = hashlib.sha512()   #打印出来的是512位的空间

    while True:

        chunk = f.read(BUFSIZE)

        if not chunk:

            break

        digest.update(chunk.encode('utf8'))

    f.close()

    return filename,digest.hexdigest()

    

if __name__ == '__main__':

    digest_map = compute_digest('/home/tarena/aid1709/thread_1104_1108/day4_1108/shiyan.py')

    print(digest_map)

tarena@tedu:~/aid1709/thread_1104_1108/day4_1108$ python3 thread_1108.py 

('/home/tarena/aid1709/thread_1104_1108/day4_1108/shiyan.py', 'be8aa10b8f8d5abc4e50eb56ad0f9363f5

7087c411b1d7005e43243d52aef29ba703b8530dd31569ba98e875535c6c337bb6b55a45be621abd5ecc0ee6205fe5')
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: