Python GIL(Global Interpreter Lock) *****
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势 首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL 在一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,毫无疑问 1、什么是GIL(这是Cpython解释器) GIL本质就是一把互斥锁,那既然是互斥锁,原理都一样,都是让多个并发线程同一时间只能有一个执行 即:有了GIL的存在,同一进程内的多个线程同一时刻只能有一个在运行,意味着在Cpython中 一个进程下的多个线程无法实现并行===》意味着无法利用多核优势 但不影响并发的实现 GIL可以被比喻成执行权限,同一进程下的所以线程 要想执行都需要先抢执行权限 2、为何要有GIL 垃圾回收线程 因为Cpython解释器自带垃圾回收机制不是线程安全的 保证了垃圾回收线程和其他线程对数据的操作不冲突 防止垃圾回收线程把其他线程的数据回收掉 同一时刻必须只有一个线程执行。 详细原因: 如果多个线程的target=work,那么执行流程是 多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行 解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码
验证GIL 不能保护数据,必须要mutex 只有GIL 没有mutex GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图 修改两次应该count=2 但是结果是:count=1 GIL vs 自定义互斥锁 GIL相当于执行权限,会在任务无法执行的况,被强行释放 自定义互斥锁即便是无法执行,也不会自动释放 GIL:保证同一时刻只有一个线程执行,防止回收其他线程要用的数据,不保护数据的修改,如果没有GIL可能造成,其他线程刚把数据造出来,就被垃圾线程回收了 mutex:保证对数据的操作是串行执行,保护数据的修改,如果这个锁不释放,其他线程有GIL 也没法改数据 只有第一个拿到锁的线程,修改完数据后,释放了mutex 其他线程才能抢mutex,继续修改数据
问题: 有了GIL的存在,同一时刻同一进程中只有一个线程被执行 进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势 #1. cpu到底是用来做计算的,还是用来做I/O的? #2. 多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能 #3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处 一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。 如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活, 反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高 结论: 对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用 当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地 #分析: 我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是: 方案一:开启四个进程 方案二:一个进程下,开启四个线程 #单核情况下,分析结果: 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜 如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜 #多核情况下,分析结果: 如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜 如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜 #结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。 计算密集型:多进程,多核,并行 I/O密集型:多线程,单核,并发
''' # from threading import Thread,Lock # import time # # mutex=Lock() # n=100 # # def task(): # global n # mutex.acquire() # temp=n # time.sleep(0.1) # n=temp-1 # mutex.release() # # if __name__ == '__main__': # t_l=[] # start_time=time.time() # for i in range(3): # t=Thread(target=task) # t_l.append(t) # t.start() # # for t in t_l: # t.join() # # stop_time=time.time() # print(n) # print(stop_time-start_time) # # 计算密集型 # from multiprocessing import Process # from threading import Thread # import os,time # def work1(): # res=0 # for i in range(100000000): # res*=i # # def work2(): # res=0 # for i in range(100000000): # res*=i # # def work3(): # res=0 # for i in range(100000000): # res*=i # # def work4(): # res=0 # for i in range(100000000): # res*=i # # # # if __name__ == '__main__': # l=[] # # print(os.cpu_count()) #本机为4核 # start=time.time() # # p1=Process(target=work1) # # # p2=Process(target=work2) # # p3=Process(target=work3) # # p4=Process(target=work4) # # p1=Thread(target=work1) # # p2=Thread(target=work2) # p3=Thread(target=work3) # p4=Thread(target=work4) # # p1.start() # p2.start() # p3.start() # p4.start() # p1.join() # p2.join() # p3.join() # p4.join() # stop=time.time() # print('run time is %s' %(stop-start)) #6.484470367431641 # # # run time is 17.391708850860596 # # IO密集型 from multiprocessing import Process from threading import Thread import os,time def work1(): time.sleep(5) def work2(): time.sleep(5) def work3(): time.sleep(5) def work4(): time.sleep(5) if __name__ == '__main__': l=[] # print(os.cpu_count()) #本机为4核 start=time.time() # p1=Process(target=work1) # # p2=Process(target=work2) # p3=Process(target=work3) # p4=Process(target=work4) p1=Thread(target=work1) # p2=Thread(target=work2) p3=Thread(target=work3) p4=Thread(target=work4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() stop=time.time() print('run time is %s' %(stop-start)) #run time is 5.162574291229248 # run time is 5.002141714096069
定时器 *
定时器,指定n秒后执行某操作 代码演示: from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed 验证码定时器: from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res='' for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('验证成功',end='\n') self.t.cancel() break if __name__ == '__main__': obj=Code() obj.check()
线程queue **
import queue # 队列:先进先出 # q=queue.Queue(3) # q.put(1) # q.put(2) # q.put(3) # # print(q.get()) # print(q.get()) # print(q.get()) # 堆栈:先进后出 # q=queue.LifoQueue() # q.put(1) # q.put(2) # q.put(3) # print(q.get()) # print(q.get()) # print(q.get()) # 优先级队列:优先级高先出来,数字越小,优先级越高 # q=queue.PriorityQueue() # q.put((3,'data1')) # q.put((-10,'data2')) # q.put((11,'data3')) # # print(q.get()) # print(q.get()) # print(q.get())
进程池与线程池 同步与异步 *****
1、什么时候用池: 池的功能是限制启动的进程数或线程数, 什么时候应该限制??? 当并发的任务数远远超过了计算机的承受能力时,即无法一次性开启过多的进程数或线程数时 就应该用池的概念将开启的进程数或线程数限制在计算机可承受的范围内 2、同步vs异步 同步、异步指的是提交任务的两种方式 同步:提交完任务后就在原地等待,直到任务运行完毕后拿到任务的返回值,再继续运行下一行代码 异步:提交完任务(绑定一个回调函数)后根本就不在原地等待,直接运行下一行代码,等到任务有返回值后会自动触发回调函数
#用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result()) 线程池用法与进程池相同 回调函数: from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
# # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import os # import time # import random # # def task(n): # print('%s run...' %os.getpid()) # time.sleep(10) # return n**2 # # def parse(res): # print('...') # if __name__ == '__main__': # pool=ProcessPoolExecutor(4) # # pool.submit(task,1) # # pool.submit(task,2) # # pool.submit(task,3) # # pool.submit(task,4) # # l=[] # for i in range(1,5): # future=pool.submit(task,i) # l.append(future) # # print(future) # # print(future.result()) # # pool.shutdown(wait=True) #shutdown关闭进程池的入口 # for future in l: # # print(future.result()) # parse(future.result()) # # print('主') # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import os # import time # import random # # def task(n): # print('%s run...' %os.getpid()) # time.sleep(5) # return n**2 # # def parse(future): # time.sleep(1) # res=future.result() # print('%s 处理了 %s' %(os.getpid(),res)) # # if __name__ == '__main__': # pool=ProcessPoolExecutor(4) # # pool.submit(task,1) # # pool.submit(task,2) # # pool.submit(task,3) # # pool.submit(task,4) # # start=time.time() # for i in range(1,5): # future=pool.submit(task,i) # future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,并且将future当作参数传给parse # pool.shutdown(wait=True) # stop=time.time() # print('主',os.getpid(),(stop - start)) from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import os import time import random def task(n): print('%s run...' %current_thread().name) time.sleep(5) return n**2 def parse(future): time.sleep(1) res=future.result() print('%s 处理了 %s' %(current_thread().name,res)) if __name__ == '__main__': pool=ThreadPoolExecutor(4) start=time.time() for i in range(1,5): future=pool.submit(task,i) future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,并且将future当作参数传给parse pool.shutdown(wait=True) stop=time.time() print('主',current_thread().name,(stop - start))
server: from socket import * from threading import Thread def talk(conn): while True: try: data=conn.recv(1024) if len(data) == 0:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port,backlog=5): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(backlog) print('starting...') while True: conn, addr = server.accept() t = Thread(target=talk, args=(conn,)) t.start() if __name__ == '__main__': server('127.0.0.1',8080) client: from socket import * from threading import Thread def talk(conn): while True: try: data=conn.recv(1024) if len(data) == 0:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port,backlog=5): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(backlog) print('starting...') while True: conn, addr = server.accept() t = Thread(target=talk, args=(conn,)) t.start() if __name__ == '__main__': server('127.0.0.1',8080)
上节课复习
上节课复习: 1、互斥锁 将多个任务对共享数据修改的那一部分代码由并发变成“串行” 牺牲了效率保证数据安全 2、信号量 今日内容: 1、GIL全局解释器锁=》多进程vs多线程(*****) 2、GIL全局解释器锁vs互斥锁(*****) 3、定时器 4、线程queue 5、进程池与线程池(*****) 同步vs异步