1.manager
from multiprocessing import Process,Manager,Lock def work(dic,lock): # 简写 使用with语法自动给你上锁和解锁 with lock: dic['count'] -= 1 """ # 正常写法 # 上锁 lock.acquire() # 数据值减一 dic['count'] -= 1 # 解锁 lock.release() """ if __name__ == '__main__': # 创建Manager m = Manager() # 创建一个锁对象 lock = Lock() lst = [] dic = m.dict({'count':100}) for i in range(100): p = Process(target=work,args=(dic,lock)) p.start() lst.append(p) for i in lst: i.join() print(dic) # {'count': 0}
2.pool进程池(调用多核来执行任务(函数))
基本概念
# 进程池: # 开启过多的进程并不一定提高你的效率, # 如果cpu负载任务过多,平均单个任务执行的效率就会低,反而降低执行速度. 1个人做4件事,4个人做4件事,4个人做1件事 显然后者执行速度更快, 前者是并发,后者是并行 利用进程池,可以开启cpu的并行效果 # apply 开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程 # apply_async 开启进程,异步非阻塞,(主进程 和 子进程异步)
# pool() 我的电脑是4核的 即是调用4核每次产生四个进程来执行任务,
# pool(1)即是4核执行一个任务 即是每次执行一个任务,四个核相互切换
2.1 计算你的机器有多少个cpu
# print(os.cpu_count())
# 4
# 理解:进程池为POOl(里面有多少个进程。一般来说不写代表默认核数多个进程数,也就是说接下来跑任务,就用这个
几个进程执行任务(响应的函数))
def func(num): time.sleep(3) # 同一时间最多允许4个进程同时执行任务. # time.sleep(random.uniform(0.1,1)) #异步并行的程序. print("这是发送的第%d邮件" % (num)) if __name__ == "__main__": startime = time.time() # (1)创建进程池对象 # Pool()里面的参数是同一时间允许多少个进程并行. """ 6个任务 (1)1个人做6个 (2)6个人做6个 (3)6个人做1个 任务量较少时,3的速度较快,任务量较大时,2的速度更快. 因为如果任务线拉长,频繁切换cpu会占点时间. """ # 进程里面的数据不给的时候 是默认你的内核数,也就是保证每一个内核产生一个进程 p = Pool() for i in range(100): p.apply_async(func, args=(i,)) # 关闭进程池,不在接受新的进程 p.close() # 主进程阻塞,等待子进程全部完成后再退出 p.join() endtime = time.time() print(endtime - startime) # 0.19946622848510742 # 这是发送的第0邮件 # 这是发送的第1邮件 # 这是发送的第2邮件 # 这是发送的第3邮件 # 这是发送的第4邮件 # 这是发送的第5邮件 # 这是发送的第6邮件 # 这是发送的第7邮件 # 这是发送的第8邮件 # 这是发送的第9邮件 # 这是发送的第10邮件 # 这是发送的第11邮件
# (比对一下之间用单核执行任务,也就是使用process 单核并发程序 不使用进程池的时候) # (1)只需将上面的p = Pool(1) 也就是4个核每次车执行一个进程
# 单核并发程序
startime = time.time() lst = [] for i in range(100): p = Process(target=func,args=(i,)) p.start() lst.append(p) for i in lst: i.join() endtime = time.time() print(endtime-startime)
2.1两者对比代码:
import os, time from multiprocessing import Process,Pool # print(os.cpu_count()) def func(num): # time.sleep(3) # 同一时间最多允许6个进程同时执行任务. # time.sleep(random.uniform(0.1,1)) #异步并行的程序. print("这是发送的第%d邮件" % (num)) if __name__ == "__main__": startime = time.time() # (1)创建进程池对象 # Pool()里面的参数是同一时间允许多少个进程并行. """ 4个任务 (1)1个人做4个 (2)4个人做4个 (3)4个人做1个 任务量较少时,3的速度较快,任务量较大时,2的速度更快. 因为如果任务线拉长,频繁切换cpu会占点时间. """ ##########只是区别(1)的情况 相当于4个核产生一个进程就行互相切换执行任务 ##########p = Pool() 默认不写的情况是4个核产生4个进程去执行任务 p = Pool(1) for i in range(30): p.apply_async(func, args=(i,)) # 关闭进程池,不在接受新的进程 p.close() # 主进程阻塞,等待子进程全部完成后再退出 p.join() endtime = time.time() print(endtime - startime) # 0.19946622848510742 startime = time.time() lst = [] for i in range(30): p = Process(target=func, args=(i,)) p.start() lst.append(p) for i in lst: i.join() endtime = time.time() print(endtime - startime) # 这是发送的第0邮件 # 这是发送的第1邮件 # 这是发送的第2邮件 # 这是发送的第3邮件 # 这是发送的第4邮件 # 这是发送的第5邮件 # 这是发送的第6邮件 # 这是发送的第7邮件 # 这是发送的第8邮件 # 这是发送的第9邮件 # 0.28101587295532227 # 这是发送的第0邮件 # 这是发送的第1邮件 # 这是发送的第2邮件 # 这是发送的第5邮件 # 这是发送的第6邮件 # 这是发送的第3邮件 # 这是发送的第9邮件 # 这是发送的第7邮件 # 这是发送的第4邮件 # 这是发送的第8邮件 # 1.1240644454956055 # 4个任务 # (1)1个人做4个 # (2)4个人做4个 # (3)4个人做1个 # 可以充分的得到上面的杰伦是:任务量较少的时候3的速度叫快,当任务量较大的时候 # 2的速度更快 因为如果任务线拉长 频繁切换cpu会占点时间
2.2apply开启进程 同步阻塞 每次都要等待当前任务完成之后,再开启下一个进程可加上返回值
def task(num): time.sleep(random.uniform(0.1,1)) # 同步程序 print("%s:%s" % (num,os.getpid())) return num if __name__ == '__main__': p = Pool(2) for i in range(20): res = p.apply(task,args=(i,)) print("----->",res) # 完完全全的同步程序 等上面走完了再执行finish print("finish") # 0:7204 # -----> 0 # finish # 1:7156 # -----> 1 # finish # 2:7204 # -----> 2 # finish # 3:7156 # -----> 3 # finish # 4:7204 # -----> 4 # finish # 5:7156 # -----> 5 # finish # 6:7204 # -----> 6 # finish # 7:7156 # -----> 7 # finish # 小结:上面def task()函数中加的延时 很好证明 程序是同步的,不然产生的进程是并发的, # 上面的例子 p =Pool(2) 也就是说进程池中有2个进程, 来执行10个任务,其实也可以理解为 # 就是产生的10个进程 只是5个7204 5个7156 去执行 同等数量的10个task函数 也就是10个任务
2.3 apply_async 异步非阻塞程序 可以有返回值
def task(num): # time.sleep(3) time.sleep(random.uniform(0.1,1)) # 同步程序 print("%s:%s" % (num,os.getpid())) return os.getpid() if __name__ == "__main__": p = Pool() lst = [] lst2 = [] for i in range(10): res = p.apply_async(task,args=(i,)) # print(res) # 1.把返回的对象一个一个插入到列表里 lst.append(res) for i in lst: # 2.使用get方法获取返回值 lst2.append(i.get()) # 关闭进程池,不在接受新的进程 p.close() # 主进程阻塞,等待子进程全部完成后再退出 p.join() # 返回的是默认4个进程,因为当前机器是4个核心cpu print(set(lst2),len(set(lst2))) print("finish11222") # 0:7312 # 1:4804 # 2:6232 # 6:6232 # 3:1648 # 4:7312 # 8:1648 # 7:6232 # 5:4804 # 9:7312 # {7312, 6232, 4804, 1648} 4 # Pool() 默认创建4个子进程出来执行10个任务
2.4 进程池.map (与高阶函数map使用方法一样,只不过该map支持并行并发)
""" 进程池.map 返回的是列表 map默认底层中加了阻塞,等全部执行完毕之后,主进程在终止程序,区别于3 """ def task(num): # time.sleep() # time.sleep(random.uniform(0.1,1)) print("%s:%s"%(num,os.getpid())) return num ** 2 if __name__ == '__main__': p = Pool() lst = p.map(task,range(15)) print(lst) # 如果出现了join,一定需要加上close,要么同时出现,要么都没有 # p.close() # p.join() print(123456) # 0:4952 # 1:4952 # 2:4952 # 3:4952 # 4:4952 # 5:4952 # 6:4952 # 7:4952 # 8:4952 # 9:4952 # 10:4952 # 11:4952 # 12:4952 # 13:4952 # 14:4952 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196] # 123456
2.5 关闭进程池不会再次接受新的进程
def task(num): time.sleep(random.uniform(0.1,1)) print("%s:%s" % (num,os.getpid())) return num ** 2 if __name__ == "__main__": p = Pool() lst = [] for i in range(15): res = p.apply_async(task,args=(i,)) lst.append(res) # get 函数内部默认加了阻塞 获取完所有值之后在向下执行 for i in lst: print(i.get()) p.close() # 如果执行了close 不能够继续往进程池里面添加进程了 # res = p.apply_async(task,args=(112233,)) p.join() print("finnish") # 0:8688 # 0 # 3:4996 # 1:8280 # 1 # 4:8688 # 2:8880 # 4 # 9 # 16 # 6:8280 # 5:4996 # 25 # 36 # 8:8880 # 7:8688 # 49 # 64 # 9:8280 # 81 # 10:4996 # 100 # 11:8880 # 121 # 14:4996 # 12:8688 # 144 # 13:8280 # 169 # 196 # finnish ## 小结:如果是在p.close()之前关闭添加又一个任务的话,可以, # 若在关闭之后再去添加就没用
小结:
# 进程池:可以简单的理解为就是,Pool里面有多少个产生多少个进程 # 1.一般来说不指定Pool()里面的数量 默认是按照每一个内核一个进程就行分配的,比如4核的cpu那么就是进程池有4个进程就行 # 来执行任务,开启任务有2种方式,都是拿进程池中的进程就行执行任务,若在调用的时候就会出现4对1(一个任务,只需一个进程4个核来做) 或者4对多 # 1.apply开启进程 同步阻塞 每次都要等待当前任务完成之后,再开启下一个进程可加上返回值 # 2.apply_async 异步非阻塞程序 可以有返回值 # 3.Process 产生的子进程 默认主进程等待所有子进程执行完毕之后在终止而pool进程池,只要主进程跑完了 立即终止所有程序
3.线程
基本概念
#进程是资源分配的最小单位 #线程是计算机中调度的最小单位 #线程的缘起 资源分配需要分配内存空间,分配cpu: 分配的内存空间存放着临时要处理的数据等,比如要执行的代码,数据 而这些内存空间是有限的,不能无限分配 目前配置高的主机,5万个并发已是上限.线程概念应用而生. #线程的特点 线程是比较轻量级,能干更多的活,一个进程中的所有线程资源是共享的. 一个进程至少有一个线程在工作
线程的缺陷
#线程可以并发,但是不能并行(同一个进程下的多个线程不能分开被多个cpu同时执行) #原因: python是解释型语言,执行一句编译一句,而不是一次性全部编译成功,不能提前规划,都是临时调度 容易造成不同的cpu却反复执行同一个程序.所以加了一把锁叫GIL 全局解释器锁(Cpython解释器特有) GIL锁:同一时间,一个进程下的多个线程只能被一个cpu执行 #想要并行的解决办法: (1)用多进程间接实现线程的并发 (2)换一个Pypy,Jpython解释器 #程序分为计算密集型和io密集型 对于计算密集型程序会过度依赖cpu,但网页,爬虫,OA办公,这种io密集型的程序里,python绰绰有余
1.一个进程可以有多个线程
from threading import Thread from multiprocessing import Process import os,time,random # (1)一个进程可以有多个线程 def func(num): time.sleep(random.uniform(0.1,1)) print("子线程",num,os.getpid()) for i in range(10): t = Thread(target=func,args=(i,)) t.start() # 子线程 6 5442 # 子线程 0 5442 # 子线程 9 5442 # 子线程 4 5442 # 子线程 8 5442 # 子线程 3 5442 # 子线程 1 5442 # 子线程 7 5442 # 子线程 5 5442 # 子线程 2 5442
小结:线程只能是并发
进程可以并发 可以并行 进程并发的时候创建进程池 指定进程池里面进程个数 也就是同一时间最多多少个进程出现
指定是1 的话也就是多个执行一个任务 产生一个进程了,核与核之间相互切换
3.2并发多线程和多进程的速度对比?多线程更快
def func(i): # time.sleep(random.uniform(0.1,1)) print("子线程",i,os.getpid()) if __name__ == '__main__': # 计算多线程的执行速度 startime = time.perf_counter() lst = [] for i in range(1000): t = Thread(target=func,args=(i,)) t.start() lst.append(t) for i in lst: i.join() print("程序执行结束") endtime = time.perf_counter() print(endtime-startime) # 2.计算多进程的执行速度 startime = time.perf_counter() lst =[] for i in range(1000): p = Process(target=func,args=(i,)) p.start() lst.append(p) for i in lst: i.join() print("程序结束") endtime = time.perf_counter() print(startime-endtime) # 子线程 0 5493 # 子线程 1 5493 # 子线程 3 5493 # 子线程 2 5493 # 子线程 4 5493 # 子线程 5 5493 # 子线程 6 5493 # ..... # 子线程 997 5493 # 子线程 998 5493 # 子线程 999 5493 # 程序执行结束 # 0.6118391980016895 # 子进程 0 6494 # 子进程 1 6495 # 子进程 2 6496 # 子进程 3 6497 # 子进程 4 6498 ... # 子进程 997 9537 # 子进程 999 9539 # 子进程 998 9538 # 程序结束 # 4.741718559002038 # 总结:多线程的执行速度一定是大于多进程的速度,可以看出一个进程里面可以有多线程 它们公用 #一份资源 而进程之间的数据独立的
3.3多线程共享同一份进程资源
num = 100 lst = [] def func(): global num num -= 1 for i in range(100): t = Thread(target=func) t.start() lst.append() for i in lst: i.join() print(num) # num = 0
3.4线程相关函数
'''
线程.is_alive() 检测线程是否仍然存在
线程.setName() 设置线程名字
线程.getName() 获取线程名字
1.currentThread().ident 查看线程id号
2.enumerate() 返回目前正在运行的线程列表
3.activeCount() 返回目前正在运行的线程数量
'''
线程.is_alive() 检测线程是否仍然存在
def func(): # time.sleep() pass t = Thread(target=func) t.start() print(t.is_alive()) print(t.getName()) t.setName("wangwen") print(t.getName()) time.sleep(2) print(t.is_alive()) # False # 程序一下就执行完了 # Thread-1 # wangwen # False
# 1.currentThread().ident 查看线程id from threading import currentThread def func(): print("子线程:",currentThread().ident) t = Thread(target = func) t.start() print("主线程:",currentThread().ident) # 主线程: 140391866324736 # 子线程: 140391827052288
#正在运行的线程列表
# 2.enumerate() 返回目前正在运行的线程列表 from threading import enumerate def func(): print("子线程:",currentThread().ident) time.sleep(0.5) for i in range(10): t = Thread(target = func) t.start() # time.sleep(3) # 这样只有1个主线程 # 10个子线程 + 1个主线程 = 11个正在运行的线程 print(enumerate()) #[<_MainThread(MainThread, started 140247126484736)>] print(len(enumerate())) # 子线程: 140012989323008 # 子线程: 140012980930304 # 子线程: 140012972537600 # 子线程: 140012964144896 # 子线程: 140012955752192 # 子线程: 140012947359488 # 子线程: 140012938966784 # 子线程: 140012930574080 # 子线程: 140012444186368 # [<_MainThread(MainThread, started 140013028595456)>, <Thread(Thread-1, started 140012989323008)>, <Thread(Thread-2, started 140012980930304)>, <Thread(Thread-3, started 140012972537600)>, <Thread(Thread-4, started 140012964144896)>, <Thread(Thread-5, started 140012955752192)>, <Thread(Thread-6, started 140012947359488)>, <Thread(Thread-7, started 140012938966784)>, <Thread(Thread-8, started 140012930574080)>, <Thread(Thread-9, started 140012444186368)>, <Thread(Thread-10, started 140012435793664)>] # 11 # 子线程: 140012435793664
# 3.activeCount() 返回目前正在运行的线程数量
from threading import activeCount def func(): print("子线程:",currentThread().ident) time.sleep(0.5) for i in range(10): t = Thread(target = func) t.start() print(activeCount()) # 子线程: 139908506162944 # 子线程: 139908425447168 # 子线程: 139908417054464 # 子线程: 139908408661760 # 子线程: 139908400269056 # 子线程: 139908391876352 # 子线程: 139908383483648 # 子线程: 139908375090944 # 子线程: 139907888576256 # 11 # 子线程: 139907880183552
4.守护线程
# 守护线程:等待所有线程执行结束之后 再自动结束 守护所有线程
from threading import Thread import time def func1(): while True: time.sleep(0.5) print("我是守护线程") def func2(): print("func2 -> start") time.sleep(3) print("func2 -> end") t1 = Thread(target=func1) # setDaemon t1线程对象变成守护线程 t1.setDaemon(True) t1.start() t2 = Thread(target=func2) t2.start() time.sleep(5) # t2.join() print("主线程执行结束")
func2 -> start
我是守护线程
我是守护线程
我是守护线程
我是守护线程
我是守护线程
func2 -> end
我是守护线程
我是守护线程
我是守护线程
我是守护线程
主线程执行结束
我是守护线程
# 这个结果告诉我们 守护线程是守护的所有线程。
5.lock
### 线程的数据安全 from threading import Thread,Lock import time n = 0 def func1(lock): global n # time.sleep(0.3 # print(11) for i in range(100000): # 正常上锁解锁 lock.acquire() # print(i) n -= 1 lock.release() def func2(lock): global n # time.sleep() # print(22) for i in range(1000000): # 用with自动上锁解锁 with lock: # print(n) n += 1 if __name__ == '__main__': # 创建一个锁 lock = Lock[] lst = [] for i in range(10): t1 = Thread(target=func1,args=(lock,)) t2 = Thread(target=func2,args=(lock,)) t1.start() t2.start() lst.append(t1) lst.append(t2) for i in lst: i.join() print("主线程执行结束") print(n) # 主线程执行结束... # 0 # 在里面创建2个线程分别执行func1 func2 由于修改数据的时候是加了锁的,于是不会出现 # 线程并发时,拿数据的紊乱
6.信号量
# ### 信号量(线程) from threading import Semaphore,Thread import time,random def func(i,sem): # with简写: with sem: print(i) time.sleep(random.uniform(0.1,1)) """ # 正常写法: sem.acquire() print(i) time.sleep(random.uniform(0.1,1)) sem.release() """ if __name__ == "__main__": sem = Semaphore(5) for i in range(20): Thread(target=func,args=(i,sem)).start() # 0 # 1 # 2 # 3 # 4 # 5 # 6 # 7 # 8 # 9 # 10 # 11 # 12 # 13 # 14 # 15 # 16 # 17 # 18 # 19 # 小结:同时产生5个以上的线程并是锁了的
7.死锁、互斥锁、递归锁
7.1死锁
from threading import Thread,Lock import time noodle_lock = Lock() kuaizi_lock = Lock() def eat1(name): noodle_lock.acquire() print("%s拿到面条" %(name)) kuaizi_lock.acquire() print("%s拿到筷子"%(name)) print("开始吃") time.sleep(0.7) kuaizi_lock.release() print("%s放下筷子" % (name)) noodle_lock.release() print("%s放下面条"%(name)) def eat2(name): kuaizi_lock.acquire() print("%s拿到筷子"%(name)) noodle_lock.acquire() print("%s拿到面条"%(name)) print("开始吃") time.sleep(0.7) noodle_lock.release() print("%s放下面条"%(name)) kuaizi_lock.release() print("%s放下筷子"%(name)) if __name__ == '__main__': name_list1 = ['马具强','熊卫华'] name_list2 = ['黄胸大','黄将用'] for name in name_list1: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() # 马具强拿到面条 # 马具强拿到筷子 # 开始吃 # 马具强放下筷子 # 马具强放下面条 # 熊卫华拿到面条 # 黄胸大拿到筷子 # 小结:当两个线程就行并发的去执行方法一还是方法二的时候 由于是并发的 于是会出现 # 一方只拿到筷子或者是一方只拿到面条的情况 这种情况是不可以解开2层锁的嵌套的于是不能打开锁 # 于是就是上面的结果
7.2递归锁
# (2)递归锁 from threading import Thread,RLock """ 递归锁专门用来解决死锁现场 临时用于快速解决服务器崩溃异常现象,用递归应急 解决应急问题的 """ # 基本语法 """ 递归锁如果是3个 就对象释放3个锁忽略上锁的过程 就行解锁 """ rlock = RLock() def func(name): rlock.acquire() print(name,1) rlock.acquire() print(name,2) rlock.acquire() print(name,3) rlock.release() rlock.release() rlock.release() lst = [] for i in range(10): t1 = Thread(target=func,args=("name%s" % (i) , )) t1.start() lst.append(t1) for i in lst: i.join() print("程序结束了") # name0 1 # name0 2 # name0 3 # name1 1 # name1 2 # name1 3 # name2 1 # name2 2 # name2 3 # name3 1 # name3 2 # name3 3 # name4 1 # name4 2 # name4 3 # name5 1 # name5 2 # name5 3 # name6 1 # name6 2 # name6 3 # name7 1 # name7 2 # name7 3 # name8 1 # name8 2 # name8 3 # name9 1 # name9 2 # name9 3 # 程序结束了
# 用递归锁解决应急死锁现象 noodle_lock=kuaizi_lock = RLock() def eat1(name): noodle_lock.acquire() print("%s拿到面条" % (name)) kuaizi_lock.acquire() print("%s拿到筷子" % (name)) print("开始吃") time.sleep(0.7) kuaizi_lock.release() print("%s放下筷子" % (name)) noodle_lock.release() print("%s放下面条" % (name)) def eat2(name): kuaizi_lock.acquire() print("%s拿到筷子" % (name)) noodle_lock.acquire() print("%s拿到面条" % (name)) print("开始吃") time.sleep(0.7) noodle_lock.release() print("%s放下面条" % (name)) kuaizi_lock.release() print("%s放下筷子" % (name)) if __name__ == "__main__": name_list1 = ["马具强","熊卫华"] name_list2 = ["黄熊大","黄将用"] for name in name_list1: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() # 马具强拿到面条 # 马具强拿到筷子 # 开始吃 # 马具强放下筷子 # 马具强放下面条 # 熊卫华拿到面条 # 熊卫华拿到筷子 # 开始吃 # 熊卫华放下筷子 # 熊卫华放下面条 # 黄熊大拿到筷子 # 黄熊大拿到面条 # 开始吃 # 黄熊大放下面条 # 黄熊大放下筷子 # 黄将用拿到筷子 # 黄将用拿到面条 # 开始吃 # 黄将用放下面条 # 黄将用放下筷子 # 小结:也就是就是子线程并发去抢筷子和面条的时候如果没有成功 也会正常的打印出来 # 因为我们是用的是递归锁 专门解决死锁的现象 这样就不会导致死锁的情况
7.3互斥锁
从语法上来看 锁是可以相互嵌套的,但是不要使用上一次锁 就对应解开一把锁
形成互斥锁 吃面条和拿筷子是同时的 上一次锁就够了 不要分别上锁尽量不要形成锁的
嵌套 容易死锁。