目录
一、multiprocessing简介
1、multiprocessing概述
multiprocessing 是一个用于创建进程的包,具有与 threading 模块相似API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多核。
2、Process 概述
Process 是 multiprocessing 中的类,通过创建一个 Process 对象然后调用它的 start() 方法来创建子进程。 Process 和 threading.Thread API 大致相同:
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) # 使用关键字参数调用构造函数
简单举例:
import multiprocessing
import os
def info(title):
print(title)
print('parent process:', os.getppid()) # 父进程ID
print('process id:', os.getpid()) # 当前进程ID
if __name__ == '__main__':
print('process id:', os.getpid())
print('----------------------')
info('main process')
p = multiprocessing.Process(target=info, args=('subprocess',))
p.start()
二、进程间的队列通信Queue
进程之间通信可以使用Queue方法,通常自定义函数中引用全局变量:global 变量名不能起到进程之间的通信作用。
import multiprocessing
import os
import time
list_1 = [2]
def write_process(queue):
global list_1 # 使用全局变量,这里global是读取复制了一个list_1,修改list_1,不会对外面的全局变量list_1起任何作用
i = 0
while True:
if queue.full(): # 判断消息队列是否已满
print('已经写满')
print('进程名称:', multiprocessing.current_process()) # 获取当前进程的名称
break
else:
print(f'写入数据{i}')
# put()方法向消息队列中增加数据,有两个可选参数, block和 timeout ,
# block默认是Ture,消息列队满后程序阻塞,等待有数据取出后再写入;如果修改为False消息列队满后会抛出queue.Full异常
# timeout设置等待时间,在block=False的情况下,等待时间后仍然无法写入数据才抛出异常,否则立即抛出异常
queue.put(i)
list_1.append(i)
i += 1
time.sleep(0.5)
print(list_1)
def read_process(queue):
time.sleep(0.5)
while True:
if queue.empty(): # 判断消息队列是否为空
print('列队为空')
print('进程名称:', multiprocessing.current_process())
break
else:
# get()从消息队列中依此取值并将其从队列中移除,不能任意取值,get()有两个可选参数 block 和 timtout,
# block默认True,当队列中数据被全部数据为空时会阻塞程序,等待新数据写入。如果修改为False则会立即抛出异常_queue.Empty
# timtout 同理put的参数
value = queue.get() # 取出消息队列的一个数据
print('读取数据:', value)
def multi_queue():
# 创建消息队列,可以增加一个整数型参数,设置消息队列的数据数量,如果不设参数或者参数为负值,则默认为没有上限,直到内存用尽
queue = multiprocessing.Queue(5)
w_process = multiprocessing.Process(target=write_process, args=(queue,), name='wuwei_w')
r_process = multiprocessing.Process(target=read_process, args=(queue,), name='wuwei_r')
w_process.start()
w_process.join() # 设置运行先后顺序
r_process.start()
r_process.join() # 这里可以用来阻塞主进程,等子进程结束在运行主进程
print(queue.qsize()) # 获取消息队列中数据数量
print(list_1) # 这里注意虽然在子进程中修改了list_1,但是在全局变量list_1的值始终未改变
if __name__ == '__main__':
print(multiprocessing.cpu_count()) # 主机CPU核心数
# 指定进程启动的方法,['fork','spawn','forkserver'] spawn可用于windows(默认)、macOS(默认)和unix,fork仅用于unix(默认)
multiprocessing.set_start_method('spawn', True) # 开发环境和运行环境相同时可以不指定,否则最好指定
multi_queue()
三、锁(Lock)来实现进程间数据修改同步
类似于threading 的Lock,用法也相同,如果不使用锁的情况下,来自于多进程的输出很容易产生混淆。这里使用官方文档的例子:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
四、进程间共享变量
1、共享内存( Value 和 Array )
Value 或 Array 可以将数据存储在共享内存映射中
def func(num, array):
num.value = 1.11
for i in range(len(array)):
array[i] = -array[i]
def multi_value_and_array():
# Value(typecode_or_type, *args) 第一个参数是指定数据类型(注意没有字符串格式,仅float和int),第二个参数可以接受变量的值
# typecode_or_type的类型使用简写并且带'',包括:'f'和'd'等同于: float;'b'/'B'/'h'/'H'/'i'/'I'/'l'/'L'等同于:int
num = multiprocessing.Value('d', 0.0) # 创建普通共享内存变量
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
输出结果:
1.11
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
multiprocessing.Value(typecode_or_type, *args)的参数类型: typecode可为以下Type Code列类型:
Type Code | Python Type |
---|---|
‘c’ | character |
‘b’ | int |
‘B’ | int |
‘u’ | character |
‘h’ | int |
‘H’ | int |
‘i’ | int |
‘I’ | int |
‘l’ | int |
‘L’ | int |
‘f’ | float |
‘d’ | float |
2、共享服务进程Manager
由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
可以看到Manager() 也可以管理Value 和 Array,参数定义基本相同,只是取值赋值需要通过get()/set()方法进行。
例如:
# 不能将共享变量定义成全局变量然后通过global引用那样会报错,只能通过参数传递。但如果是在同一个类中,可以用self定义共享锁和变量
def sub_process(process_name, share_var, share_value, share_lock):
share_lock.acquire() # 获取锁
share_var.append(process_name) # 这里可以直接使用list的大部分方法,但是clear()方法会报错
print(share_value.get()) # 获取共享变量share_value
share_value.set(share_value.get()+2.0)
share_lock.release() # 释放锁
for item in share_var:
print(f"{process_name}-{item}")
def main_process():
# 单个值声明方式。typecode是进制类型,C写法和Python写法都可以;value是初始值。
# 这种单值形式取值赋值需要通过get()/set()方法进行,不能直接如一般变量那样取值赋值
share_value = multiprocessing.Manager().Value('f', 0.0)
# 数组声明方式。typecode是数组变量中的变量类型,sequence是数组初始值
# share_arr = multiprocessing.Manager().Array(typecode, sequence)
# 字典声明方式
# share_dict = multiprocessing.Manager().dict()
# 列表声明方式
share_list = multiprocessing.Manager().list()
share_list.append("start flag")
# 声明一个共享锁
share_lock = multiprocessing.Manager().Lock()
process_list = []
process_name = "process 1"
tmp_process = multiprocessing.Process(target=sub_process, args=(process_name, share_list, share_value, share_lock))
process_list.append(tmp_process)
process_name = "process 2"
tmp_process = multiprocessing.Process(target=sub_process, args=(process_name, share_list, share_value, share_lock))
process_list.append(tmp_process)
for process in process_list:
process.start()
for process in process_list:
process.join()
if __name__ == '__main__':
main_process()
输出如下:
0.0
process 1-start flag
process 1-process 1
2.0
process 2-start flag
process 2-process 1
process 2-process 2
class Multi_share(object):
def __init__(self):
# 单个值声明方式。typecode是进制类型,C写法和Python写法都可以;value是初始值。
# 这种单值形式取值赋值需要通过get()/set()方法进行,不能直接如一般变量那样取值赋值
self.share_value = multiprocessing.Manager().Value('f', 0.0)
# 数组声明方式。typecode是数组变量中的变量类型,sequence是数组初始值
# share_arr = multiprocessing.Manager().Array(typecode, sequence)
# 字典声明方式
# share_dict = multiprocessing.Manager().dict()
# 列表声明方式
self.share_list = multiprocessing.Manager().list()
self.share_list.append("start flag")
# 声明一个共享锁
self.share_lock = multiprocessing.Manager().Lock()
self.process_name = "process 1"
self.process_name = "process 2"
# 同一个类中,可以用self定义共享锁和变量
def sub_process(self):
self.share_lock.acquire() # 获取锁
self.share_list.append(self.process_name) # 这里可以直接使用list的大部分方法,但是clear()方法会报错
print(self.share_value.get()) # 获取共享变量share_value
self.share_value.set(self.share_value.get() + 2.0)
self.share_lock.release() # 释放锁
for item in self.share_list:
print(f"{self.process_name}-{item}")
def main_processing(self):
process_list = []
tmp_process = multiprocessing.Process(target=self.sub_process)
process_list.append(tmp_process)
tmp_process = multiprocessing.Process(target=self.sub_process)
process_list.append(tmp_process)
for process in process_list:
process.start()
process.join()
if __name__ == '__main__':
a = Multi_share()
a.main_processing()
把进程锁和共享变量在类中初始化中使用self定义是完全可以使用大的,输出的结果与上面是完全相同的:
0.0
process 2-start flag
process 2-process 2
2.0
process 2-start flag
process 2-process 2
process 2-process 2
3、共享实例化对象global和BaseManager
通过global共享的实例化对象是只读的,不可以对实例化对象做任何修改,其实修改也不会报错,只是输出结果让你感到迷茫。直接使用BaseManager可以解决这些问题,但是使用BaseManager有个很奇怪的地方,必须使用“from multiprocessing.managers import BaseManager”导入,如果只导入import multiprocessing,然后multiprocessing.managers.BaseManager()就会报错:AttributeError: module ‘multiprocessing’ has no attribute ‘managers’,怪哉!怪哉!(不知道是不是pycharm的问题)
# 定义一个要共享实例化对象的类
class TestMulti(object):
def __init__(self):
self.test_list = ["start flag"]
def add_list(self, arg):
self.test_list.append(arg)
print('-------')
def print_list(self):
print(self.test_list)
share_lock = multiprocessing.Lock() # 锁可以通过global传递,也可以在Process中传参
text = TestMulti() # 实例化对象也可以通过global传递进去,并且可以执行类方法,但是对类内变量做任何修改都是无效的
def test_sub_process(process_name, obj):
global share_lock
global text
share_lock.acquire()
obj.add_list(f"{process_name}")
share_lock.release()
# 在这里使用text调用add_list方法不会报错,而且add_list方法的内print('-------')也执行了,但是"wuwei"却没有添加到列表中
text.add_list("wuwei")
obj.print_list()
def test_main_process():
from multiprocessing.managers import BaseManager
# 为了更加直接我们直接以一个Test类的实例化对象来演示 multiprocessing.managers
manager = BaseManager()
# 必须要在start前注册,不然就注册无效
manager.register('Test', TestMulti) # 第一个参数是要注册实例对象名称(后面使用这个名称),第二个参数是共享实例对象
manager.start()
obj = manager.Test()
# 注册系统函数open可以如下操作
# manager = BaseManager()
# # 一定要在start前注册,不然就注册无效
# manager.register('open', open)
# manager.start()
# obj = manager.open("1.txt","a")
process_list = []
# 创建进程1
process_name = "process 1"
tmp_process = multiprocessing.Process(target=test_sub_process, args=(process_name, obj))
process_list.append(tmp_process)
# 创建进程2
process_name = "process 2"
tmp_process = multiprocessing.Process(target=test_sub_process, args=(process_name, obj))
process_list.append(tmp_process)
# 启动所有进程
for process in process_list:
process.start()
process.join()
if __name__ == '__main__':
print(multiprocessing.cpu_count()) # 主机CPU核心数
# 指定进程启动的方法,['fork','spawn','forkserver'] spawn可用于windows(默认)、macOS(默认)和unix,fork仅用于unix(默认)
multiprocessing.set_start_method('spawn', True) # 开发环境和运行环境相同时可以不指定,否则最好指定
# multi() # 使用进程
# multi_queue() # 进程间传递消息queue和global
# main_process() # 进程间共享变量
# multi_value_and_array()
# a = Multi_share() # 进程间以类的方式共享变量
# a.main_processing()
test_main_process()
text.print_list()
输出结果如下:
-------
-------
['start flag', 'process 1']
-------
-------
['start flag', 'process 1', 'process 2']
['start flag']