python-day38_信号量_事件_队列_生产者消费者

1,信号量

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

# sem = Semaphore(4)
# sem.acquire()
# print('拿到第一把钥匙')
# sem.acquire()
# print('拿到第二把钥匙')
# sem.acquire()
# print('拿到第三把钥匙')
# sem.acquire()
# print('拿到第四把钥匙')
# sem.acquire()
# print('拿到第五把钥匙')
def ktv(i,sem):
    sem.acquire()    #获取钥匙
    print('%s走进ktv'%i)
    time.sleep(random.randint(1,5))
    print('%s走出ktv'%i)
    sem.release()   


if __name__ == '__main__' :
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=ktv,args=(i,sem))
        p.start()

# 一套资源 同一时间 只能被n个人访问
# 某一段代码 同一时间 只能被n个进程执行

2,事件

# 通过一个信号 来控制 多个进程 同时 执行或者阻塞
# 事件
# from multiprocessing import Event
# 一个信号可以使所有的进程都进入阻塞状态
# 也可以控制所有的进程解除阻塞
# 一个事件被创建之后,默认是阻塞状态
# e = Event() # 创建了一个事件
# print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞
# e.set() # 将这个事件的状态改为True
# print(e.is_set())
# e.wait() # 是依据e.is_set()的值来决定是否阻塞的
# print(123456)
# e.clear() # 将这个事件的状态改为False
# print(e.is_set())
# e.wait() # 等待 事件的信号被变成True
# print('*'*10)

# set 和 clear
  # 分别用来修改一个事件的状态 为True或者False
# is_set 用来查看一个事件的状态
# wait 是依据事件的状态来决定自己是否在wait处阻塞
  # False阻塞 True不阻塞

# 红绿灯事件
import time
import random
from multiprocessing import Event,Process
def cars(e,i):
    if not e.is_set():
        print('car%i在等待'%i)
        e.wait()    # 阻塞 直到得到一个 事件状态变成 True 的信号
    print('\033[0;32;40mcar%i通过\033[0m' % i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('\033[31m红灯亮了\033[0m')
        else:
            e.set()
            print('\033[32m绿灯亮了\033[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light,args=(e,))
    traffic.start()
    for i in range(20):
        car = Process(target=cars, args=(e,i))
        car.start()
        time.sleep(random.random())

3,队列

先进先出

from multiprocessing import Queue
import time
q = Queue(4)
q.put('123456')
q.put(2)
q.put(3)
q.put(4)
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
while True:
    try:
        print(q.get_nowait())
    except:
        print('队列已空')
        time.sleep(1)

IPC,inter-process communication

实现了进程间的通信

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello')

def consume(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume,args=(q,))
    c.start()

4,生产者消费者模型

import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空'%name)
            break
        print('\033[31m%s消费了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

JoinableQueue

import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
    while True:
        food = q.get()
        print('\033[31m%s消费了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1 当计数队列中的数,都拿走并被处理完

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到队列中的所有数据 全部被处理完毕

if __name__  == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 主进程感知一个进程的结束

# 在消费者这一端:
  # 每次获取一个数据
  # 处理一个数据
  # 发送一个记号 : 标志一个队列的数据被处理完

# 在生产者这一端:
  # 每一次生产一个数据,
  # 且每一次生产的数据都放在队列中
  # 在队列中刻上一个记号
  # 当生产者全部生产完毕之后,
  # join信号 : 已经停止生产数据了
    # 且要等待之前被刻上的记号都被消费完
    # 当数据都被处理完时,join阻塞结束

# consumer 中把所有的任务消耗完
# producer 端 的 q.join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束

猜你喜欢

转载自www.cnblogs.com/yygy/p/10397026.html