文章目录
一、前言
上一篇博文学习了线程的互斥锁(Python | threading02 - 互斥锁解决多个线程之间随机调度,造成“线程不安全”的问题 ),线程的互斥锁的目的是解决“线程不安全”的问题。值得注意的是互斥锁解决了”线程不安全“问题,但产生了另外一些问题,如“死锁”,“加锁不合理导致程序的效率低下”等。
今天继续学习python线程的另外一个工具 - 条件对象,条件对象用于实现线程间的同步。
官方文档:
threading - Thread-based parallelism - Python 3.10.1 documentation
二、生产者-消费者的模型
2.1、代码
代码的目的:
- customer每隔2秒将item减少1(可以理解为消耗一个item)。当发现item=0时(即没有item可以消耗时,赶紧通知producer生产item)。
- producer每隔1秒将item增加1。当item等于5时(可以理解为没有地方存放item了),马上停止生产item。
# python3.9
import threading
import time
c = threading.Condition() # 声明一个条件对象
item = 0 # 用于计数
def producer():
"""
子线程-生产者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(1) # 线程休眠1秒
c.acquire() # 获取锁
# 如果item等于5,就进行阻塞
if item == 5:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.wait() # 进入阻塞态,并释放锁,,等待其他线程notify
item += 1 # 每一秒增加1
print("子线程%s将item增加1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.notify() # 通知customer解除阻塞态
c.release() # 释放锁
def customer():
"""
子线程-消费者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(2) # 线程休眠2秒
c.acquire() #获取锁
if item == 0:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
print("通知producer恢复运行(解除阻塞态)")
c.notify() # 通知producer线程解除阻塞态
c.wait() # 进入阻塞态,并释放锁。等待其他线程notify
item -= 1
print("子线程%s将item减少1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.release() # 释放锁
def main():
t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 创建producer子线程
t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 创建customer子线程
t1.start() # 启动customer线程
t2.start() # 启动producer线程
t1.join() # 子线程customer是无限循环的线程,所以主线程需要等待它运行结束
t2.join() # 子线程producer是无限循环的线程,所以主线程需要等待它运行结束
print("主线程运行结束!")
if __name__ == "__main__":
main()
2.2、运行
运行的结果:
2.3、wait( )方法会将互斥锁释放
刚开始我就犯了一个错误,c.wait( )之后紧跟着c.release( )。根据《Python并行编程》的解释,wait( )方法会将互斥锁释放回去。
三、条件同步 - threading.Condition( )
3.1、相关API
- acquire() — 线程锁,注意线程条件变量Condition中的所有相关函数使用必须在acquire() /release() 内部操作;
- release() — 释放锁,注意线程条件变量Condition中的所有相关函数使用必须在acquire() /release() 内部操作;
- wait(timeout) — 线程挂起(阻塞状态),直到收到一个notify通知或者超时才会被唤醒继续运行(超时参数默认不设置,可选填,类型是浮点数,单位是秒)。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError;
- notify(n=1) — 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,缺省参数,默认是通知一个正等待通知的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError,notify()不会主动释放Lock;
- notifyAll() — 如果wait状态线程比较多,notifyAll的作用就是通知所有线程;
- wait_for(predicate,timeout=None) — 这个实用方法会重复地调用wait( ),直到满足判断式或者发生超时。
3.2、acquire( )
从代码可以看到,使用c.notify( )与c.wait( )之前需要先调用c.acquire( )获取互斥锁,否则会抛出异常。
3.2、release( )
没什么好说的,就是释放互斥锁。
3.3、wait( )
让调用wait( )的线程进入阻塞态,等待其他线程调用notify( )来进行同步,可以设置timeout。
3.4、notify(n=1)
让其他调用wait( )的线程从阻塞态恢复至运行态,继续运行后面的代码。
3.5、notify_all( )
跟notify( )类似,notify_all( )不止唤醒一个线程,而是唤醒所有因为wait( )而进入阻塞态的线程。
3.6、wait_for(predicate,timeout=None)
这个wait_for相当于以下代码(来自官方的解释):
while not predicate():
c.wait()
使用代码测试一下wait_for( )的用法。
# python3.9
import threading
import time
c = threading.Condition() # 声明一个条件对象
item = 0 # 用于计数
def customer_Wait_For():
"""
"""
global item
if item == 0:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
print("通知producer恢复运行(解除阻塞态)")
c.notify() # 通知producer线程解除阻塞态
return False # 相当于运行c.wait()
else:
return True # 相当于不运行c.wait()
def producer_Wait_For():
"""
"""
global item
if item == 5:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
return False # 相当于运行c.wait()
else:
return True # 相当于不运行c.wait()
def producer():
"""
子线程-生产者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(1) # 线程休眠1秒
c.acquire() # 获取锁
c.wait_for(producer_Wait_For) # 看看是否需要阻塞
item += 1 # 每一秒增加1
print("子线程%s将item增加1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.notify() # 通知customer解除阻塞态
c.release() # 释放锁
def customer():
"""
子线程-消费者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(2) # 线程休眠2秒
c.acquire() #获取锁
c.wait_for(customer_Wait_For) # 看看是否需要阻塞
item -= 1
print("子线程%s将item减少1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.release() # 释放锁
def main():
t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 创建producer子线程
t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 创建customer子线程
t1.start() # 启动customer线程
t2.start() # 启动producer线程
t1.join() # 子线程customer是无限循环的线程,所以主线程需要等待它运行结束
t2.join() # 子线程producer是无限循环的线程,所以主线程需要等待它运行结束
print("主线程运行结束!")
if __name__ == "__main__":
main()
从运行的结果看来,跟c.wait_for()只是将条件判断(是否阻塞)封装在一个函数里面而已,使得线程的函数变得更加简洁。
四、with语句让代码更加简洁
从官方文档了解到,条件对象也支持with语句。
4.1、代码
从下面的代码可以看到,with语句帮我们管理了acquire( )方法与release( )方法(代码里找不到acquire( )方法与release( )方法了。)
# python3.9
import threading
import time
c = threading.Condition() # 声明一个条件对象
item = 0 # 用于计数
def producer():
"""
子线程-生产者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(1) # 线程休眠1秒
with c:
# 如果item等于5,就进行阻塞
if item == 5:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.wait() # 进入阻塞态,并释放锁,,等待其他线程notify
item += 1 # 每一秒增加1
print("子线程%s将item增加1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
c.notify() # 通知customer解除阻塞态
def customer():
"""
子线程-消费者
"""
global c # c是全局变量
global item # item是全局变量
while True:
time.sleep(2) # 线程休眠2秒
with c:
if item == 0:
print("子线程%s发现item = %d,time = %s,将进入阻塞态"
% (threading.current_thread().getName(),item,time.perf_counter()))
print("通知producer恢复运行(解除阻塞态)")
c.notify() # 通知producer线程解除阻塞态
c.wait() # 进入阻塞态,并释放锁。等待其他线程notify
item -= 1
print("子线程%s将item减少1,item = %d,time = %s"
% (threading.current_thread().getName(),item,time.perf_counter()))
def main():
t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 创建producer子线程
t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 创建customer子线程
t1.start() # 启动customer线程
t2.start() # 启动producer线程
t1.join() # 子线程customer是无限循环的线程,所以主线程需要等待它运行结束
t2.join() # 子线程producer是无限循环的线程,所以主线程需要等待它运行结束
print("主线程运行结束!")
if __name__ == "__main__":
main()