生产者消费者模型
这是基于进程间通信的案例
利用Queue实现,多个生产者对多个消费者案例(不合适):
import time
import random
from multiprocessing import Process, Queue
# 生产者功能
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
# 消费者功能
def consumer(name, q):
while True:
food = q.get()
if food is None:return # 当队列中取出None,之间结束
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
if __name__ == '__main__':
q = Queue()
# 创造两个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
p1 = Process(target=producer, args=('alex', '泔水', q))
p1.start()
# 创造三个消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
c1.start()
c2 = Process(target=consumer, args=('李铁柱', q))
c2.start()
p.join() # 等待p进程执行完成再放
p1.join() # 等待p1(另一个生产者)进程执行完成再放
# 生产者生产完毕,放与消费者对应数量的None
q.put(None)
q.put(None)
q.put(None)
- 这个方案每新增一个消费者,下方就得多put一个None,可以说非常蠢了
新语法介绍: JoinableQueue([maxsize])
引入模块:from multiprocess import JoinableQueue
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
【q.task_done()】使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
【q.join()】生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
生产者消费者模型终极版
# 最终版本,不用放None
import time
import random
from multiprocessing import Process, Queue,JoinableQueue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
q.task_done() # 把队列中维护的数字减一
if __name__ == '__main__':
# q = Queue()
# 内部为何了一个数字,放一个数字会加一
# 消费一个数字减一
q = JoinableQueue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
p1 = Process(target=producer, args=('alex', '泔水', q))
p1.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
# c.daemon = True
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
# c1.daemon = True
c1.start()
c2 = Process(target=consumer, args=('李铁柱', q))
# c2.daemon = True
c2.start()
# 主结束,消费进程也结束,把每个消费进程都设置成守护进程
# 等待所有生产者生产结束,主进程再结束
p.join()
p1.join()
q.join() # 会卡再者,一直等待q队列中数据没有了,才继续往下走
print('生产者结束了,主进程结束')
线程
嘛是线程?要线程做啥子?
-
计算机相当于大工厂,工厂里有一个个的车间(进程),有很多人(线程)干不同的事
-
真正干活的是线程–>线程是cup调度的最小单位
-
进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程(这是一道面试题)
-
线程开销更小,更轻量级
线程与进程的区别
-
地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
-
通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
-
调度和切换:线程上下文切换比进程上下文切换要快得多。
-
在多线程操作系统中,进程不是一个可执行的实体。
开启线程的两种方法(跟进程大同小异)
第一种,直接调用函数
from threading import Thread
import time
def task():
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,) # 实例化得到一个对象
t.start() # 对象.start()启动线程
print('主')
第二种,通过类继承的方式
from threading import Thread
import time
class MyThread(Thread):
def run(self):
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=MyThread()
t.start()
print('主')
join的使用(与进程一样)
from threading import Thread
import time
def task(n):
print('开始')
time.sleep(n)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,args=(2,))
t.start()
t1=Thread(target=task,args=(3,))
t1.start()
t.join() # 等待子线程执行结束
t1.join()
print('主')
多线程下数据共享
from threading import Thread
import time
money = 99
def task(n):
global money
money=n
print('开始')
# time.sleep(n)
print('结束')
if __name__ == '__main__':
t = Thread(target=task, args=(2,))
t.start()
t1 = Thread(target=task, args=(66,))
t1.start()
t.join()
t1.join()
print(money)
print('主')
获取线程的名字:t.name t.getName()
获取当前进程下存活的线程的数量:active_count()
t1.is_alive() 检验当前线程是否存活
t1.ident 当作是线程id号
t1.setDaemon(True) 为线程t1开启守护线程
以上方法的实用案例
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
print(current_thread().name) # 线程名字
# 如果打印进程id号,会是什么
print(os.getpid()) # 与线程id毫无关系
time.sleep(n)
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='egon',args=(2,)) # 参数中可以自定义线程名字
t2 = Thread(target=task,args=(8,)) # 没有传入名字就会给默认的格式,这里是:Thread-1
t1.start()
t2.start()
t1.join()
print('---------',t1.is_alive()) # 死了
print('---------',t2.is_alive()) # 活着
# 当作线程id号
print('*********',t1.ident)
print('*********',t2.ident)
print(os.getpid()) # 进程pid与线程id不一样
print(active_count()) # 打印出3 ,开了两个线程,还有一个主线程
守护线程
无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。
-
对主进程来说,运行完毕指的是主进程代码运行完毕
-
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
time.sleep(n)
# print('-----',active_count())
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='egon',args=(10,))
# t1.daemon = True
t1.setDaemon(True)
t1.start()
t2 = Thread(target=task,name='egon',args=(4,))
t2.start()
print('主')
多个线程抢占资源的情况
不安全写法:
from threading import Thread
import os,time
def work():
global n
temp=n # 这个地方因为是并发的线程,所有线程都会得到100
time.sleep(0.1)
n=temp-1 # 然后所有线程执行n = 100-1,只不过重复给n赋值99罢了
if __name__ == '__main__':
n=100
l=[]
for i in range(100):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) #结果为99
同步锁的引用
from threading import Thread,Lock
import os,time
def work():
global n
lock.acquire() # 锁住
temp=n
time.sleep(0.1)
n=temp-1
lock.release() # 开锁
if __name__ == '__main__':
lock=Lock() # 创造一个锁,目的使n-1这一步变成串行
n=100
l=[]
for i in range(100):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全