多任务原理
现代操作系统(Windows、Mac OS X、Linux、UNIX等)都支持“多任务”。
什么叫多任务? 操作系统同时可以运行多个任务。
为什么会需要多任务?
单核CPU实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,切换到微信,在执行2us,再切换到陌陌,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就行所有任务都在同时执行一样。
多核CPU实现多任务原理:真正的并行执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。并发:看上去一起执行,任务数多于CPU核心数。并行:真正一起执行,任务数小于等于CPU核心数。
实现多任务的方式: 1、多进程模式 2、多线程模式 3、协程模式 4、多进程+多线程模式
进程
对于操作系统而言,一个任务就是一个进程。
进程是系统中程序执行和资源分配的基本单位。每个进程都有自己的数据段、代码段、和堆栈段。
先来体会一下单任务的场景:
from time import sleep # 任务一 def run(): while True: print('hot day') sleep(1.2) if __name__ == '__main__': # 任务二 while True: print('good day') sleep(1) # 只有上面的while循环结束才会执行到run run()
如何让上面代码中的run方法执行呢。使用多进程。
python中多进程的库为multiprocessing.这是一个垮平台版本的多进程模块,提供了一个Process类来代表一个进程对象。
使用最简单的多进程实现上面的多任务场景:
from multiprocessing import Process from time import sleep def run(): while True: print('hot day') sleep(1.2) if __name__ == '__main__': print('主(父)进程启动') # 创建子进程 p = Process(target=run) # 启动进程 p.start() while True: print('good day') sleep(1)
如果run方法需要接收参数,可以通过args来传递:
p = p = Process(target=run, args=('hot',))
注意args是位置参数,传递位置参数时,必须按照参数位置来传递。如果需要传递关键词参数使用 kwargs.
每个进程都有一个唯一的id号,我们一般称之为pid。
在python代码中获取进程id号使用:os.getpid()。
获取当前进程的父进程ip使用:os.getppid().多出来的p表示parent。
Process常见方法和属性
属性:
字段 | 说明 |
---|---|
daemon | 是否为守护进程, 父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置 |
name | 进程的名字,自定义 |
exitcode | 进程的退出状态码 |
pid | 每个进程有唯一的PID编号。 |
方法:
方法名 | 说明 |
---|---|
is_alive() | 返回进程是否在运行 |
join([timeout]) | 阻塞当前上下文环境,直到调用此方法的进程终止或者到达指定timeout |
start() | 启动进程,等待CPU调度 |
terminate() | 不管任务是否完成,立即停止该进程 |
run() | start()调用该方法,当实例进程没有传入target参数,stat()将执行默认的run()方法 |
另一种创建进程的方式: 派生Process子类
使用Process子类的方式创建进程:
from multiprocessing import Process from time import sleep class MyProcess(Process): # 定义init方法方便传参数进来 def __init__(self, id, *args, **kwargs): # 调用父类的init方法 super().__init__(*args, **kwargs) # 赋值 self.id = id # 重写run方法 def run(self): print('子进程%d开始运行' % self.id) print(self) sleep(2) print('子进程%d结束运行' % self.id) if __name__ == '__main__': print('父进程开始运行') # 创建子进程 for i in range(4): p = MyProcess(name='子进程'+str(i),id=i) p.start() print('父进程结束')
父子进程的先后顺序
通过一个例子来体会父子进程的执行顺序:
from multiprocessing import Process from time import sleep def run(): print('子进程启动') sleep(3) print('子进程结束') if __name__ == '__main__': print('父进程启动') p = Process(target=run) p.start() print('父进程结束')
执行结果为:
父进程启动 父进程结束 子进程启动 子进程结束
可以发现父进程并不等待子进程的结束而结束,如果需要实现这一下效果,可以使用p.join()。
如果想给这个等待加一个期限,可以设置一个timeout时间,单位为秒。
全局变量在多个进程中不能共享
通过代码来说明这一现象:
from multiprocessing import Process from time import sleep # 定义全局变量 num = 100 # 子进程 def run(): print('开始执行子进程') # 引入全局变量 global num # 修改全局变量 num += 1 print(num) print('子进程结束') if __name__ == '__main__': print('父进程开始') p = Process(target=run) p.start() print('父进程结束--%d'%num
执行结果:
父进程开始 父进程结束--100 开始执行子进程 101 子进程结束
发现子进程中修改的全局变量在父进程中依然还是100.
出现这种现象的原因是子进程不和父进程共享全局变量,而是单独拷贝了一份全局变量。
不光父子进程不共享全局变量,兄弟进程也不共享全局变量。请写程序验证这一点。
使用进程池创建大量进程
先来看一个例子:
# coding: UTF-8 from multiprocessing import Process, Pool import os, time, random def run(name): print('子进程%d启动--%s' % (name, os.getpid())) start = time.time() time.sleep(random.choice([1, 2, 3])) end = time.time() print('子进程%d结束--%s--耗时%.2f' % (name, os.getpid(), end-start)) if __name__ == '__main__': print('父进程启动') # 创建进程池 # 4表示同时可以执行的进程数量,不写默认是电脑的cpu内核数 pp = Pool(4) for i in range(5): pp.apply_async(run, args=(i,)) # 关闭进程池,调用close之后不能再继续添加新的进程。 pp.close() # 等进程池中的进程都完成再结束进程池, 注意必须在调用close之后调用join pp.join() print('父进程结束')
使用进程池,需要创建进程池实例对象,指明这个进程池中可以放多少个进程。使用apply_async()方法在进程池中创建子进程。添加完子进程后,需要使用close方法关闭进程,关闭之后无法往进程池中添加新的子进程。使用进程池的join方法必须在调用close之后。
练习:使用进程池,把大量文件从一个目录拷贝到另一个目录。首先实现单进程方式,再用进程池实现多进程方式。对比两者的效率。
进程间通信
进程间通信使用Queue对象。
先来看一个例子:
from multiprocessing import Process, Queue import time, random, os def write(q): print('开始子进程%s' % (os.getpid())) for value in 'abdcef': print(time.ctime(), 'put %s to queue' % value) q.put(value) time.sleep(random.random()) def read(q): while True: value = q.get() print(time.ctime(), 'get %s from queue' % value) if __name__ == '__main__': # 主进程创建queue并传递给子进程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()
使用maxsize,可以为队列设置最大长度,当为maxsize<=0时,队列的最大长度会被设置为一个非常大的值 .
put(self, obj, block=True, timeout=None) 1、block为True,若队列已满,并且timeout为正值,该方法会阻塞timeout指定的时间,直到队列中有出现剩余空间,如果超时,会抛出Queue.Full异常 2、block为False,若队列已满,立即抛出Queue.Full异常
get(self, block=True, timeout=None) block为True,若队列为空,并且timeout为正值,该方法会阻塞timeout指定的时间,直到队列中有出现新的数据,如果超时,会抛出Queue.Empty异常 block为False,若队列为空,立即抛出Queue.Empty异常
在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程。
线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源。
线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。
模块 1、_thread模块 低级模块,接近底层。 2、threading模块 高级模块,对_thread进行了封装
线程基本用法
基本跟进程类似。
from threading import Thread, current_thread import time def run(num): print('子线程(%s)开始' % (current_thread().name,)) time.sleep(2) print('打印', num) time.sleep(2) print('子线程(%s)结束' % (current_thread().name,)) if __name__ == '__main__': # 任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程 # current_thread():返回返回当前线程的实例 print('主线程(%s)开始' % (current_thread().name,)) # 创建子线程 t = Thread(target=run, args=(1,), name='runThread') t.start() # 等待线程结束 t.join() print("主线程(%s)结束" % (current_thread().name))
常见方法
方法名 | 说明 |
---|---|
isAlive() | 返回线程是否在运行。正在运行指启动后、终止前。 |
get/setName(name) | 获取/设置线程名。 |
start() | 线程准备就绪,等待CPU调度 |
is/setDaemon(bool) | 获取/设置是守护线程(默认前台线程(False))。(在start之前设置) |
join([timeout]) | 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数) |
线程间共享数据
线程数据是共享的。先来看一个简单的例子。
from threading import Thread from time import sleep # 全局数据 num = 100 def run(): print('子线程开始') global num num += 1 print('子线程结束') if __name__ == '__main__': print('主线程开始') # 创建主线程 t = Thread(target=run) t.start() t.join() print(num) print('主线程结束')
执行结果:
主线程开始 子线程开始 子线程结束 101 主线程结束
可以看到子线程修改的全局变量,在主线程中也体现出来了。
线程之间数据共享引发的问题
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在每个进程中,互不影响。而多线程中,所有变量都由所有线程共享。所以,任何一个变量都可以被任意一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
下面来模拟这个问题:
from threading import Thread # 全局变量 num = 100 def run(n): global num for i in range(100000000): num = num + n num = num - n if __name__ == '__main__': t1 = Thread(target=run, args=(6,)) t2 = Thread(target=run, args=(9,)) t1.start() t2.start() t1.join() t2.join() print('num=',num)
运行之后发现num不等于100了。这就是因为两个线程都对num进行操作,中间发生了紊乱。
使用线程锁解决数据混乱问题
threading中的Lock类表示锁。
锁确保了这段代码只能由一个线程从头到尾的完整执行。阻止了多线程的并发执行,包含锁的某段代码实际上只能以单线程模式执行,所以效率大大滴降低了。
由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁,可能造成死锁,导致多个线程挂起。只能靠操作系统强制终止。
我们给之前的代码加上锁 来解决数据混乱的问题。
from threading import Thread, Lock # 全局变量 num = 100 # 锁对象 lock = Lock() def run(n): global num global lock for i in range(100000000): # 获取锁 lock.acquire() try: num = num + n num = num - n finally: # 修改完一定 要释放锁 lock.release() if __name__ == '__main__': t1 = Thread(target=run, args=(6,)) t2 = Thread(target=run, args=(9,)) t1.start() t2.start() t1.join() t2.join() print('num=',num)
使用上下文管理器 with,可以自动获取锁,释放锁。可以将上面代码的try语句改成如下:
with lock: num = num + n num = num - n