互斥锁
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
def task1(lock):
print('task1') # 验证cpu遇到io切换了
lock.acquire()
print('task1: 开始打印')
time.sleep(random.randint(1, 3))
print('task1: 打印完成')
lock.release()
def task2(lock):
print('task2') # 验证cpu遇到io切换了
lock.acquire()
print('task2: 开始打印')
time.sleep(random.randint(1, 3))
print('task2: 打印完成')
lock.release()
def task3(lock):
print('task3') # 验证cpu遇到io切换了
lock.acquire()
print('task3: 开始打印')
time.sleep(random.randint(1, 3))
print('task3: 打印完成')
lock.release()
if __name__ == '__main__':
lock = Lock()
p1 = Process(target=task1, args=(lock,))
p2 = Process(target=task2, args=(lock,))
p3 = Process(target=task3, args=(lock,))
p1.start()
p2.start()
p3.start()
# 上锁:
# 一定要是同一把锁: 只能按照这个规律:上锁一次,解锁一次.
# 互斥锁与join区别共同点? (面试题)
# 共同点: 都是完成了进程之间的串行.
# 区别: join认为控制的进程串行,互斥锁是随机的抢占资源.保证了公平性
模拟抢票系统
业务需求分析:
# 买票之前先要查票,必须经历的流程: 你在查票的同时,100个人也在查本此列票.
# 买票时,你要先从服务端获取到票数,票数>0 ,买票,然后服务端票数减一. 中间肯定有网络延迟.
# 使用线程互斥锁
from multiprocessing import Process
from multiprocessing import Lock
import time
import json
import os
import random
# 多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离.
# 它们可以共同操作一个文件.
def search():
time.sleep(random.random())
with open('db.json',encoding='utf-8') as f1:
dic = json.load(f1)
print(f'剩余票数{dic["count"]}')
def get():
with open('db.json',encoding='utf-8') as f1:
dic = json.load(f1)
time.sleep(random.randint(1,3))
if dic['count'] > 0:
dic['count'] -= 1
with open('db.json', encoding='utf-8', mode='w') as f1:
json.dump(dic,f1)
print(f'{os.getpid()}用户购买成功')
else:
print('没票了.....')
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(5):
p = Process(target=task,args=(lock,))
p.start()
队列
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize是队列中允许最大项数,省略则无大小限制.
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
用进程通讯队列模拟实例
import os
from multiprocessing import Queue
from multiprocessing import Process
def task(q):
try:
q.put(f'{os.getpid()}',block=False)
except Exception:
return
if __name__ == '__main__':
q = Queue(10)
for i in range(100):
p = Process(target=task,args=(q,))
p.start()
for i in range(1,11):
print(f'排名第{i}的用户: {q.get()}',)
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。