Python_进程2

进程同步(multiprocess.Lock)

锁:

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题

举例:抢票为例

不带锁的情况,则会出现多个进程同时对数据进行修改,但数据只修改了一次,也就是会出现:票多售的情况

""" 模拟不带锁的情况 """

from multiprocessing import Process
import time,json
def search():
    dic=json.load(open('db'))
    print('剩余票数%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('购票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模拟并发100个客户端抢票
        p=Process(target=task)
        p.start()

模拟带锁的情况,当对 票进行修改的操作时,只允许一个进程去执行,其他进程就绪等待,虽然失去了效率,但是保证了数据安全

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩余票数%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('购票成功')
    else:
        print('购票失败')

def task(lock):
    search()
    lock.acquire()    
    get()
    lock.release()

"""
# 等同于上面
def task(lock):
    search()
    with lock:
        get()
"""

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()

进程间通信

队列(Queue):

进程间通信:IPC(Inter-Process Communication)

队列的底层使用 管道(数据不安全) 和 锁

方法介绍:

Queue()       创建共享的进程队列,其中有一个参数 maxsize—时队列中允许的最大项数,如果省略,则无大小限制,

创建出来的实例q具有以下方法:

q.get([block], timeout]])       返回q中的一个项目,如果q为空,此方法将阻塞,直到队列中有项目可用为止, block 用于控制阻塞行为,默认为true , 如果设置为False,将会引发Queue.Empty异常(定义在Queue模块中)。 timeout是可选超时时间,如果在指定的时间间隔内没有项目变为可用,将会引发Queue.Empty异常

q.get_nowait()        等同于 q.get(block=False)  方法

q.put(item,[block],timeout]])      将item放入队列,如果队列已满,此方法将阻塞至有空间可用为止,其他两个参数等同于 get()  方法中的参数

q.qsize()       返回队列中目前项目的正确数量。(此方法的结果并不可靠,因为在返回结果和在接下来的程序中使用结果之前,队列中可能添加或删除了项目, 在一些系统上,该方法可能引发 NotImplementedError异常)

q.empty()       判断队列中是否还有项目,为空返回True。(如果其他进程或线程正在向队列中添加项目,结果异界不是特别可靠。)

q.full()           判断队列中是否已满, 满返回True,  (但是其结果依旧不是可靠的,参考上面 q.empty() 方法)

q.close()      关闭队列,防止队列中加入更多数据,调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上完毕。   如果q被垃圾收集,将自动调用此方法,关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如:当程序被阻塞在get() 或者put() 操作时,关闭生产者的队列不会导致get和put方法返回错误

基于 Queue 实现的  生产者消费者模型  请查看 ————》https://blog.csdn.net/weixin_42598585/article/details/87973360

JoinableQueue([maxsize])     

创建可连接的共享进程队列,这就像一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理,通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue的实例q 除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done()       使用者使用此方法发出信号,表示q.get() 返回的项目已经被处理,如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError 异常

q.join()           生产者将使用此方法进行阻塞,直到队列中所有项目均被处理,   阻塞将持续到为队列中的每个项目均调用 q.task_done() 方法为止。

基于JoinableQueue 队列实现的消费者生产者模型

from multiprocessing import Process,JoinableQueue
import time,random,os

def consumer(q):
    while True:
        res = q.get()    # 取数据
        time.sleep(random.randint(1,3))
        print("%s吃%s"%(os.getpid(), res))
        q.task_done()    # 向 q.join() 发送一次信号,证明这个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        ret = "%s%s"%(name,i)
        q.put(ret)        # 向队列中放数据
    q.join()        # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理(均执行了q.task_done())

if __name__ == "__main__":
    q = JoinableQueue()
    # 生产者
    p1 = Process(target=producer, args=("包子",q))
    p2 = Process(target=producer, args=("骨头",q))
    p3 = Process(target=producer, args=("面条",q))
    # 消费者
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 守护进程
    c2.daemon = True  # 守护进程

    # 开始
    p_li = [p1,p2,p3,c1,c2]
    for p in p_li:
        p.start()
    p1.join()
    p2.join()
    p3.join()
    print("主程序")

"""
p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
"""

进程之间的数据共享

Manager模块

"""

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的

虽然进程间数据独立,但是可以通过Manager实现数据共享,事实上Manageer的功能远不止于此

"""

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加锁而操作共享的数据,很有可能会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':10})
        p_l=[]
        for i in range(10):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)        # {"count":0}

进程池

multiprocess.Pool 模块

Pool([numprocess [,initializer [,initarges] ])      创建进程池  numprocess:要创建的进程数,如果省略,将默认使用cpu_count() 的值,也就是cpu的核数,  initializer:是每个工作进程启动时要执行的可调用对象,默认为None, initargs: 是要传给Initializer的参数组。

主要方法

p.apply(func,args,kwargs)    : 在一个池工作进程中执行func(*args,**kwargs),并返回结果。但是是同步执行

p.apply_async(func,args,kwargs, callback=None,error_callback=None):   在一个池工作进程中执行func(*args,**kwargs),并返回结果。但是是异步执行(推荐)

p.close()       :  关闭进程池,防止进一步操作,如果所有的操作持续挂起,他们将在工作进程终止前完成

p.join()       :  等待所有工作进程退出,此方法只能在close()  或 teminate()   之后调用

方法apply_async()  和 map_async()   的返回值是AsyncResul的实例,实例具有以下方法

obj.get()   : 返回结果,如果有必要则等待结果到达,timeout是可选的,如果指定时间没到达,则引发异常

obj.ready()   :  如果调用完成,返回true

obj,successful()   :  如果调用完成且没有引发异常,返回True

obj.wait([timeout])  : 等待结果变为可用

obj.teminate()  :  立即终止所有工作进程。同时不执行任何清理或结束任何挂起工作

进程池的异步调用

import os,time,random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
                                          # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                          # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                          # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
        res_l.append(res)

    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
    # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

进程池爬虫例子

from urllib.request import urlopen
from multiprocessing import Pool


def get_page(url):
    response = urlopen(url).read().decode('utf-8')
    return response


def parse_page(info):
    response = info
    print(len(response))


if __name__ == '__main__':
    url_dic = ('http://maoyan.com/board/{}'.format(i) for i in range(4, 8))
    p = Pool()
    res_l = []
    while 1:
        try:
            res = p.apply_async(get_page, args=(url_dic.__next__(),), callback=parse_page)
            res_l.append(res)
        except:
            break
    for i in res_l:
        i.get()

猜你喜欢

转载自blog.csdn.net/weixin_42598585/article/details/88086690