1. 线程局部变量
多线程之间使用threading.local对象用来存储数据 而其他线程不可见
实现对线程之间的数据隔离
本质上就是不同的线程使用这个对象时,为其创建一个只属于当前线程的字典,是一种拿空间换时间的方法
from threading import local, Thread

# One shared `local` object: every thread that touches it gets its own
# private attribute namespace, so values stored by one thread are invisible
# to all other threads.
loc = local()
# print(loc)  ->  <_thread._local object at 0x7f643e42dbf8>


def func(name, age):
    """Store ``name`` and ``age`` on the thread-local object and print them.

    Only attributes of ``loc`` are assigned here (the module-level name is
    never rebound), so no ``global`` declaration is required.
    """
    loc.name = name
    loc.age = age
    print(loc.name, loc.age)


# Start the first thread.
t1 = Thread(target=func, args=("黄胸大", 98))
t1.start()

# Start the second thread.
t2 = Thread(target=func, args=("黄江永", 18))
t2.start()

# Sample output:
# 黄胸大 98
# 黄江永 18
2.事件
from threading import Event, Thread
import random
import time

# Simulate connecting to a database.
#
# Event API recap (the original notes are kept verbatim in the string below):
#   wait(timeout)  block until the internal flag is True or the timeout expires
#   set()          set the internal flag to True
#   clear()        reset the internal flag to False
#   is_set()       return the current flag (False by default)
""" # wait() 动态添加阻塞 # set() 将内部睡醒改成True # clear() 将内部属性改成False # is_set() 判断当前属性(默认为False) # 语法 e = Event() print(e.is_set()) # wait(timeout=3) 最多堵塞三秒 e.wait(3) print(e.is_set()) """


def check(e):
    """Simulate a validity check of random duration, then set the event.

    Sleeps between 1 and 6 seconds, prints the delay that was used and
    finally signals ``e`` so that waiters can proceed.
    """
    print("开始检测数据链接的合法性")
    # time.sleep() always returns None, so the delay is kept in its own
    # variable in order to print the number of seconds actually slept.
    delay = random.randrange(1, 7)
    time.sleep(delay)
    print(delay)
    e.set()


def connect(e):
    """Try to "connect" up to three times, waiting one second per attempt.

    Raises:
        TimeoutError: if the event is still unset after the third attempt.
    """
    for attempt in range(1, 4):
        # wait() returns the flag, i.e. True as soon as the event is set.
        if e.wait(1):
            print("数据库连接成功")
            return
        print("尝试连接数据库%s失败了" % attempt)
    # All attempts used up: raise the timeout error explicitly.
    raise TimeoutError


e = Event()
# Thread 1: performs the check and sets the event.
Thread(target=check, args=(e,)).start()
# Thread 2: waits for the event.
Thread(target=connect, args=(e,)).start()

# The connection succeeds only when the random delay in check() is shorter
# than the three one-second attempts made by connect().
# Sample output (delay below the 3-second limit):
# 开始检测数据链接的合法性
# 尝试连接数据库1失败了
# 尝试连接数据库2失败了
# 3
# 数据库连接成功
3.条件
wait 和 notify 它俩是一对:
wait 负责添加堵塞
notify 负责释放阻塞
语法: 无论是wait还是notify,在使用时前后都必须上锁
# (1) 语法:wait 前后上锁
acquire()
wait()
...code...
release()
# (2)语法:notify 前后上锁
acquire()
notify(n)  # n 为自定义的放行线程数量,默认放行1个
release()
"""
### notify不太完善 不建议使用
from threading import Condition, Thread
import time


def func(con, index):
    """Block on the condition until notified, then do the "work".

    The condition is used as a context manager so that its lock is released
    even if the body raises.
    """
    print("%s在等待" % (index))
    with con:
        # Releases the lock while blocked and re-acquires it on wake-up.
        con.wait()
        print("%s do something" % (index))


con = Condition()
for i in range(10):
    t = Thread(target=func, args=(con, i))
    t.start()

# Variant 1: wake all ten waiters at once.
# time.sleep(1)
# with con:
#     con.notify(10)
# Sample output: "0在等待" ... "9在等待", followed by the ten
# "N do something" lines in arbitrary order.

# Variant 2: wake as many waiters as the user types in, until all ten have
# been released.
# NOTE: notify() only wakes threads that are already blocked in wait(); a
# worker that has not reached wait() yet misses the notification.
count = 10
while count > 0:
    num = int(input(">>>\n"))
    with con:
        con.notify(num)
    count -= num

# Sample session (inputs 3, 3, 3, 1): each input releases that many waiting
# threads, e.g. "0 do something", "1 do something", "2 do something" after
# the first 3.
4.队列
# 队列
import queue
# from queue import Queue
"""
put 往队列里面放值 超过队列长度 直接阻塞
get 获取值 如果获取不到 阻塞
put_nowait() 如果放入的值超过了队列的长度 直接报错
get_nowait() 如果获取的值已经没有了 直接报错
# (1) queue 先进先出
# Unbounded FIFO queue: items come out in insertion order.
q = queue.Queue()
for item in (1, 2):
    q.put(item)
for _ in range(2):
    print(q.get())
# print(q.get())  # nothing left: get() would block
# Output:
# 1
# 2

# get_nowait() is available on these thread queues as well.

# The queue length can be capped.
q = queue.Queue(2)
for item in (3, 4):
    q.put(item)
# q.put(5)         # queue is full: put() would block
# q.put_nowait(6)  # queue is full: put_nowait() raises queue.Full
# (2)lifoQueue 后进先出 (数据结构中 栈队列的存取顺序)
from queue import LifoQueue

# LIFO queue: the most recently added item is returned first.
lq = LifoQueue()
for value in (1, 2):
    lq.put(value)
print(lq.get())
# Output:
# 2
# (3)PriorityQueue 按照优先级顺序进行排序
from queue import PriorityQueue

# Tuples are ordered by their number first and then by their string.
""" 默认按照数字大小排序 然后在按照ascill 编码从小到大排序 """
pq = PriorityQueue()
for entry in [(12, "zhangsan"), (6, "lisi"), (18, "lijiuiju"), (6, "wangwu")]:
    pq.put(entry)
for _ in range(4):
    print(pq.get())
# Output:
# (6, 'lisi')
# (6, 'wangwu')
# (12, 'zhangsan')
# (18, 'lijiuiju')

# When single values are stored, they must all be of the same type.
# (1) Numbers: smallest first.
pq = PriorityQueue()
for number in (13, 18, 3):
    pq.put(number)
# pq.put("abc")  # raises: str and int cannot be compared
for _ in range(3):
    print(pq.get())
# print(pq.get())  # queue is empty: get() would block
# Output:
# 3
# 13
# 18

# (2) Strings: ascending by character code.
pq = PriorityQueue()
for text in ("aabb", "ddbb", "王五", "李四"):
    pq.put(text)
# pq.put(990)  # error: int and str cannot be compared
for _ in range(3):
    print(pq.get())
# Output:
# aabb
# ddbb
# 李四
5.更新版的进程池和线程池
# (1) ProcessPoolExecutor 进程池的基本使用(改良版)
'''
相对于旧版的进程池,
一定会等待子进程全部执行完毕之后,再终止程序,相当于过去的Process流程
shutdown 相当于 Process里面的join
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def func(i):
    """Print the task argument and the worker's pid, then return 88899."""
    print("Process", i, os.getpid())
    time.sleep(0.1)
    print("Process...end")
    return 88899


if __name__ == "__main__":
    # (1) ProcessPoolExecutor()  <==>  Pool()
    # Used as a context manager, the pool is always shut down (and its
    # workers joined) when the block is left, even on an exception.
    # (4) shutdown()  <==>  close() + join()
    with ProcessPoolExecutor(5) as p:
        # (2) submit()  <==>  apply_async()
        res = p.submit(func, 55)
        # (3) result()  <==>  get()
        res = res.result()
        print(res)

    print("主进程执行结束...")

# Sample output:
# Process 55 4673
# Process...end
# 88899
# 主进程执行结束...
# (2)线程池
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread as cthread


def func(i):
    """Print the task number with the worker thread id, pause, print the end marker."""
    ident = cthread().ident
    print("thread", i, ident)
    time.sleep(0.1)
    print("thread %s end" % (i))


# The argument is the number of threads allowed to run concurrently.
tp = ThreadPoolExecutor(10)
with tp:
    # Leaving the block shuts the pool down and waits for all 20 tasks.
    for task_no in range(20):
        tp.submit(func, task_no)
print("主线程执行结束...")

# Sample output: the first ten tasks each start on their own thread; tasks
# 10-19 then reuse those same ten thread ids as workers become free, and
# "主线程执行结束..." is printed last.
# (3)线程池的返回值
from threading import current_thread as cthread


def func(i):
    """Run one task and return the id of the thread that executed it."""
    print("thread", i, cthread().ident)
    # The delay keeps a fast worker from grabbing the next task right away,
    # which would stop the pool from creating further threads.
    time.sleep(0.1)
    print("thread %s end" % (i))
    # return "*" * i
    return cthread().ident


lst = []
setvar = set()
# Context manager: the pool is shut down once the results are collected.
with ThreadPoolExecutor(5) as tp:
    for i in range(10):
        res = tp.submit(func, i)
        lst.append(res)

    for i in lst:
        # result() blocks until the corresponding task has finished.
        setvar.add(i.result())

print(setvar, len(setvar))
print("主线程结束")

# Sample output: ten tasks are served by five threads, so the final set
# holds five distinct thread ids, e.g.
# {140350051657472, 140350043264768, 140350026479360, 140350018086656, 140350034872064} 5
# 主线程结束
# (4) map返回迭代器
# The ABCs live in collections.abc; the aliases in `collections` were
# removed in Python 3.10.
from collections.abc import Iterable, Iterator


def func(i):
    """Run one task and return a string made of ``i`` asterisks."""
    print("thread", i, cthread().ident)
    time.sleep(0.1)
    print("thread %s end" % (i))
    return "*" * i


tp = ThreadPoolExecutor(5)
# Executor.map() schedules every call immediately and returns an iterator
# that yields the results in input order.
it = tp.map(func, range(20))

print(isinstance(it, Iterator))
for i in it:
    print(i)

tp.shutdown()
print("主线程执行结束")

# Sample output: "True" appears while the first tasks are starting; the
# results then arrive in order ("", "*", "**", ... up to 19 asterisks),
# interleaved with the workers' own prints, and "主线程执行结束" is last.
6.回调函数
# 回调函数
"""
回调函数:
把函数当成参数传递给另外一个函数
函数先执行,最后再执行当参数传递的这个函数,整个过程是回调,这个参数是回调函数
"""
# (1)线程池的回调函数是由子线程完成的
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
import time


def func(i):
    """Worker task: log start and end, return ``i`` asterisks."""
    worker_id = cthread().ident
    print("thread", i, worker_id)
    time.sleep(0.1)
    print("thread %s end" % (i))
    return "*" * i


def call_back(args):
    """Done-callback: print the calling thread's id and the future's result.

    ``args`` is the finished future handed over by add_done_callback().
    """
    print("call_back:", cthread().ident)
    print(args.result())


tp = ThreadPoolExecutor(5)
for n in range(1, 11):
    # submit(function, argument) returns a future; attach the callback to it.
    future = tp.submit(func, n)
    future.add_done_callback(call_back)

tp.shutdown()
print("主线程:", cthread().ident)

# Sample output: every "call_back:" line shows the same thread id as the
# worker that just finished the task, e.g.
# thread 1 140428135343872
# thread 1 end
# call_back: 140428135343872
# *
# ...
# 主线程: 140428185872128
# (2)进程池的回调函数是由主进程完成
from concurrent.futures import ProcessPoolExecutor
import os
import time


def func(i):
    """Worker task: log start and end with the process id, return ``i`` asterisks."""
    pid = os.getpid()
    print("Process", i, pid)
    time.sleep(0.1)
    print("Process %s end" % (i))
    return i * "*"


def call_back(args):
    """Done-callback: print the calling process id and the future's result."""
    print("call_back:", os.getpid())
    print(args.result())


if __name__ == '__main__':
    # At most five processes run at the same time.
    tp = ProcessPoolExecutor(5)
    for n in range(1, 11):
        future = tp.submit(func, n)
        future.add_done_callback(call_back)
    tp.shutdown()
    print("主进程Id:", os.getpid())

# Sample output: the workers report pids 7171-7175, while every
# "call_back:" line reports 7170, the same pid as the final line
# "主进程Id: 7170".
7.协程
def gen():
    """Yield the integers 0 through 9."""
    for i in range(10):
        yield i


# Calling a generator function returns a generator object.
mygen = gen()
for i in mygen:
    print(i)


# (1) Producer/consumer model rewritten with generators
def producer():
    """Yield the integers 0 through 99, one per request."""
    for i in range(100):
        yield i


def consumer():
    """Pull every value from a fresh producer and print it."""
    g = producer()
    for i in g:
        print(i)


consumer()

# Output: 0..9 from the first loop, then 0..99 from consumer().
# (2) 协程的具体实现
from greenlet import greenlet
import time

# switch() hands control to another greenlet. Switching is manual only:
# greenlet cannot detect blocking I/O and switch automatically.
# (Original note kept verbatim in the string below.)
""" switch 利用它就行任务的切换一般在阻塞的时候切换 只能默认手动切换 缺陷:不能够规避io,不能自动实现遇到阻塞就切换 """


def eat():
    """First task: print, hand control to ``g2``, then finish after a sleep."""
    print("eat one1")
    # Manually switch to the `play` greenlet.
    g2.switch()
    time.sleep(1)
    print("eat one2")


def play():
    """Second task: run to the end, then hand control back to ``g1``."""
    print("play one1")
    time.sleep(1)
    print("play one2")
    g1.switch()


g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()

# Sample output:
# eat one1
# play one1
# play one2
# eat one2
# (3)缺陷 gevent 不能够识别time.sleep是阻塞
import gevent

# gevent.spawn() plays a role similar to switch(): it schedules a task.


def eat():
    """Task 1, using the blocking time.sleep()."""
    print("eat one1")
    time.sleep(1)
    print("eat one2")


def play():
    """Task 2, using the blocking time.sleep()."""
    print("play one1")
    time.sleep(1)
    print("play one2")


# Create the two greenlets with gevent.
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# join() blocks until the given greenlet has finished: first g1, then g2.
for task in (g1, g2):
    task.join()
print("主线程执行完毕")

# Sample output (time.sleep is not recognised as a switch point, so the
# tasks run one after the other):
# eat one1
# eat one2
# play one1
# play one2
# 主线程执行完毕
# (4)进阶 用gevent.sleep()来取代time.sleep()
import gevent


def eat():
    """Task 1, pausing with gevent.sleep()."""
    print("eat one1")
    gevent.sleep(1)
    print("eat one2")


def play():
    """Task 2, pausing with gevent.sleep()."""
    print("play one1")
    gevent.sleep(1)
    print("play one2")


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# Wait for g1 first, then for g2.
for task in (g1, g2):
    task.join()
print("主线程执行完毕")

# Sample output (the two tasks now interleave):
# eat one1
# play one1
# eat one2
# play one2
# 主线程执行完毕
# (5) 终极解决识别问题
# Greenlets created with gevent.spawn() switch tasks automatically when
# they hit a blocking call that gevent recognises.
from gevent import monkey

# patch_all() makes the blocking calls of the modules imported below it
# recognisable to gevent.
monkey.patch_all()

import time
import gevent


def eat():
    """Task 1, using time.sleep() after monkey patching."""
    print("eat one1")
    time.sleep(1)
    print("eat one2")


def play():
    """Task 2, using time.sleep() after monkey patching."""
    print("play one1")
    time.sleep(1)
    print("play one2")


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# Wait for g1 first, then for g2.
for task in (g1, g2):
    task.join()
print("主线程执行结束..")

# Sample output:
# eat one1
# play one1
# eat one2
# play one2
# 主线程执行结束..

# Summary: import monkey from gevent and call monkey.patch_all() first.
# Blocking calls in the modules imported afterwards are then recognised,
# so time.sleep(1) can be written in place of gevent.sleep(1).
7.2协程的方法参数
# (1)spawn(函数,参数1,参数2.....) 启动一个协程
# (2)join() 阻塞,直到某个协程执行完毕
# (3)joinall() 等待所有协程执行完毕任务.
1.g1.join()
2.g2.join()
3.gevent.joinall([g1,g2]) 参数是一个列表
1和2并在一起 等于 3
# (4)value 获取协程返回值
# Two statements may share one line when separated by a semicolon, so
#     a = 1
#     b = 2
# can be abbreviated to:
a = 1; b = 2
print(a, b)

# The same trick keeps the patch call next to its import.
from gevent import monkey; monkey.patch_all()
import gevent
import time


def eat():
    """Task 1: sleep for a second and return a completion message."""
    print("eat one1")
    time.sleep(1)
    print("eat one2")
    return "吃完了"


def play():
    """Task 2: sleep for a second and return a completion message."""
    print("play one1")
    time.sleep(1)
    print("play one2")
    return "玩完了"


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# Continue only after both g1 and g2 have finished.
gevent.joinall([g1, g2])

# `value` holds each greenlet's return value.
print(g1.value)
print(g2.value)
# 用协程爬取界面
# Crawl page data with coroutines.
# HTTP status codes: 200 OK, 400 Bad Request, 404 Not Found.
# (Original note kept verbatim in the string below.)
""" HTTP 状态码 200 ok 400 Bad Request 404 Not Found """

# Patch the blocking standard-library calls before `requests` is imported.
# Without the patch the sockets used by requests block the whole thread and
# the spawned greenlets would simply run one after another.
from gevent import monkey
monkey.patch_all()

import requests

# Fetch a page; the call returns a response object.
# A timeout keeps an unresponsive host from hanging the script forever.
response = requests.get("http://www.4399.com/", timeout=10)
print(response)

# Status code.
res = response.status_code
print(res)

# Character encoding detected from the content.
res_code = response.apparent_encoding
print(res_code)
# response.encoding = res_code

# Page content.
res = response.text
print(res)

import time
import gevent

url_list = [
    "http://www.baidu.com",
    "http://www.taobao.com",
    "http://www.4399.com",
    "http://www.jd.com",
    "http://www.7k7k.com",
]


def get_url(url):
    """Download ``url`` and print its body when the status code is 200."""
    response = requests.get(url, timeout=10)
    if response.status_code == 200:
        print(response.text)


# (1) Plain sequential crawl.
start = time.time()
for i in url_list:
    get_url(i)
end = time.time()
print(end - start)
# e.g. 0.96964430809021

# (2) Crawl with coroutines: one greenlet per URL, then wait for all.
start = time.time()
lst = [gevent.spawn(get_url, i) for i in url_list]
gevent.joinall(lst)
end = time.time()
print(end - start)
8.定时器(了解)
# ## Timer (for reference)
# Timer runs a task once a given number of seconds has passed.
# (Original note kept verbatim in the string below.)
""" Timer 几秒之后 执行某个任务 """
from threading import Timer


def func():
    """The task executed when the timer fires."""
    print("正在执行某个任务...")


# Timer(delay in seconds, task to execute)
t = Timer(interval=3, function=func)
t.start()
print("主线程...")

# In real production use, Linux scheduled tasks (crontab) replace this.

# Sample output:
# 主线程...
# (three seconds later)
# 正在执行某个任务...