本部分内容是对《莫烦Python: 多进程》部分的总结。网址:https://morvanzhou.github.io/tutorials/python-basic/multiprocessing/。
1.Multiprocessing
充分利用多核优势,可以将任务平均分配给多核,每个核有自己的运算空间和运算能力。
弥补多线程的劣势,即GIL,避免只运行一个线程,这里可以运行多个线程。
Python中多进程和多线程threading使用差不多。
2.创建进程
(1)和threading类似
importmultiprocessing as mp
importthreading as td
defjob(a,d):
print('aaaaa')
t1= td.Thread(target=job,args=(1,2))
p1= mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()
(2)多进程
importmultiprocessing as mp
defjob(a,d):
print('aaaaa')
if__name__=='__main__': //一定要写
p1 = mp.Process(target=job,args=(1,2))//args=(1,2)是job函数的输入参数。
p1.start()
p1.join()
对于windows系统,直接运行出来结果。对于Mac系统,要在终端运行,否则用IDLE会报错。若用其它编译器,如Pycharm则不会报错。
3.存储进程输出Queue
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单,多线程调用的函数不能有返回值,所以使用Queue存储多个线程运算的结果。
importmultiprocessing as mp
defjob(q):
res=0
for i in range(1000):
res+=i+i**2+i**3
q.put(res) #queue
if__name__=='__main__':
q = mp.Queue()
p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1+res2)
得出结果的步骤和2一样,结果是:
499667166000
4.多线程和多进程效率的对比
对比下多进程,多线程和什么都不做时的消耗时间,看看哪种方式更有效率。
importmultiprocessing as mp
importthreading as td
importtime #用来计算时间
defjob(q):
res = 0
for i in range(1000000):
res += i + i**2 + i**3
q.put(res)# queue
#多进程
defmulticore():
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:',res1+ res2)
#多线程
defmultithread():
q = mp.Queue() # thread可放入process同样的queue中
t1 = td.Thread(target=job, args=(q,))
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:',res1 + res2)
#正常
defnormal():
res = 0
for _ in range(2):
for i in range(1000000):
res += i + i**2 + i**3
print('normal:', res)
if__name__ == '__main__':
st = time.time()
normal()
st1 = time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicoretime:', time.time() - st2)
结果显示:运行花的时间满足:多进程< 普通< 多线程。
5.进程池pool-Multiprocessing
进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题,如分配进程和结果。
importmultiprocessing as mp
#之前是不能将结果返回的,只能将结果放在队列Queue中。现在,使用pool后可以有返回值。
defjob(x):
return x*x
defmulticore():
pool = mp.Pool(processes=2)#自定义需要的核数量。
res = pool.map(job, range(10))#Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果。
print(res)
#Pool除了map()外,还有可以返回结果的方式,那就是apply_async().apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号,同时需要用get()方法获取返回值。
res = pool.apply_async(job, (2,))
print(res.get())
#若用apply_async()输出多个迭代,可以将apply_async()放入迭代器中,定义一个新的multi_res。同样在取出值时需要一个一个取出来。
multi_res =[pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res])
if __name__ == '__main__':
multicore()
我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况。打开CPU负载(Mac):活动监视器> CPU > CPU负载(单击一下即可)。Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
6.共享内存
共享变量可以以用global。多进程中若想将global变量的一个CPU运行结果传递给另一个CPU是行不通的,要用到共享内存,实现CPU之间的信息交流。
我们可以通过使用Value数据存储在一个共享的内存表中。
importmultiprocessing as mp
#其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。更多的形式请查看本页最后的表.可以被各CPU加载。
value1 = mp.Value('i',0)
value2= mp.Value('d', 3.14)
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。
array= mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value一样,需要定义数据形式,否则会报错。
各参数代表的数据类型:
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ |----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
7. 进程锁Lock
#不加进程锁。结果显示两个进程出现冲突,相互抢着使用共享内存。
import multiprocessing as mp
import time
def job(v, num):
for_ in range(5):
time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
v.value += num # v.value获取共享变量值
print(v.value, end="")
def multicore():
v =mp.Value('i', 0) # 定义共享变量
p1= mp.Process(target=job, args=(v,1))
p2= mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
#加进程锁
importmultiprocessing as mp
importtime
defjob(v, num, l):#多个l,所以锁住。
l.acquire()#锁住
for _ in range(10):
time.sleep(0.1)
v.value += num
print(v.value)
l.release()#释放
defmulticore():
l = mp.Lock()#定义一个进程锁
v = mp.Value('i', 0)#定义共享内存
p1 = mp.Process(target=job, args=(v, 1, l))#将Lock传入
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if__name__ == '__main__':
multicore()
不加Lock:1 加Lock: 1
4 2
5 3
8 4
9 5
12 8
13 11
16 14
17 17
20 20