Python快速而美丽[v1.0.0][线程通信]

线程调度由操作系统完成,存在一定的随机性无法准确的控制线程的轮换执行,通过线程通信可以达到该效果

使用Condition实现线程通信

  • 使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待状态的线程
  • 将Condition和Lock对象组合使用,可以为每个对象提供多个等待集,从Condition的构造器可以看到,它有个lock=None参数,程序创建Condition的时候通过lock参数传入要绑定的Lock对象,如果没有传该参数,则创建Condition的时候会自动创建一个与它绑定的Lock对象

Condition类

该类提供了如下常用方法:

  • acquire([timeout])/release():调用Condition关联的Lock的acquire()或release()方法
  • wait([timeout]):导致当前线程进入Condition的等待池等待通知并释放锁,直到其他线程调用该Condition的notify()或notify_all()方法来唤醒该线程,在调用该wait()方法时可传入一个timeout阐述,指定该线程最多等待多少秒
  • notify():唤醒在该Condition等待池中的单个线程并通知它,收到通知的线程将会自动调用acquire()方法尝试加锁,如果所有线程都在该Condition等待池中等待,则会任意选择唤醒其中一个线程
  • notify_all():唤醒在该Condition等待池中等待的所有线程并通知它们
import threading

class Account:
    # 定义构造器
    def __init__(self, account_no, balance):
        # 封装账户编号、账户余额的两个成员变量
        self.account_no = account_no
        self._balance = balance
        self.cond = threading.Condition()
        # 定义代表是否已经存钱的旗标
        self._flag = False

    # 因为账户余额不允许随便修改,所以只为self._balance提供getter方法
    def getBalance(self):
        return self._balance
    # 提供一个线程安全的draw()方法来完成取钱操作
    def draw(self, draw_amount):
        # 加锁,相当于调用Condition绑定的Lock的acquire()
        self.cond.acquire()
        try:
            # 如果self._flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
            if not self._flag:
                self.cond.wait()
            else:
                # 执行取钱操作
                print(threading.current_thread().name 
                    + " 取钱:" +  str(draw_amount))
                self._balance -= draw_amount
                print("账户余额为:" + str(self._balance))
                # 将标识账户是否已有存款的旗标设为False
                self._flag = False
                # 唤醒其他线程
                self.cond.notify_all()
        # 使用finally块来释放锁
        finally:
            self.cond.release()
    def deposit(self, deposit_amount):
        # 加锁,相当于调用Condition绑定的Lock的acquire()
        self.cond.acquire()
        try:
            # 如果self._flag为真,表明账户中已有人存钱进去,存钱方法阻塞
            if self._flag:            
                self.cond.wait()
            else:
                # 执行存款操作
                print(threading.current_thread().name\
                    + " 存款:" +  str(deposit_amount))
                self._balance += deposit_amount
                print("账户余额为:" + str(self._balance))
                # 将表示账户是否已有存款的旗标设为True
                self._flag = True
                # 唤醒其他线程
                self.cond.notify_all()
        # 使用finally块来释放锁
        finally:
            self.cond.release()
import threading
import Account
#  定义一个函数,模拟重复max次执行取钱操作
def draw_many(account, draw_amount, max):
    for i in range(max):
        account.draw(draw_amount)
#  定义一个函数,模拟重复max次执行存款操作
def deposit_many(account, deposit_amount, max):
    for i in range(max):
        account.deposit(deposit_amount)
# 创建一个账户
acct = Account.Account("1234567" , 0)
# 创建、并启动一个“取钱”线程
threading.Thread(name="取钱者", target=draw_many, 
    args=(acct, 800, 100)).start()
# 创建、并启动一个“存款”线程
threading.Thread(name="存款者甲", target=deposit_many, 
    args=(acct , 800, 100)).start();
threading.Thread(name="存款者乙", target=deposit_many, 
    args=(acct , 800, 100)).start()
threading.Thread(name="存款者丙", target=deposit_many, 
    args=(acct , 800, 100)).start()

程序阻塞并不是死锁,死锁是等待其他线程释放同步监视器

使用队列Queue控制线程通信

Queue模块提供了三中阻塞队列,这些队列主要用于实现线程通信:

  • queue.Queue(maxsize=0):代表FIFO(先进先出)的常规队列,maxsize用于限制队列的大小,如果队列的大小达到上限,就会加锁,再进入元素的时候就会阻塞,直到队列中的元素被消费,如果maxsize为0或者为负,则队列大小无限
  • queue.LifoQueue(maxsize=0):代表LIFO(后进先出)的队列,与Queue的区别就是出队列的顺序不同
  • PriorityQueue(maxsize = 0):代表优先级队列,优先级最小的元素先出队列

这三种队列的属性和函数基本相同:

  • Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素
  • Queue.empty():判断队列是否为空
  • Queue.full():判断队列是否已满
  • Queue.put(item, block=True, timeout=None): 向队列中放入元素,如果队列已满且block参数为True,当前线程被阻塞,timeout指定阻塞时间,如果timeout设置为None则代表一直阻塞,直到队列元素被消费,如果队列已满且block为False,则直接引发queue.FULL异常
  • Queue.put_nowait(item):向队列中放入元素,不阻塞,相当于上个函数将block设置为False
  • Queue.get(item, block=True, timeout=None):从队列中取出元素,如果队列为空且block为True,当前线程被阻塞,timeout指定阻塞时间,如果timeout为None则表示一直阻塞,知道有元素被放入队列中,如果队列为空,且block为False,则直接引发queue.EMPTY异常
  • Queue.get_nowait(item):从队列中取出元素,不阻塞,相当于上个函数将block设置为False
import queue

# 定义一个长度为2的阻塞队列
bq = queue.Queue(2)
bq.put("Python")
bq.put("Python")
print("1111111111")
bq.put("Python")  
print("2222222222")
import threading
import time
import queue

def product(bq):
    str_tuple = ("Python", "Kotlin", "Swift")
    for i in range(99999):
        print(threading.current_thread().name + "生产者准备生产元组元素!")
        time.sleep(0.2);
        # 尝试放入元素,如果队列已满,则线程被阻塞
        bq.put(str_tuple[i % 3])
        print(threading.current_thread().name \
            + "生产者生产元组元素完成!")
def consume(bq):
    while True:
        print(threading.current_thread().name + "消费者准备消费元组元素!")
        time.sleep(0.2)
        # 尝试取出元素,如果队列已空,则线程被阻塞
        t = bq.get()
        print(threading.current_thread().name \
            + "消费者消费[ %s ]元素完成!" % t)
# 创建一个容量为1的Queue
bq = queue.Queue(maxsize=1)
# 启动3个生产者线程
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
# 启动一个消费者线程
threading.Thread(target=consume, args=(bq, )).start()

使用Event控制线程通信

Event是一种简单的通信机制,就是一个线程发出一个Event,这个Event触发另一个线程,它提供了如下常用方法:

  • is_set():该方法返回Event的内部旗标是否为True
  • set():该方法将会把Event的内部旗标设置为True,并唤醒所有处于等待状态的线程
  • clear():该方法将Event的内部旗标设置为False,通常接下来会调用wait()方法来阻塞当前线程
  • wait(timeout=None):该方法会阻塞当前线程
import threading
import time

event = threading.Event()
def cal(name):
    # 等待事件,进入等待阻塞状态
    print('%s 启动' % threading.currentThread().getName())
    print('%s 准备开始计算状态' % name)
    event.wait()    # ①
    # 收到事件后进入运行状态
    print('%s 收到通知了.' % threading.currentThread().getName())
    print('%s 正式开始计算!'% name)
# 创建并启动两条,它们都会①号代码处等待
threading.Thread(target=cal, args=('甲', )).start()
threading.Thread(target=cal, args=("乙", )).start()
time.sleep(2) # 主线程等待   
print('------------------')
# 发出事件
print('主线程发出事件')
event.set() # 将Event内部旗标设置为True,并唤醒所有等待线程

代码示例 生产者消费者

import threading

class Account:
    # 定义构造器
    def __init__(self, account_no, balance):
        # 封装账户编号、账户余额的两个成员变量
        self.account_no = account_no
        self._balance = balance
        self.lock = threading.Lock()
        self.event = threading.Event()
    # 因为账户余额不允许随便修改,所以只为self._balance提供getter方法
    def getBalance(self):
        return self._balance
    # 提供一个线程安全的draw()方法来完成取钱操作
    def draw(self, draw_amount):
        # 加锁
        self.lock.acquire()
        # 如果Event内部旗标为True,表明账户中已有人存钱进去
        if self.event.is_set():
            # 执行取钱操作
            print(threading.current_thread().name 
                + " 取钱:" +  str(draw_amount))
            self._balance -= draw_amount
            print("账户余额为:" + str(self._balance))
            # 将Event内部旗标设为False
            self.event.clear()
            # 释放加锁
            self.lock.release()
            # 阻塞当前线程阻塞
            self.event.wait()
        else:
            # 释放加锁
            self.lock.release()
            # 阻塞当前线程阻塞
            self.event.wait()
    def deposit(self, deposit_amount):
        # 加锁
        self.lock.acquire()
        # 如果Event内部旗标为False,表明账户中还没有人存钱进去
        if not self.event.is_set():
            # 执行存款操作
            print(threading.current_thread().name\
                + " 存款:" +  str(deposit_amount))
            self._balance += deposit_amount
            print("账户余额为:" + str(self._balance))
            # 将Event内部旗标设为True
            self.event.set()
            # 释放加锁
            self.lock.release()
            # 阻塞当前线程阻塞
            self.event.wait()
        else:
            # 释放加锁
            self.lock.release()
            # 阻塞当前线程阻塞
            self.event.wait()
import threading
import Account

#  定义一个函数,模拟重复max次执行取钱操作
def draw_many(account, draw_amount, max):
    for i in range(max):
        account.draw(draw_amount)
#  定义一个函数,模拟重复max次执行存款操作
def deposit_many(account, deposit_amount, max):
    for i in range(max):
        account.deposit(deposit_amount)
# 创建一个账户
acct = Account.Account("1234567" , 0)
# 创建、并启动一个“取钱”线程
threading.Thread(name="取钱者", target=draw_many, 
    args=(acct, 800, 100)).start()
# 创建、并启动一个“存款”线程
threading.Thread(name="存款者甲", target=deposit_many, 
    args=(acct , 800, 100)).start();
threading.Thread(name="存款者乙", target=deposit_many, 
    args=(acct , 800, 100)).start()
threading.Thread(name="存款者丙", target=deposit_many, 
    args=(acct , 800, 100)).start()

猜你喜欢

转载自blog.csdn.net/dawei_yang000000/article/details/105689704