GIL锁 *****
1.什么是GIL锁:指的是全局解释器锁,加到了解释器上,并且是一把互斥锁。
2.非线程安全 即 多个线程访问同一个资源,会有有问题
线程安全 即 多个线程访问同一个资源,不会有问题
3.主要目的:解决多个线程之间共享解释器时带来的数据错乱问题。(有了GIL后,多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全)
GIL的加锁与解锁时机
1.加锁:在调用解释器时立即加锁
2.解锁:
1)当前线程遇到了IO释放
2)当前线程执行时间超过设定值时释放(默认100纳秒)
3)该线程任务结束
GIL给我们造成的影响以及解决方案
GIL的优点:
问题:多线程不能并行
解决方法:
区分任务类型
如果是IO密集(IO操作多):使用多线程
如果是计算密集(计算多):使用多进程
GIL锁与自定义锁的关系
1.GIL是加在cpython解释器上的,保护的是解释器级别的安全(如:对象的引用计数,垃圾分代数据等等)无法保护程序自己定义的数据安全;所以当程序中出现了共享自定义的数据时就要自己加锁
from threading import Thread,Lock
import time
a = 0
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
2.线程2获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL
4.线程2睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL,最后a的值也就是1
def task():
global a
temp = a
time.sleep(0.01)
a = temp + 1
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
2.线程2获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL
4.线程2睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL,最后a的值也就是1
之所以出现问题是因为两个线程在并发的执行同一段代码,解决方案就是加锁!
from threading import Thread,Lock
import time
lock = Lock()
a = 0
def task():
global a
lock.acquire()
temp = a
time.sleep(0.01)
a = temp + 1
lock.release()
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock
2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1
4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock
5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2
注意:GIL仅仅在CPython解释器中存在,在其他的解释器中没有,并不是Python这门语言的缺点
进程池与线程池
什么是进程/线程池
存储进程或线程的容器(列表)
为什么使用进程和线程池(好处)
1.自动管理线程/进程的开启和销毁
2.自动分配任务给空闲的线程/进程
3.可以规定开启线程/进程的数量 保证系统稳定
信号量中是限制同时并发多少,但是线程已经全都建完了
如何创建和使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(10) # 线程池最大值,应该机器所能承受的最大值 当然需要考虑你的机器有几个任务要做(如果不指定默认为CPU个数 * 5)
def task(name,age):
print(name)
print(current_thread().name,'run')
time.sleep(2)
# 该函数提交任务到线程池中
pool.submit(task)
pool = ProcessPoolExecutor() # 具体的值也要参考机器性能
# 进程池的使用 同样可以设置最大进程数量 默认为cpu的个数
def task(name):
print(os.getpid())
print(name)
if __name__ == '__main__':
pool.submit(task,"jerry")
pool.shutdown()
pool.submit(task, "jerry")
# shutdown()使用
pool = ThreadPoolExecutor(3)
def task():
print(current_thread().name)
print(current_thread().isDaemon())
time.sleep(1)
for i in range(5):
pool.submit(task)
st = time.time()
pool.shutdown() # 等待所有任务全部完毕 销毁所有线程 后关闭线程池
# pool.submit(task)
print(time.time() - st)
print("over")
同步 异步
1.同步:发起任务后必须在原地等待任务执行完成,才能继续执行
2.异步:发起任务后不用等待任务执行完成,可以立即开启其他操作
3.问题:很明显异步调用效率更高,但是任务的执行结果如何获取呢?
4.解决方案1:使用shutdown()
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
pool = ThreadPoolExecutor(3)
def task(i):
time.sleep(0.01)
print(current_thread().name,"working..")
return i ** i
if __name__ == '__main__':
objs = []
for i in range(3):
res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
objs.append(res_obj)
# 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
pool.shutdown(wait=True)
# 从结果对象中取出执行结果
for res_obj in objs:
print(res_obj.result())
print("over")
解决方案2:使用reslut()
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
pool = ThreadPoolExecutor(3)
def task(i):
time.sleep(0.01)
print(current_thread().name,"working..")
return i ** i
if __name__ == '__main__':
objs = []
for i in range(3):
res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
print("over")
异步回调
1.什么是异步回调:异步回调指的就是一个函数,该函数会在任务后自动被调用,并且会传入Future对象 ,通过Future象的result()获取执行结果 ,有了回调函数 就可以在任务完成时 及时处理它。
2.为什么需要异步回调
之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调
3.如何使用:
mport requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor
def get_data(url):
print("%s 正在请求%s" % (os.getpid(),url))
time.sleep(random.randint(1,2))
response = requests.get(url)
print(os.getpid(),"请求成功 数据长度",len(response.content))
#parser(response) # 3.直接调用解析方法 哪个进程请求完成就那个进程解析数据 强行使两个操作耦合到一起了
return response
def parser(obj):
data = obj.result()
htm = data.content.decode("utf-8")
ls = re.findall("href=.*?com",htm)
print(os.getpid(),"解析成功",len(ls),"个链接")
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.python.org",
"https://www.tmall.com",
"https://www.mysql.com",
"https://www.apple.com.cn"]
# objs = []
for url in urls:
# res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
# parser(res)
obj = pool.submit(get_data,url) #
obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
# objs.append(obj)
# pool.shutdown() # 2.等待所有任务执行结束在统一的解析
# for obj in objs:
# res = obj.result()
# parser(res)
# 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
# 2.解析任务变成了串行,
总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数,
在进程池中回调函数是在父进程中执行,原因是 任务是由父进程发起的,所以结果也应该交给父进程
在线程池中回调函数就在子线程中执行,原因是 线程之间数据本来是共享的