「这是我参与2022首次更文挑战的第19天,活动详情查看:2022首次更文挑战」。
正式的Python专栏第68篇,同学站住,别错过这个从0开始的文章!
queue.Queue这个类是一个非常经典的消费者生产者模型。
在代码中,我们可以看到queue类创建的时候创建了三个Condition锁,其中之一就是:all_tasks_done, 它也共用了mutex 互斥锁。 还有两个方法和成员变量unfinished_tasks(未完成任务数量),这个是上篇没有讲到的,本篇继续。
先看看一段展示,队列掏空但是未完任务数居然不是0!!!
unfinished_tasks居然没有清0?
学委准备了下面的程序:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 12:40 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : originalq_qdemo_monitor.py
# @Project : hello
import datetime
import threading
import time
import queue
q = queue.Queue()
threads = []
for i in range(5):
def operate_q():
time.sleep(0.5)
tname = threading.current_thread().name
print("%s - tname %s - before put" % (datetime.datetime.now(), tname))
q.put(tname)
print("%s - tname %s - q: %s" % (datetime.datetime.now(), tname, q.queue))
def get_q():
time.sleep(0.5)
tname = threading.current_thread().name
print("%s - tname %s - before get" % (datetime.datetime.now(), tname))
ele = q.get()
print("%s - tname %s - get q: %s" % (datetime.datetime.now(), tname, ele))
t = threading.Thread(target=operate_q, name='学委-put-' + str(i + 1))
threads.append(t)
t2 = threading.Thread(target=get_q, name='学委-get-' + str(i + 1))
threads.append(t2)
def monitor():
while True:
time.sleep(1)
tname = threading.current_thread().name
print("monitor-q: %s - q unfinished_tasks %s - queue %s " % (tname, q.unfinished_tasks,q.queue))
threading.Thread(target=monitor, name="学委-q-monitor").start()
for t in threads:
t.start()
复制代码
除了主线程,这里面主要有三组线程,一组是往Queue里面放元素,一组是往Queue里面取出元素。还有一组只有一个线程是q-monitor,监控线程。
我们看到最后整个队列会被‘掏空’,没有任何逻辑问题,这是符合期待的。 但是unfinished_tasks这个数值最后一只是5,前篇文章也说过了没调用put成功一次,这个数值就会增长,所以也没有问题。
下面是一次运行结果:
unfinished_tasks居然没有清0? 原因在哪里?
首先,这个也并非bug!
上篇解析了大部分的方法,还差两个方法。我们看完代码会发现仅仅当task_done方法被调用了unfinished_tasks才会自减一。
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
复制代码
先说join方法
join方法获取all_tasks_done条件锁,这主要是为了保证操作all_tasks_done锁的原子性,整个with代码块任意时刻只有一个线程访问;
然后进入循环,判断unfinished_tasks(未完成任务数量),如果一只还有未完成的任务(需要调用task_done) 则当前线程进入等待。
再说 task_done
task_done也获取all_tasks_done条件锁,获取不到就等待其他获取的线程做完。
获取到了all_tasks_done条件锁,unfinished_tasks减去1。在减1的过程中,如果发现未完成任务数为0了,通知所有调用join的线程,接触休眠,可以继续执行了。
很明显,join和task_done是需要配合使用的。因为他们基于all_tasks_done这个Condition锁来实现多线程协作。
下面学委准备了一段代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 12:40 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : qdemo_producer_consumer.py
# @Project : hello
import datetime
import threading
import time
import queue
q = queue.Queue()
threads = []
for i in range(5):
def operate_q():
time.sleep(0.1)
tname = threading.current_thread().name
print("%s - tname %s - before put" % (datetime.datetime.now(), tname))
q.put(tname)
print("%s - tname %s - before task done" % (datetime.datetime.now(), tname))
q.task_done()
def get_q():
time.sleep(0.1)
tname = threading.current_thread().name
print("%s - tname %s - before join" % (datetime.datetime.now(), tname))
q.join()
#print(" %s after join" % tname)
ele = q.get()
print("%s - tname %s - get q: %s" % (datetime.datetime.now(), tname, ele))
t = threading.Thread(target=operate_q, name='点心-' + str(i + 1))
threads.append(t)
t2 = threading.Thread(target=get_q, name='开动-' + str(i + 1))
threads.append(t2)
def monitor():
i = 0
while i < 50:
i += 1
time.sleep(0.1)
tname = threading.current_thread().name
print("monitor-q: %s - q unfinished_tasks %s - queue %s " % (tname, q.unfinished_tasks, q.queue))
threading.Thread(target=monitor, name="学委-q-monitor").start()
for t in threads:
t.start()
复制代码
简短解释:
这里还是三组线程,区别本文前面的代码的地方是,get的那组线程在调用get之前都调用了join。
所以这个程序就是当队列有元素
运行效果如下:
总结
queue.Queue设计上比SimpleQueue更加复杂,结合上篇,关于这个类的解析到此结束。
可以说,看这个类的源码,也就是对开发者对线程中的Lock/Condition的知识的考验,组合用起来了。如果对多线程了解的不多,基本上是看不懂的,这块可以去学习学委之前的多线程文章,也在持续更新的专栏内。
而且用的很巧妙,如果稍微用错,设计出来的queue队列就容易出现死锁。
喜欢Python的朋友,请关注学委的 Python基础专栏 or Python入门到精通大专栏
持续学习持续开发,我是雷学委!
编程很有趣,关键是把技术搞透彻讲明白。
欢迎关注微信,点赞支持收藏!