目录
一、生产者消费者模型
1.生产者:泛指产生数据的一方
2.消费者:泛指处理数据的一方
3.模型:解决某个问题的固定方法或套路
4.生产者消费者模型要解决的问题:由于进程之间或内部可能会有数据的生产和处理两种情况,当两者的运行速度不同时,则双方会相互等待,这样就会导致效率低下
5.解决方法:
- 1.先将双放解开耦合,让不同的进程负责不同的任务
- 2.提供一个共享的容器,来平衡双方的能力,用进程队列来作为这个共享容器,因为队列可以在进程间共享
1.模型1
from multiprocessing import Process, Queue
import requests
import re, os, time, random
# 生产者任务
def producter(urls, q):
i = 0 # 记录生产了多少数据
for url in urls:
response = requests.get(url)
text = response.text
# 将生产完成的数据放入队列中
time.sleep(random.random())
q.put(text) # 生产的数据放入进程队列中
i += 1
print(os.getpid(), '生产了第%s个数据'%i)
# 消费者任务
def customer(q):
i = 0 # 由于记录处理了多少数据
while True:
text = q.get() # 取出进程队列中的元素
time.sleep(random.random())
res = re.findall('src=//(.*?) width', text)
i += 1
print('第%s个任务获取到%s个img'%(i, len(res)))
if __name__ == '__main__':
urls = [
"http://www.baidu.com",
"http://www.baidu.com",
"http://www.baidu.com",
"http://www.baidu.com",
]
# 创建一个双方共享的容器
q = Queue()
# 生产者进程
p1 = Process(target=producter, args=(urls, q))
p1.start()
# 消费者进程
c = Process(target=customer, args=(q,))
c.start()
无法解决进程何时结束的问题
2.joinableQueue
1.joinableQueue继承自Queue,所以用法一致
2.增加了join和taskdone(在每次用get方法调用队列的时候用 队列.taskdone,最后在需要结束进程之前用 队列.join来判断是否取完了队列中的元素)
3.join是个阻塞函数,阻塞会持续到taskdone的调用次数等于存入的元素个数时才会结束,可以用于表示队列任务处理完成
4.模型
from multiprocessing import Process, JoinableQueue
import requests
import re, os, time, random
# 生产者
def producter(q, name):
for i in range(5):
hotdog = '%s的热狗%s'%(name, (i+1)) # 记录热狗的个数和产地
time.sleep(random.random())
print('生产了',hotdog)
q.put(hotdog) # 将生产的数据放入进程队列
# 消费者
def customer(q):
while True:
hotdog = q.get() # 按序取出进程队列中的元素
time.sleep(random.random())
print('消费了%s'%dog)
q.task_done() # 每取出一个元素,就记录一次
if __name__ == '__main__':
# 创建一个双方能共享的容器
q = JoinableQueue()
# 生产者进程
p1 = Process(target=producter, args=(q, '上海分店'))
p2 = Process(target=producter, args=(q, '北京分店'))
p1.start()
p2.start()
# 消费者进程
c = Process(target=customer, args=(q,))
# c.daemon = True # 可以将消费者设置为守护进程,当主进程确认任务全部完成时,守护进程会随着主进程一起结束
c.start()
# 记住只有当join函数紧挨着start后面时,才会串行(自己理解)
p1.join()
p2.join() # 代码走到这里意味着生产方完成
q.join() # 意味着队列中的任务都处理完成,阻塞才会变为非阻塞
# terminate函数可以终止子进程
c.terminate() # 取完所有元素则终止进程
# 思路:
# 1.确定生产者任务完成
# 2.确定生产出来的数据已经全部处理完成
5.应用:
- 1.redis 消息队列
- 2.MQ消息队列
- 3.常用来做流量削峰,保证服务不会因为高并发而崩溃
二、多线程
适用于实现并发编程
1.线程的相关概念
1.进程是操作系统可以调度已经进行资源分配的基本单位,是一个资源单位,其中包含了运行这个程序所需的资源
2.线程是操作系统可以运算调度的最小单位,是真正的执行单位,其包含在进程中, 一个线程就是一条固定的控制流程,
3.一个进程可以包含多个线程,同一进程中的线程共享进程内的资源
4.线程特点:系统会为每一个进程自动创建一条线程,称之为主线程, 后续通过代码开启的线程称之为子线程
2.为什么需要线程
1.降低进程间切换的消耗,提高系统的并发性,突破一个进程只能干一件事的缺陷,使进程内并发成为可能
2.有多个任务要并发处理,使用进程资源开销太大,因此使用线程,它适用于任务非常多的情况
3.多线程使用
from threading import Tread
from multiprocessing import Process
import time
1.直接通过Thread类,实例化线程对象
# 线程与进程不同的是,线程不需要在maim判断中开启,开启线程的代码放哪里都可以
def task():
print('子线程 start')
t = Thread(target=task)
t.start()
print('主线程 end')
2.通过继承Thread类,并复写run方法,当开启子线程对象的start方法时,就会自动运行run方法
class MyThread(Thread):
def run(self):
# 把要在子线程中执行的代码放入run中
print('子线程 start')
mt = MyThread()
mt.start()
3.并发测试:通过连续打印主线程和子线程中的信息,看是否有并发效果
def task():
while True:
print('子线程')
t = Thread(target=task)
t.start()
while True:
print('主线程')
4.进程与线程
1.进程是资源单位,它里面包含了共享数据和代码,做多进程时,就是通过多个进程同时按照它里面的代码和数据进行运行;线程是最小的执行单位,它属于进程中的某个功能,享受进程中的所有资源
2.创建进程的开销(资源)远大于线程
3.多个进程之间内存是相互隔离的;而线程是共享进程内的所有资源
4.进程之间对硬件资源是竞争关系;而线程是协作关系
5.进程之间有层级关系;而线程之间没有层级关系,它们是平等的
6.无论开启了多少子线程,PID都是一样的
from threading import Tread
import os
def task():
print(os.getpid())
for i in range(100):
t = Thread(target=task)
t.start()
5.线程安全问题
只要并发访问了同一资源,就一定会产生安全问题,解决方法和多进程一样,给操作公共资源的代码加锁
from threading import Thread, Lock
import time
a = 10 # 定义一个不可变数据类型的变量作为参考
mutex = Lock() # 定义一把公共的锁
def task():
global a # 将线程中的变量声明为全局变量
mutex.acquire() # 当一个线程执行到这里时就锁住代码
temp = a # 定义个等于全局变量的临时变量
time.sleep(0.1) # 睡眠程序来验证是否会有数据错乱的情况
a = temp - 1 # 改变全局变量的值
mutex.release() # 解锁
tl = list()
for i in range(10):
t = Thread(target=task) # 实例化多个子线程
t.start() # 开启子线程
tl.append(t) # 收集子线程
for t in tl:
t.join() # 无论是哪个子线程最后执行完,都会先于主线程(让主线程等待)
print(a)
# 解释:
# 1.当我们不加锁时,多个子线程可能竞争共享资源(也就是操作全局变量的代码),那么可能出现a最后为9的情况,因为它们同时执行(并发)可能取到改变之前的全局变量a
# 2.当我加锁之后,先抢到锁的子线程就先执行,这样每次都会改变全局变量a,最终结果a必定为0
5.守护线程
1.一个子线程设置为守护线程,那么该子线程(守护线程)会随着主线程结束而直接结束
2.默认情况下,主线程即使代码执行完毕,也会等待所有非守护线程执行完毕后,才会结束整个程序,因为多个线程之间是协作关系
3.案例
from threading import Thread
import time
def task1():
# 守护线程会随着程序结束而直接结束
print('守护线程 start')
time.sleep(4)
print('守护线程 end') # 可能不会执行
def task2():
# 程序会等待子线程执行完之后才会结束
print('子线程 start')
time.sleep(2)
print('子线程 end') # 一定会执行
print('主线程 start')
t1 = Thread(target=task1)
t1.daemon = True
t1.start()
t2 = Thread(target=task2)
t2.start()
print('主线程 over')
# 会等待子线程结束后才结束整个程序
'''
主线程 start
守护线程 start
子线程 start
主线程 over
子线程 end
'''
6.线程中的常用属性和方法
from threading import Thread, currentThread, enumerate, activeCount
import time
t = Thread()
# 1 t.start() # 开启线程
# 2 t.join() # 强制按序执行,提高线程的优先级
# 3 t.is_alive()/t.isAlive() # 判断线程是否存活
# 4 t.ident # 线程标识符(线程的id)
# 5 t.daemon # 设置为守护线程
# 方法1:获取当前线程对象
print(currentThread())
# 方法2:获取正在运行的所有线程对象(是一个列表)
print(enumerate())
# 方法3:存活的线程数量
print(activeCount())