一:多进程
进程(process),是计算机中已运行程序的实体,是线程的容器;一个进程至少有一个线程
1,父进程与其子进程:
各进程的内存空间是互相隔离的
进程创建时,为该进程生成一个PCB(进程控制块);进程终止时,回收PCB。每个进程都有一个非负的唯一进程ID(PID)。虽然是唯一的,但是PID可以重用,当一个进程终止后,其他进程就可以使用它的PID了。
linux:调用fork创建子进程
PID为0的进程为调度进程,该进程是内核的一部分,也称为系统进程;PID为1的进程为init进程,它是一个普通的用户进程,但是以超级用户特权运行;PID为2的进程是页守护进程,负责支持虚拟存储系统的分页操作。
子进程刚开始,内核并没有为它分配物理内存,而是以只读的方式共享父进程内存,只有当子进程写时,才分配物理内存,复制父进程的数据给子进程,即“copy-on-write”
windows:调用CreateProcess创建子进程
子进程创建时就为它分配物理内存,复制父进程的数据给子进程
僵尸进程:子进程结束了,父进程还未结束,此时,子进程的一些资源(如PID)还未回收,需要等到父进程结束才能回收,就称这样的子进程为僵尸进程
孤儿进程(linux):父进程结束了,子进程还在,此时,这些子进程托管给init进程来管理
2,创建子进程的两种方式:
from multiprocessing import Process import os # multiprocessing 是一个包,threading是一个模块,它俩有着相似的API #方式一:自定义创建子进程的类,继承于Process类 class Myprocess(Process): def __init__(self,runner): super().__init__() self.runner = runner self.name = '子进程' # 自定义子进程的名字,默认为Myprocess-1,Myprocess-2 def run(self): # 子进程的任务函数,自动执行,名字必须为run print('%s is running' % self.runner) print(self.name) print('PID为:%s,其父进程PID为:%s' %(os.getpid(),os.getppid())) if __name__ == '__main__': subprocess_obj1 = Myprocess('rock') subprocess_obj2 = Myprocess('mike') subprocess_obj1.start() subprocess_obj2.start() # 方式二:使用Process类 def task(name): print('%s is running' %name) print('PID为:%s,其父进程PID为:%s' %(os.getpid(),os.getppid())) if __name__ == '__main__': # 必须写在 if __name__ == '__main__': 里面 subprocess_obj = Process(target=task,args=('rock',),kwargs={},name='自定义进程名字') subprocess_obj.start() # 向操作系统申请开启该子进程 subprocess_obj.daemon = True # 把该子进程对象设置为守护进程 # 守护进程--被守护的进程代码一执行完守护进程就结束 print(subprocess_obj.name) # 取得该子进程对象的名字 print(subprocess_obj.pid) # 取得该子进程对象的PID subprocess_obj.join() # 进入阻塞状态,等待该子进程执行结束 subprocess_obj.is_alive() # 判断该子进程对象是否仍存活 subprocess_obj.terminate() # 向操作系统申请结束该子进程 print('嘿,我是父进程,我的PID为%s' %os.getpid())
3,Queue 与 JoinableQueue
IPC机制:inter-process communication 进程间通信机制
主要有管道,队列等。。。
Queue 与 JoinableQueue:
共享的内存空间;有互斥锁机制,以保障共享数据的安全
from multiprocessing import Queue,JoinableQueue # Queue # Returns a process shared queue implemented using a pipe and a few locks/semaphores q = Queue(5) # 队列对象,参数5表示队列中能存的值的个数 q.put('abc') # 往队列里面存值;当队列满了时,这行语句执行被阻塞 q.get() # 从队列里面取值;当队列为空时,这行语句执行被阻塞 # JoinableQueue # JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods. q1 = JoinableQueue() q1.put('abc') q1.get() # task_done()用于与join()搭配使用: q1.task_done() # 队列每被取走一个值就通知一次队列,通过这种方式能知道队列的值是否被取完 q1.join() # 处于阻塞状态,等待队列所有值被取走
二:多线程
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
1,进程与其线程:
一个进程下的所有线程共享该进程的全部资源
2,创建子线程的两种方式:
from threading import Thread,current_thread #方式一:自定义创建子线程的类,继承于Thread类 class Mythread(Thread): def __init__(self,runner): super().__init__() self.runner = runner # self.name = '子线程' # 自定义子线程的名字,默认为Thread-1,Thread-2 def run(self): # 子线程的任务函数,自动执行,名字必须为run print('%s is running' % self.runner) print('my name is %s' %current_thread().name) if __name__ == '__main__': subthread_obj1 = Mythread('rock') subthread_obj2 = Mythread('mike') subthread_obj1.start() subthread_obj2.start() # 方式二:使用Process类 def task(name): print('%s is running' %name) print('my name is %s' %current_thread().name) # Thread-1 if __name__ == '__main__': subthread_obj = Thread(target=task,args=('rock',)) subthread_obj.start() # 向操作系统申请开启该子线程 print(subthread_obj.getName()) print(subthread_obj.name) # 取得该子线程对象的名字 subthread_obj.daemon = True # 设置该子线程对象为守护线程 # 守护线程--被守护的线程结束(代码执行完不一定马上就结束)守护线程才结束 subthread_obj.join() # 进入阻塞状态,等待该子线程执行结束 subthread_obj.is_alive() # 判断该子线程对象是否仍存活 print('嘿,我是主线程,我的名字是%s' %current_thread().name) # MainThread
3,线程队列 queue
import queue # The queue module implements multi-producer, multi-consumer queues. It is especially useful # in threaded programming when information must be exchanged safely between multiple threads. q = queue.Queue() # 普通队列 先进先出 q.put(1) q.get() # task_done()用于与join()搭配使用: q.task_done() # 队列每被取走一个值就通知一次队列,通过这种方式能知道队列的值是否被取完 q.join() # 处于阻塞状态,等待队列所有值被取走 q1 = queue.LifoQueue() # 栈队列 先进后出 q1.put('a') q1.get() q1.task_done() q1.join() q2 = queue.PriorityQueue() # 优先级队列,优先级最高的先出来 q2.put((1,'abc')) # 参数格式:(priority_number, data) 数字越小优先级越高 q2.get() q2.task_done() q2.join()
三:三种锁的介绍
# 1,互斥锁 from multiprocessing import Lock # from threading import Lock lock = Lock() # 创建互斥锁对象,该对象有两种状态:locked 和 unlocked ;初始状态为 unlocked def task(lock): lock.acquire() # acquire(block=True, timeout=None) # 实现原理: # 当锁对象是unlocked状态时,该函数调用正常,并立即将锁对象的状态转为locked状态; # 当锁对象是locked状态时,该函数调用被阻塞 pass lock.release() # 把锁对象的状态由locked 转为 unlocked。若锁对象已经是unlocked状态时,会报错 # 2,递归锁 # from multiprocessing import RLock from threading import RLock r_lock = RLock() # 这个类实现了可重入的锁对象,其初始递归级别为0 r_lock.acquire() # 某线程第一次调用该函数时: # 当锁对象的递归级别为0时,该函数调用正常,且立即将递归级别加1;其后该线程可多次调用该函数,每次将递归级别加1 # 当锁对象的递归级别不为0时,阻塞 r_lock.release() # 将锁对象的递归级别减1 # 3,信号量 # from multiprocessing import Semaphore from threading import Semaphore s_lock = Semaphore(5) # 产生信号量对象,内部计数器初始值为5;不传值则默认值为1 s_lock.acquire() # Acquire a semaphore # 当内部计数器值大于0时,该函数调用正常,且内部计数器立即执行减1操作; # 当内部计数器值为0时,该函数调用被阻塞,直到内部计数器大于0时,恢复调用正常,内部计数器减1 s_lock.release() # Release a semaphore # 内部计数器立即执行加1操作
# 各种锁可以使用with声明语句,语法如下: with some_lock: # do something... # 其等同于: some_lock.acquire() try: # do something... finally: some_lock.release()
四:进程池与线程池
进程池:装子进程的池子,能控制子进程的个数,是一种保护机制,确保进程个数在机器性能可承受范围内
线程池:装子线程的池子,能控制子线程的个数,是一种保护机制,确保线程个数在机器性能可承受范围内
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # ProcessPoolExecutor # An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. # ThreadPoolExecutor # An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously def task(n): n +=1 return n def fn(future): res = future.result() pass executor = ThreadPoolExecutor(5) # 可创建的子线程数量最大为5,不传参数则默认为机器的cpu数*5 future = executor.submit(task,3) # submit(fn, *args, **kwargs) 给线程池中的线程提交任务fn,即执行fn(*args, **kwargs); # 返回一个表示可被调用的执行的Future对象 future.result() # 取得任务执行完毕的返回值 future.add_done_callback(fn) # 回调函数 # 把可调函数fn绑定给future对象;当future被取消或执行完毕时,fn会被调用(future作为fn的唯一参数) executor.shutdown() # 等当前所有挂起的future对象执行完再回收所有的资源 # You can avoid having to call this method explicitly if you use the with statement, # which will shutdown the Executor with ThreadPoolExecutor(5) as e: e.submit(task,3) e.submit(task,4) # 进程池与线程池的使用方式基本相同; # 不同点:进程池,父进程执行回调函数绑定的函数fn(因为future是父进程的变量); # 线程池,某个线程继续执行函数fn if __name__ == '__main__': p = ProcessPoolExecutor(5) # 可创建的子进程数量最大为5,不传参数则默认为机器的cpu数 future = p.submit(task,4) print(future.result()) #Return the value returned by the call.