034-2018-1026 锁 信号量 事件 队列 JoinableQueue

笔记

 

昨日内容回顾:

 

1 创建进程的两种方法

直接使用from multiprocessing import Process

自定义一个类,继承Process类,重写run方法,如果需要传参,重写init,并调用super执行父类的init

2 两种传参方式:

Args = (1,)元组

Kwargs = {‘n’:1,}

3 验证进程之间是空间隔离的

全局变量

4 join等待子进程结束,然后继续往下执行,

 

5 验证了一下并发的执行时间

 

6 for循环在多进程中的应用

 

7 terminate 关闭进程,但是他只是给操作系统发送一个关闭进程的信号,实际操作是操作系统关闭的.

8 is_alive  查看进程是否还活着

9 僵尸进程和孤儿进程

10 守护进程

P1.Daemon = True

11 子进程不能input

 

12 __name__ == ‘__main__’ windows下

 

 

 

今日内容:

进程同步:

锁     #同步效率低,但是保证了数据安全  重点

信号量 # 

事件  #

 

进程间通信IPC

队列   #重点

生产者消费者模型

JoinableQueue

 

 

 

明日预习内容

管道  数据共享  进程池

 

 

明天默写内容:

1 同步锁的作用

2 事件中有哪些方法

3 队列有哪些方法

4 简述生产者消费者模型

 

 

作业:

1 基于队列写一个有多个消费者和生产者的模型

2再有时间的话:

使用同步锁写一个简单的抢票程序,提供并发查票和并发买票的功能

JoinableQueue的生产者消费者模型


#生产者消费者模型
import time
from multiprocessing import Process,Queue,JoinableQueue

def producer(q):
    for i in range(1,11):
        time.sleep(0.5)
        print('生产了包子%s号' % i)
        q.put(i)
    q.join()
    print('在这里等你')
def consumer(q):
    while 1:
        time.sleep(1)
        s = q.get()
        print('消费者吃了%s包子' % s)
        q.task_done()  #给q对象发送一个任务结束的信号

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = JoinableQueue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.daemon = True #
    con_p.start()
    pro_p.join()
    print('主进程结束')







test.py

import multiprocessing
import time

def task(arg):
    time.sleep(2)
    print(arg)

def run():
    print(11111111)
    p1 = multiprocessing.Process(target=task,args=(1,))
    p1.start()
    p1.join(6)
    print(22222222)
    p2 = multiprocessing.Process(target=task,args=(2,))
    p2.start()
    p2.join()
    print(33333333)
if __name__ == '__main__':
    run()

事件.py


from multiprocessing import Process,Event

e = Event()  #False True

print(e.is_set())
e.set() #将e事件的状态改为True
print('在这里等待')
e.clear()  #将e事件的状态改为False
print('11111')
e.wait()
print('是真的吗?')


信号量.py

import time
import random
from multiprocessing import Process,Semaphore

def dbj(i,s):
    s.acquire()
    print('%s号男主人公来洗脚'%i)
    print('-------------')
    time.sleep(random.randrange(3,6))
    # print(time.time())
    s.release()

if __name__ == '__main__':
    s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
    for i in range(10):
        p1 = Process(target=dbj,args=(i,s,))
        p1.start()




生产者消费者模型.py

#生产者消费者模型
import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)
    q.put(None)  #针对第三个版本的消费者,往队列里面加了一个结束信号
#版本1
# def consumer(q):
#     while 1:
#         time.sleep(2)
#         s = q.get()
#         print('消费者吃了%s包子' % s)

#版本2
# def consumer(q):
#     while 1:
#         time.sleep(0.5)
#         try:
#             s = q.get(False)
#             print('消费者吃了%s包子' % s)
#         except:
#             break

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()




生产者消费者模型主进程发送结束信号.py

#生产者消费者模型
import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生产了包子%s号' % i)
        q.put(i)

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消费者吃了%s包子' % s)

if __name__ == '__main__':
    #通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    #生产者进程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消费者进程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()
    pro_p.join()

    q.put(None)




通过Event来完成红绿灯.py

import time
import random
from multiprocessing import Process,Event

#模拟红绿灯执行状态的函数
def traffic_lights(e):
    while 1:
        print('红灯啦')
        time.sleep(5)
        e.set()
        print('绿灯亮')
        time.sleep(3)
        e.clear()  #将e改为了False
def car(i,e):

    if not e.is_set(): #新来的车看到是红灯
        print('我们在等待.....')
        e.wait()
        print('走你')
    else:
        print('可以走了!!!')

if __name__ == '__main__':
    e = Event()
    hld = Process(target=traffic_lights,args=(e,))
    hld.start()
    while 1:
        time.sleep(0.5)
        #创建10个车
        for i in range(3):
            # time.sleep(random.randrange(1,3))
            p1 = Process(target=car,args=(i,e,))
            p1.start()




锁.py

import json
import time
import random
from multiprocessing import Process,Lock

def get_ticket(i,ticket_lock):
    print('我们都到齐了,大家预备!!123')
    #所有进程异步执行,到这里等待,同时再去抢下面的代码执行
    time.sleep(1)
    ticket_lock.acquire()  #这里有个门,只有一个人能够抢到这个钥匙,加锁
    with open('ticket','r') as f:
        #将文件数据load为字典类型的数据
        last_ticket_info = json.load(f)
    #查看一下余票信息
    last_ticket = last_ticket_info['count']
    #如果看到余票大于0,说明你可以抢到票
    if last_ticket > 0:
        #模拟网络延迟时间
        time.sleep(random.random())
        #抢到一张票就减去1
        last_ticket = last_ticket - 1
        last_ticket_info['count'] = last_ticket
        #将修改后的票数写回文件
        with open('ticket','w') as f:
            #通过json.dump方法来写回文件,字符串的形式
            json.dump(last_ticket_info,f)
        print('%s号抢到了,丫nb!' % i)
    else:
        print('%s号傻逼,没票啦,明年再来!' % i)
    #释放锁,也就是还钥匙的操作
    ticket_lock.release()
if __name__ == '__main__':
    #创建一个锁
    ticket_lock = Lock()
    for i in range(10):
        #将锁作为参数传给每个进程,因为每个进程都需要通过锁来进行限制,同步
        p = Process(target=get_ticket,args=(i,ticket_lock,))
        p.start()

队列.py


from multiprocessing import Process,Queue
#先进先出
q = Queue(3)

q.put(1)
q.put(2)
# print(q.full()) #q.full()队列满了返回True,不满返回False
q.put(3)
# print('>>>>',q.full())

# q.put(4)   #超出了队列长度,你put插入数据的时候会阻塞
print(q.get())
# print('.....',q.empty())
print(q.get())
print(q.get())
# print('>>>',q.empty())  #不可信,队列空了返回True,不为空返回False
# print(q.get())  #队列为空的时候,get会阻塞
q.get_nowait()
# try:
#     q.get(False)  # queue.Empty
#     q.get_nowait() #queue.Empty
# except:
#     print('队列目前是空的')

# while 1:
#     try:
#         q.get(False)  #queue.Empty
#     except:
#         print('队列目前是空的')

队列实现进程间的通信.py

import time
from multiprocessing import Process,Queue

def girl(q):
    print('来自boy的信息',q.get())
    print('来自校领导的凝视',q.get())
def boy(q):
    q.put('约吗')

if __name__ == '__main__':
    q = Queue(5)
    boy_p = Process(target=boy,args=(q,))
    girl_p = Process(target=girl,args=(q,))
    boy_p.start()
    girl_p.start()
    time.sleep(1)
    q.put('好好工作,别乱搞')

猜你喜欢

转载自blog.csdn.net/David_lyy/article/details/86440748