python笔记(进程中的组件:事件,信号量,队列,管道,数据共享)

一、多进程中的组件(信号量):某一段代码同一时间只能被n个进程执行

from multiprocessing import Process,Semaphore
def func(i,sem):
    sem.acquire()   #获取要钥匙
    print(i)
    sem.release()   #换钥匙
for i in range(10):
    sem = Semaphore(4) #
    p = Process(target=func,args=(i,sem))
    p.start()

@通过一个信号来控制多个进程同时执行或者阻塞

二、事件
(1)一个信号可以使所有的进程进入阻塞状态
(2)也可以控制所有的进程解除阻塞
(3)一个事件被创建之后,默认是阻塞状态

from multiprocessing import Event
e = Event()#创建了一个事件
print(e.is_set())#查看一个事件的状态,默认被设置成阻塞
e.set()#将这个享件的状态改为True
print(e.is_set())
e.wait()#是依据e.is_set的值来决定是否阻塞的
print(123)
e.clear()
print(e.is_set())
e.wait()
print('*'*3)

输出结果:

False
True
123
False

1、set和clear(分别用来修改一个事件的状态True或者False)
(1)is_set用来查看一个事件的状态合
(2)wait是依据事件的状态来决定自己是否阻塞,False:阻塞 Ture:不阻塞

红绿灯事件(事件的例子)

import time,random
from multiprocessing import Event,Process
def cars(e,i):
    if not e.is_set():
        print('car%s在等待'%i)
        e.wait()     #阻塞
    print('car%s通过'%i)

def light(e):
    while True:
        if e.is_set():
            e.clear()#清除事件
            print('\033[31m红灯亮了\033[0m')
        else:
            e.set()#开启事件
            print('\033[32m绿灯亮了\033[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light,args=(e,))
    traffic.start()
    for i in range(10):
        car = Process(target=cars,args=(e,i))
        car.start()
        time.sleep(random.random())

三、队列(先进先出)
@进程间通信:IPC(inter Process Communication)

from multiprocessing import Queue
q = Queue(5)
for i in range(5):
    q.put(i)
print(q.full())  #查看队列是否满了
for i in range(5):
    q.get()
print(q.empty()) #查看队列是否为空

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello')

def consum(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consum,args=(q,))
    c.start()

1、生产者消费者模型
例子:买包子
生产者:进程
消费者:进程

from multiprocessing import Process,Queue,JoinableQueue
import time
import random
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('获取到一个空'%name)
            break
        f = '%s消费了%s' % (name,food)
        print(f)
        time.sleep(random.randint(1,3))
        q.task_done()        #count - 1
def producer(name,food,q):  #生产者
    for i in range(10):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()                 #阻塞   感知一个队列中的数据全部被执行完毕
if __name__ == '__main__':
    q = JoinableQueue(10)
    p1 = Process(target=producer,args=('long','包子',q))
    p2 = Process(target=producer, args=('geng', '油条', q))
    c = Process(target=consumer, args=('yuan', q))
    p1.start()
    p2.start()
    c.daemon = True #设置为守护进程
    c.start()
    p1.join()      #感知一个进程的结束(阻塞)
    p2.join()
    # q.put(None)

解释:
1、在消费者这一端:
(1) 每次获取一个数据
(2) 处理一个数据
(3) 发送一个记号:标志一个数据被处理成功

2、在生产者这一端:
(1)每一次生产一个数据
(2)且每一次生产的数据都放在队列中
(3)在队列中刻上一个记号
(4)当生产者全部生产完毕之后
join信号:已经停止生产数据了,且要等待之前被刻上的记号都被消费完,当数据都被处理完时,join阻塞结束。
consumer:把所有的任务消耗完
(5)producer端的jion感知到,停止阻塞
(6)所有的producer进程结束
(7)主进程中的p.join结束
(8)主进程中代码结束
(9)守护进程(消费者的进程)结束

四、复习:
1、信号量:from multiprocessing import Semaphore(用锁的原理实现的,内置了一个计数器)在同一的间只能有指定数量的进程执行某一段被控制住的代码。
2、事件:wait阻塞收到事件状态控制的同步组件
(1)状态 (True False)
is_set:(查看状态)
true-> false :cleart()
false -> true :set()
wait:状态为Ture不阻塞状态为Fa1se的时候阻塞

3、队列:Queue
(1)put :当队列满的时候阻塞等待从列有空位置
(2)get :当队列空的时候阻塞等待风列有数据
(3)full empty 不完全准确
(4)JoinableQueue
task_done:(一般和get方法联用)
join:(一般和put联用)

五、管道
1、

from multiprocessing import Pipe,Process

def func(conn1):
    conn1.send('ok')#端口1发送

if __name__ == '__main__':
    conn1, conn2 = Pipe()#建立管道的两端
    Process(target=func,args=(conn1,)).start()
    print(conn2.recv())#端口2接收

输出结果:

ok

2、解决阻塞问题

from multiprocessing import Pipe,Process
def func1(conn2):
    while True:
        ret = conn2.recv()#多个进程接收
        if ret is None:break
        print(ret)

if __name__ == '__main__':
    conn1,conn2 = Pipe()
    Process(target=func1,args=(conn2,)).start()
    for i in range(10):
        conn2.send('hello')#多个数据传送
    conn2.send(None) #解决阻塞

3、根据抛异EOFError常解决阻塞问题

from multiprocessing import Pipe,Process
def func1(conn1,conn2):
    conn2.close()
    while True:
        try:
            ret = conn1.recv()
            print(ret)
        except EOFError:
            conn1.close()
            break

if __name__ == '__main__':
    conn1,conn2 = Pipe()
    Process(target=func1,args=(conn1,conn2)).start()
    conn1.close()#关闭端口1
    for i in range(10):
        conn2.send('hello')
    conn2.close()#关闭端口2,会抛异常

4、生产者消费者模型

import time
import random
from multiprocessing import Pipe,Process,Lock
def producer(con,pro,name,food):
    con.close()
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产%s%s'%(name,food,i)
        print(f)
        pro.send(f)
    pro.close()
def consumer(con,pro,name,lock):
    pro.close()
    while True:
        lock.acquire()
        try:
            food = con.recv()
            lock.release()
            print('%s吃了%s'%(name,food))
            time.sleep(random.randint(1,3))
        except EOFError:
            con.close()
            break
if __name__ == '__main__':
    con,pro = Pipe()
    lock = Lock()
    p = Process(target=producer,args=(con,pro,'long','糖'))#生产者
    c = Process(target=consumer,args=(con,pro,'yue',lock))#消费者
    p.start()
    c.start()
    con.close()
    pro.close()

(1)pipe数据不安全性,加锁来控制操作管道的行为,来避免进程之间争抢数据造成的数据不安全现象
(2)队列进程之间数据安全的

六、进程之间数据共享Manager

from multiprocessing import Manager,Process,Lock

def main(dic,lock):
    lock.acquire()
    dic['count'] -= 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count':100})
    p_lst = []
    for i in range(50):
        p = Process(target=main,args=(dic,lock))
        p.start()
        p_lst.append(p)
    for i in p_lst:i.join()
    print('主进程',dic)

输出结果:

主进程 {‘count’: 50}

@数据共享会用到:kafak,rebbitmq,memcache(类似于队列)

猜你喜欢

转载自blog.csdn.net/qq_41433183/article/details/85454025