Python多任务-进程
1 进程和程序
- 进程:正在执行的程序
- 程序:没有执行的代码,是一个静态的
进程的状态
2、使用进程实现多任务
multiprocessing模块就是跨平台的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
线程和进程之间的对比
进程:能够完成多任务,一台电脑上可以同时运行多个QQ
线程:能够完成多任务,一个QQ中的多个聊天窗口
进程和线程的根本区别:
- 进程是操作系统资源分配的基本单位
- 而线程是任务调度和执行的基本单位
2.1 队列间简单通信
进程间通信需要用到队列-Queue,作为参数在进程间进行数据传递
Queue-队列特点是 先进先出
队列属性,q.empty()、q.qsize()、q.full()
q = queue.Queue() #不能完成进程之间通信
q = multiprocessing.Queue() # 进程间通信
q = multiprocessing.Manager().Queue() #进程池中的进程通信
2.1.1 普通队列进程间通信 queue.Queue()
普通队列不能用 start() 方法,而是 run()
from queue import Queue
import multiprocessing
def demo1(q):
q.put('a')
def demo2(q):
data = q.get()
print('demo2队列传参:',data)
def main():
# 创建一个普通队列
q = Queue()
# q = queue.Queue() #不能完成进程之间通信
# q = multiprocessing.Queue() # 进程间通信
# q = multiprocessing.Manager().Queue() #进程池中的进程通信
p1 = multiprocessing.Process(target=demo1, args=(q,))
p2 = multiprocessing.Process(target=demo2, args=(q,))
#普通队列不能用 p1.start() 方法
p1.run()
p2.run()
if __name__ == '__main__':
main()
上面代码返回结果:
2.1.2 进程队列进程间通信 multiprocessing.Queue()
模拟下载数据,与数据处理
import multiprocessing
def download(q):
lis = [11, 22, 33]
for item in lis:
q.put(item)
# print(q.qsize())
# print(q.full())
print('下载完成并保存到队列中')
def analysis(q):
analysis_data = list()
print('队列长度:', q.qsize())
while True:
data = q.get_nowait()
analysis_data.append(data)
if q.empty():
break
print('队列内容:', analysis_data)
def main():
# 创建进程队列
q = multiprocessing.Queue(3)
# q = queue.Queue() #不能完成进程之间通信
# q = multiprocessing.Queue() # 进程间通信
q = multiprocessing.Manager().Queue() #进程池中的进程通信
# 队列作为进程的参数传递
p1 = multiprocessing.Process(target=download, args=(q,))
p2 = multiprocessing.Process(target=analysis, args=(q,))
p1.start()
p2.start()
if __name__ == '__main__':
main()
上面代码运行结果:
2.1.3 进程池队列,进程池间通信
import multiprocessing
def demo1(q):
try:
for i in range(10):
q.put(i)
except Exception as e:
print(e)
def demo2(q):
try:
for i in range(10):
data = q.get()
print(data, end=' ')
except Exception as e:
print(e)
def main():
# 创建一个进程池队列
q = multiprocessing.Manager().Queue()
# q = queue.Queue() #普通队列,不能完成进程之间通信
# q = multiprocessing.Queue() # 进程队列,进程间通信
q = multiprocessing.Manager().Queue() #进程池队列,进程池中的进程通信
# 创建进程池对象
po = multiprocessing.Pool(4)
po.apply_async(demo1, args=(q,))
po.apply_async(demo2, args=(q,))
# 关闭进程池,不允许再增加进程
po.close()
# 等待子进程结束
po.join()
if __name__ == '__main__':
main()
3、多进程共享全局变量
共享全局变量不适用于多进程编程,进程间不共享全局变量,而线程间是共享的
import multiprocessing
a = 1
def demo1():
global a
a += 1
def demo2():
# 进程间不共享全局变量,而线程是共享的
print(a)
def main():
p1 = multiprocessing.Process(target=demo1)
p2 = multiprocessing.Process(target=demo2)
p1.start()
p2.start()
if __name__ == '__main__':
main()
返回结果:
4、进程池
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务
来看下面例子
from multiprocessing import Pool
import os
import time
import random
def worker(msg):
try:
t_start = time.time()
# print(time.ctime(t_start))
print("%s进程开始执行,进程号%d" % (msg, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print('%s执行完成耗时%0.2f' % (msg, (t_stop - t_start)))
except Exception as e:
print(e)
def demo():
pass
def main():
po = Pool(3)
for i in range(10):
po.apply_async(worker, (i,))
print('start')
po.close()
po.join()
print('end')
if __name__ == '__main__':
main()
上面代码返回值:可以看出,因为进程池定义了3个进程,所以可以从进程号看出来,10个任务轮番使用3个进程号
4.1 进程池间的进程通信
多任务文件夹复制
1 获取用户要copy的文件夹的名次
2 创建一个新的文件夹
3 获取文件夹的所有的待copy的文件名字
4 创建进程池
5 向进程池中添加拷贝任务
'''
1 源文件夹名字
2 创建目标文件夹
3 获取文件夹所有拷贝文件名字
4 创建进程池
5 添加拷贝任务
'''
import multiprocessing
import os
import time
def copy_file(q, file_name, old_folder_name, new_folder_name):
# print('拷贝的文件名字为:', file_name)
with open(old_folder_name + '/' + file_name, 'rb') as f:
content = f.read()
with open(new_folder_name + '/' + file_name, 'wb') as f:
f.write(content)
q.put(file_name)
time.sleep(0.1)
def main():
# 获取要拷贝文件夹的名字
# old_folder_name = input('请输入将要拷贝文件夹名字:')
old_folder_name = 'demo'
# 创建新文件夹名字
new_folder_name = old_folder_name + '复件'
if not os.path.exists(new_folder_name):
os.mkdir(new_folder_name)
# 获取待拷贝文件的名字
file_names = os.listdir(old_folder_name)
# print(file_names)
# 创建进程池
po = multiprocessing.Pool(5)
q = multiprocessing.Manager().Queue()
# 添加拷贝任务
for file_name in file_names:
# print(2)
# apply_async是异步非阻塞式的,不等子进程执行完毕,主进程就已经执行完了,所以要加po.join()告诉主进程等待
po.apply_async(copy_file, args=(q, file_name, old_folder_name, new_folder_name))
# apply是阻塞式的,即使没有po.join() 也能保证子进程执行完毕
# po.apply(copy_file, args=(file_name, old_folder_name, new_folder_name))
po.close()
# 文件总数
file_total_num = len(file_names)
copy_file_num = 0
# 文件拷贝进度
while True:
file_name = q.get()
copy_file_num += 1
print('\r文件拷贝进度%2.f%%' % (copy_file_num * 100 / file_total_num), end='')
if copy_file_num >= file_total_num:
break
po.join()
if __name__ == '__main__':
main()
执行结果:
可以看到,进度从1%到100%的变化
复制文件夹“demo”中文件到“demo复件”文件夹