一.Manager
(list列表,dict字典)进程之间的数据共享(列表或者字典等)
from multiprocessing import Process,Manager,Lock
def work(data, lock):
# 1.正常写法
"""
# 上锁
lock.acquire()
# 修改数据
data["count"] -= 1
# 解锁
lock.release()
"""
# 2.使用with语法可以简化上锁和解锁两步操作
with lock:
data[0] += 1
if __name__ == '__main__':
lst = []
lock = Lock()
m = Manager()
# data = m.dict({"count": 20000})
data = m.list([1, 2, 3])
for i in range(50):
p = Process(target=work, args=(data, lock))
p.start()
lst.append(p)
# 确保所有进程执行完毕之后,再向下执行,打印数据,负责报错
for i in lst:
i.join()
print(data)
二.线程
进程:资源分配的最小单位
线程:CPU执行程序的最小单位
1.一个进程资源中可以包含多个线程
from threading import Thread
from multiprocessing import Process
import os,time,random
def func(num):
time.sleep(random.uniform(0.1, 1))
print("当前进程{},参数是{}".format(os.getpid(), num))
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
print(os.getpid())
2.并发的多线程和多进程谁的速度快? 多线程!
def func(num):
print("当前进程是{},参数是{}".format(os.getpid(), num))
if __name__ == '__main__':
# 多线程
lst = []
# 记录开始的时间
startime = time.time()
for i in range(1000):
t = Thread(target=func, args=(i,))
t.start()
lst.append(t)
# 等到所有的子线程执行完毕
for i in lst:
i.join()
# 计算结束时间
endtime = time.time()
print("多线程执行时间", (endtime - startime))
# 多进程
"""
lst = []
# 记录开始时间
stratime = time.time()
for i in range(1000):
t = Process(target=func, args=(i,))
t.start()
lst.append(t)
# 等到所有的子进程执行完毕
for i in lst:
i.join()
# 计算结束时间
endtime = time.time()
print("多进程执行时间", (endtime - stratime))
"""
3.多线程之间,共享同一份进程资源
num = 1000
def func():
global num
num -= 1
for i in range(1000):
t = Thread(target=func)
t.start()
print(num)
三.用类定义线程
from threading import Thread
import os
import time
class MyThread(Thread):
def __init__(self, name):
# 手动调用父类的构造方法
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("当前进程号码是{},名字是{}".format(os.getpid(), self.name))
if __name__ == '__main__':
t = MyThread("当前是一个线程")
t.start()
print("主线程执行结束...")
四.线程相关的函数
线程.is_alive() 检测线程是否仍然存在
线程.setName() 设置线程名字
线程.getName() 获取线程名字
currentThread().ident 查看线程id号
enumerate() 返回目前正在运行的线程列表
activeCount() 返回目前正在运行的线程数量
def func():
time.sleep(1)
if __name__ == '__main__':
t = Thread(target=func)
t.start()
# 检测线程是否仍然存在
print(t.is_alive())
# 获取线程名字
print(t.getName()) # Thread-1
# 设置线程名字
t.setName("cwj")
# 获取线程名字
print(t.getName())
from threading import currentThread
1.currentThread().ident 查看线程id号
def func():
print("子线程的线程id{}".format(currentThread().ident))
if __name__ == '__main__':
Thread(target=func).start()
print("主线程的线程id{}".format(currentThread().ident))
2.enumerate() 返回目前正在运行的线程列表
3.activeCount() 返回目前正在运行的线程数量(了解)
from threading import enumerate
from threading import activeCount
def func():
print("子线程的线程id{}".format(currentThread().ident))
time.sleep(0.5)
if __name__ == '__main__':
for i in range(10):
Thread(target=func).start()
lst = enumerate()
# 主线程 + 10 个子线程
print(lst, len(lst))
# 3.activeCount() 返回目前正在运行的线程数量
print(activeCount()) # 11
五.守护线程:
等待所有线程全部执行完毕之后,自己再终止,守护所有线程
from threading import Thread
import time
def func1():
while True:
time.sleep(0.5)
print("我是func1")
def func2():
print("我是func2 start...")
time.sleep(3)
print("我是func2 end...")
def func3():
print("我是func3 start...")
time.sleep(5)
print("我是func3 end...")
if __name__ == '__main__':
t1 = Thread(target=func1)
t2 = Thread(target=func2)
t3 = Thread(target=func3)
# 在start调用之前,设置线程为守护线程
t1.setDaemon(True)
t1.start()
t2.start()
t3.start()
print("主线程执行结束...")
六.线程中安全问题 Lock
from threading import Lock,Thread
import time
n = 0
def func1(lock):
global n
with lock:
for i in range(1000000):
# lock.acquire()
n += 1
# lock.release()
def func2(lock):
global n
# with 自动完成上锁 + 解锁
with lock:
for i in range(1000000):
# lock.acquire()
n -= 1
# lock.release()
if __name__ == '__main__':
lst = []
lock = Lock()
startime = time.time()
for i in range(10):
t1 = Thread(target=func1, args=(lock,))
t2 = Thread(target=func2, args=(lock,))
t1.start()
t2.start()
lst.append(t1)
lst.append(t2)
for i in lst:
i.join()
endtime = time.time()
print("主线程执行结束...打印{} 时间是{}".format(n, endtime - startime))
七.信号量 Semaphore(线程)
from threading import Semaphore,Thread
import time
def func(i, sm):
# 上锁 + 解锁
with sm:
print(i)
time.sleep(3)
if __name__ == '__main__':
# 支持同一时间,5个线程上锁
sm = Semaphore(5)
for i in range(20):
Thread(target=func, args=(i, sm)).start()
在创建线程的时候是异步创建
在执行任务时,遇到Semaphore进行上锁,会变成同步程序
八.死锁 互斥锁 递归锁
from threading import Lock,Thread,RLock
import time
1.语法上的死锁
只上锁不解锁是死锁
lock = Lock()
lock.acquire()
lock.acquire()
lock.acquire()
lock.release()
print(1)
2.逻辑上的死锁
在Linux存在一定的概率性
noodle_lock = Lock()
kuaizi_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print("%s 抢到面条了" % (name))
kuaizi_lock.acquire()
print("%s 抢到筷子了" % (name))
print("开始享受面条...")
time.sleep(0.5)
kuaizi_lock.release()
print("%s 放下筷子" % (name))
noodle_lock.release()
print("%s 放下面条" % (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s 抢到筷子了" % (name))
noodle_lock.acquire()
print("%s 抢到面条了" % (name))
print("开始享受面条...")
time.sleep(0.5)
noodle_lock.release()
print("%s 放下面条" % (name))
kuaizi_lock.release()
print("%s 放下筷子" % (name))
if __name__ == '__main__':
name_list1 = ["宋云杰", "高云杰"]
name_list2 = ["王浩", "孙志心"]
for name in name_list1:
Thread(target=eat1, args=(name,)).start()
for name in name_list2:
Thread(target=eat2, args=(name,)).start()
3.递归锁的使用
递归锁专门用来解决这种死锁现象
临时用来快速解决线上项目发生阻塞死锁问题的
rlock = RLock()
rlock.acquire()
rlock.acquire()
rlock.acquire()
rlock.acquire()
print(112233)
rlock.release()
rlock.release()
rlock.release()
rlock.release()
print("程序结束...")
4.用递归锁解决死锁现象
noodle_lock = kuaizi_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print("%s 抢到面条了" % (name))
kuaizi_lock.acquire()
print("%s 抢到筷子了" % (name))
print("开始享受面条...")
time.sleep(0.5)
kuaizi_lock.release()
print("%s 放下筷子" % (name))
noodle_lock.release()
print("%s 放下面条" % (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s 抢到筷子了" % (name))
noodle_lock.acquire()
print("%s 抢到面条了" % (name))
print("开始享受面条...")
time.sleep(0.5)
noodle_lock.release()
print("%s 放下面条" % (name))
kuaizi_lock.release()
print("%s 放下筷子" % (name))
if __name__ == '__main__':
name_list1 = ["宋云杰", "高云杰"]
name_list2 = ["王浩", "孙志心"]
for name in name_list1:
Thread(target=eat1, args=(name,)).start()
for name in name_list2:
Thread(target=eat2, args=(name,)).start()
5.互斥锁(尽量用一把锁解决问题)
mylock = Lock()
def eat1(name):
mylock.acquire()
print("%s 抢到面条了" % (name))
print("%s 抢到筷子了" % (name))
print("开始享受面条...")
time.sleep(0.5)
print("%s 放下筷子" % (name))
print("%s 放下面条" % (name))
mylock.release()
def eat2(name):
mylock.acquire()
print("%s 抢到面条了" % (name))
print("%s 抢到筷子了" % (name))
print("开始享受面条...")
time.sleep(0.5)
print("%s 放下筷子" % (name))
print("%s 放下面条" % (name))
mylock.release()
if __name__ == '__main__':
name_lst1 = ["宋云杰", "高云杰"]
name_lst2 = ["王浩", "孙志心"]
for name in name_lst1:
Thread(target=eat1, args=(name,)).start()
for name in name_lst2:
Thread(target=eat2, args=(name,)).start()
九.事件 Event
from threading import Event,Thread
import time,random
e = Event()
# wait 动态添加阻塞
# clear 将内部的阻塞值改为False
# set 将内部的阻塞值改为True
# is_set 获取内部的阻塞值状态(True False)
1.基本语法
e = Event()
print(e.is_set())
e.set()
print(e.is_set())
e.clear()
print(e.is_set())
# 代表最多阻塞3秒
e.wait(3)
print("程序运行中...")
2.模拟连接远程数据库
def check(e):
# 用一些延迟来模拟检测的过程
time.sleep(random.randrange(1, 6))
# time.sleep(1)
print("开始检测连接用户的合法性")
e.set()
def connect(e):
sign = False
for i in range(1, 4):
# 设置最大等待1秒
e.wait(1)
if e.is_set():
print("数据库连接成功...")
sign = True
break
else:
print("尝试连接数据库第%s次失败..." % (i))
if sign == False:
# 主动抛出异常(超时异常)
raise TimeoutError
e = Event()
# 线程1号负责执行连接任务
Thread(target=connect, args=(e,)).start()
# 线程2号负责执行检测任务
Thread(target=check, args=(e,)).start()