概念:(一般用于处理高并发)
有了进程为什么还要线程?
进程只能在一个时间干一件事,如果相同时干两件事,进程就不行了
进程在执行过程中如果遇到阻塞,例如输入,整个进程就会挂起,即使进程中有些工作不依赖输入的数据,也将无法执行.
线程:
线程:线程是计算机中被cpu调度的最小单位.
线程:轻量级的进程/轻型进程
线程本身创建出来就是为了解决并发问题的
并且他整体效率比进程要高(相比于进程他省去了多数创建的时间和回收有时还有相互切换(轻量级))
是进程的一部分,不能独立存在
进程:(消耗资源)
对操作系统压力大
数据隔离
可以在操作系统中独立存在
计算机中资源分配的最小单位
多进程多线程都可以实现并行:资源够用同时进行,简单来说,多个cpu可以同时执行一个进程多个线程
全局解释器锁GiL: Cpython解释器下
python刚出来是单核的所以没有考虑到线程并行问题,随着以后cpu的的增多就有问题了,多个cpu(资源够用,会出现并行问题,那么当两个cpu同时执行一个线程时,就会出现数据混乱),所以 出现了全局解释器锁(锁线程的,结果看上去就又变成单核运算了,它是由解释器提供的)虽然慢了但是保障了数据的安全
所以: 1 GIL是锁进程的
2 这个锁是解释器提供的
所以当遇到高计算型:
多开线程或者换解释器
线程常见的几个模块:
thred(低级,不支持守护线程)
threding(高级的,功能更强大)
Queue :允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。
threding模块
multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍(https://docs.python.org/3/library/threading.html?highlight=threading#)
线程的创建:
方式一:
def func(a): print('zi线程',a) if __name__ == '__main__': # 当前文件下才执行下面的代码,不同py文件xia不会执行 t=Thread(target=func,args=('李四',)) t.start() print('主线程')
#zi线程 李四
#主线程
方式二 : 面向对象型
class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print(self.name) if __name__ == '__main__': t=Sayhi('haha') t.start() print('主线程') #主线程 #haha
多线层与多进程:
多进程: 开多个进程,每个进程都有不同的pid
多线程:开启多个线程,每个的pid都和主进程都一样
同一进程内各个线程共享进程的数据
def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/00主进程pid',os.getpid()) #part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
# n = 100 # def func(): # global n # n -= 1 # # t = Thread(target=func) # t.start() # t.join() # print(n) # 随线一个线程的改动,整个进程中也改了
多线程实现socket通信
sever端 # socket实现多线程 import socket from threading import Thread def func(conn): while True: msg=conn.recv(1024).decode() conn.send(msg.upper().encode()) sk=socket.socket() sk.bind(('192.168.11.114',9000)) sk.listen() while True: conn,addr=sk.accept() Thread(target=func,args=(conn,)).start() Clinet端 import socket sk=socket.socket() sk.connect(('192.168.11.114',9000)) while True: sk.send(b'hello') print(sk.recv(1024).decode())
threading中的其他功能:
currentThread()# 查看线程号
activeCount() :查看当前还有几个活跃的线程
enumerate() 返回当前运行的线程list 里面是每个线程对象
例子:
import time from threading import Thread,currentThread,activeCount,enumerate class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print('in son',self.arg,currentThread()) for i in range(10): t = Mythread(123) t.start() print(t.ident) print(activeCount()) print(enumerate()) # 返回当前运行的线程list 里面是每个线程对象 # activeCount() :查看当前还有几个活跃的线程 # currentThread()# 查看线程号
守护线程:(setDaemon(True)
守护线程线程不会随着主线程的结束而结束,他会一直等着主进程结束然后继续等着其他子进程结束他才结束
守护进程 只守护主进程的代码,主进程代码结束他就结束,并且是在主进程结束之前给自己分配个结束点,而结束
进程 terminate 强制结束一个子进程
线程 没有强制结束的方法
线程结束: 线程内部代码结束执行完毕,就自动结束.
例子:
def func(): # while True: # print('in func') # time.sleep(0.5) # # def func2(): # print('start func2') # time.sleep(10) # print('end func2') # # Thread(target=func2).start() # t = Thread(target=func) # t.setDaemon(True) # t.start() # print('主线程') # time.sleep(2) # print('主线程结束')
(2)
锁
锁与Gil:(全局解释器锁,自然情况下大约每700条指令会轮转一次,所以即使有GIL也是不安全的,当你有个很长的代码是有可能运行到一半就移交给另一CPU了 所以就不准了)
总结来说:有了Gil换是会出现数据不安全的现象,所以还要用锁
from threading import Thread,Lock
n=100
def func():
global n
tmp=n-1
time.sleep(0.1) # 强行移交给另一个
n=tmp
if __name__ == '__main__':
l=[]
# lock=Lock()
for i in range (100):
t=Thread(target=func,)
t.start()
l.append(t)
for t in l:t.join()
print(n) # 99 # 如果没有sleep就是0
即使就拿现在的代码来说你不加sleep他的运算结果也没错,但是那是因为你的代码不够长所以你要操作其中的
数据时 一定要枷锁
必加锁 这样就安全了
import time from threading import Thread,Lock n = 100 def func(lock): global n # n -= 1 with lock: tmp = n-1 # n-=1 # time.sleep(0.1) n = tmp if __name__ == '__main__': l = [] lock = Lock() for i in range(100): t = Thread(target=func,args=(lock,)) t.start() l.append(t) for t in l:t.join() print(n)
死锁与递归锁
进程和线程都有死锁和递归锁
死锁:两个或两个以上的进程或线程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无
外力作用其,他们无法推进下去.此时系统产生了死锁
死锁
def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start()
解决方式:
lock = Lock() # def eat1(name): # lock.acquire() # print('%s拿到面条了'%name) # print('%s拿到叉子了'%name) # print('%s开始吃面'%name) # time.sleep(0.2) # lock.release() # print('%s放下叉子了' % name) # print('%s放下面了' % name) # # def eat2(name): # lock.acquire() # print('%s拿到叉子了' % name) # print('%s拿到面条了' % name) # print('%s开始吃面' % name) # time.sleep(0.2) # lock.release() # print('%s放下面了' % name) # print('%s放下叉子了' % name) # # Thread(target=eat1,args=('alex',)).start() # Thread(target=eat2,args=('wusir',)).start() # Thread(target=eat1,args=('太白',)).start() # Thread(target=eat2,args=('宝元',)).start()
互斥锁
无论在相同的线程还是不同的线程,都只能连续acquire一次
要想再acquire,必须先release
递归锁:Rlock()(在同一线程中,可以无限的acquire,
但是要想在其他线程中也acquire,
必须现在自己的线程中添加和acquire次数相同的release
)
例子:
rlock = RLock() # def func(num): # rlock.acquire() # print('aaaa',num) # rlock.acquire() # print('bbbb',num) # rlock.release() # rlock.release() # # Thread(target=func,args=(1,)).start() # Thread(target=func,args=(2,)).start()
总结:想要不出问题就锁一次开一次
信号量和事件:
信号量:
例子:
import time from threading import Semaphore,Thread def func(name,sem): sem.acquire() print(name,'kaishi') time.sleep(1) print(name,'jieshu') sem.release() sem=Semaphore(2) # 两个两个创建 for i in range(4): p=Thread(target=func,args=(i,sem)) p.start()
那他和进程池有什么区别:
进程池:如果是相同代码,进程池是先先创建2个然后后面一直利用这两个,可以避免一次开很多进程.
信号量:是一次性把4个都创建出来,然后两个两个放.
事件
几个常用方法:
event.isSet(): 返回event的状态.
event.wait() :如果event的状态为Flase将阻塞
event.clear(): 恢复event状态为Flase
event.set():设置其状态为True,所有阻塞池的状态进入激活状态,等待操作系统调度.
from threading import Event # 事件 # wait() 阻塞 到事件内部标识为True就停止阻塞 # 控制标识 # set # clear # is_set # 连接数据库 import time import random from threading import Thread,Event def connect_sql(e): count = 0 while count < 3: e.wait(0.5) # 最多阻塞0.5秒 然后程序向下执行 if e.is_set(): print('连接数据库成功') break else: print('数据库未连接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() Thread(target=connect_sql,args=(e,)).start()
条件:
Python提供的Condition对象提供了对复杂线程同步问题的支持。
Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,
还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。
如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,
其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('****')
设置某个条件
# 如果满足这个条件 就可以释放线程
# 监控测试我的网速
# 20000个任务
# 测试我的网速 /系统资源
# 发现系统资源有空闲,我就放行一部分任务
定时器: 指定n秒后执行某个操作
from threading import Timer
from threading import Timer def func(): print('执行我啦') t = Timer(3,func) # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适 t.start() print('主线程的逻辑')
# 3秒以后执行 func
线程队列: import queue
import queue # 线程对列 线程之间数据安全 p=queue.Queue(1) #设置队列大小 # 普通队列 p.get()# 获取 p.put() # 放入 p.get_nowait() p.put_nowait() # 连续p.put() 会让你的程序阻塞,他会一直等待有人拿他 p.get_nowait()#如果他有数据我就取,如果没有不阻塞而是报错 p.put(1) print(p.get(timeout=2))# 它里面可以放参数的 如果我等两秒没有拿到我就报错,但是一般不在程序中这么用,(死等)会影响效率 # 栈 先进后出 使用队列是可以实现栈的需求的 更加完整的约束我数据进出的顺序 lfq=queue.LifoQueue() lfq.put(1) lfq.put(2) lfq.put(3) print(lfq.get()) print(lfq.get()) print(lfq.get()) # 堆:最小的在最上面 # 优先级队列 根据第一个值的大小来排定优先级的 # 数字越小,优先级越高(阿斯克码的值越小,优先级越高) 第一个是数字,第二个是码 a=queue.PriorityQueue() a.put((10,'a')) a.put((1,'c')) print(a.get()) print(a.get())
线程池: from concurrent.futures import ThreadPoolExecutor 线程池类
from concurrent.futures import ThreadPoolExecutor import time import random from threading import currentThread # 可以查看线程号 ''' # 基本方法: submit(fn, *args, **kwargs) # 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) #取代for循环submit的操作 shutdown(wait=True) # 相当于进程池的pool.close()+pool.join()操作 result(timeout=None)#取得结果 add_done_callback(fn)#回调函数 ''' def func(num): # time.sleep(2) print('in%s func'% num,currentThread()) # time.sleep(random.random()) tp=ThreadPoolExecutor(5) for i in range(30): tp.submit(func,i)
待续.....困得不行了