Since I have been busy making mind maps lately, this blog will favor quality over speed from now on!!!
2.1. Multi-channel technology
2.1.1. Background
2.1.3. Time multiplexing (building on spatial multiplexing)
2.1.3.1. The CPU encounters an I/O operation
2.1.3.2. A process takes up too much CPU time
2.1.3.3. A higher-priority process appears
3.1. Creating a process (Windows)
3.2. Orphan process (understand)
3.3. Zombie process (understand)
3.4. Three states of a process
3.5.3. Parent-child process isolation, verified via 'global'
3.5.6. terminate() and is_alive() (understand)
3.5.7. os.getpid() and os.getppid()
http://www.cnblogs.com/linhaifeng/articles/6817679.html
A process refers to a running program, i.e., the course of a program's execution; it is an abstract concept.
A program is dead (static code); a process is alive (the code actually running).
Multitasking means a machine appears to perform several tasks at once; it too is only an abstract concept.
Concurrency is pseudo-parallelism: it looks like multitasking, and a single CPU plus multi-channel technology can achieve it.
Parallelism is true multitasking, with tasks literally running at the same time; only multiple CPUs can achieve it.
Reducing I/O operations improves efficiency.
I/O operations: e.g. f.read(), f.write()...
time.sleep() simulates an I/O operation; it is not a real I/O operation.
Multi-channel technology solves the serial-execution problem and achieves a concurrency effect on a single core.
Spatial multiplexing: multiple processes share one memory stick, but the memory each occupies is physically isolated.
Time multiplexing: multiple processes reuse the same CPU's time.
When the CPU encounters an I/O operation it switches to another process, which increases efficiency.
A process that takes up too much CPU time is also switched away; this achieves the concurrency effect, but reduces that program's own efficiency.
When a running process meets a higher-priority process, the CPU switches to the higher-priority one.
Serial: only after one task has completed can the next task run.
Concurrent: multiple tasks appear to run at the same time; a single core can achieve concurrency.
Parallel: in the true sense, multiple tasks run simultaneously; only multiple cores can achieve parallelism.
The CPU is for computation and cannot perform I/O itself; once an I/O operation is encountered, the CPU should be handed to another task.
Linux creates a process with fork.
Windows creates a new process with CreateProcess, which loads the code to run and copies data from the parent process. A Windows child starts out slightly different from its parent, whereas a Linux fork starts as an exact copy of the parent.
If a parent process exits while one or more of its child processes are still running, those children become orphan processes. An orphan process is adopted by the init process (PID 1), and init performs state collection on it when it exits.
A parent uses fork to create a child process. If the child exits but does not vanish entirely (it still occupies its PID and leaves other exit information behind), and the parent has not called wait or waitpid to collect the child's status information, that information remains stored in the system. Such a process is called a zombie process.
If the parent stays alive and the zombies are never reaped, they keep occupying operating-system PIDs (a limited resource) and bookkeeping space, which is harmful.
running - ready - blocked
1. The process blocks waiting for input (running -> blocked)
2. The operating system schedules another process (running -> ready)
3. The operating system schedules this process (ready -> running)
4. Valid input arrives (blocked -> ready)
Create a child-process object; the child gets a copy of the parent's data (on Windows, by importing the parent as a module), which is how multiple processes are implemented.
Because the child process copies all the parent's data, we must use the `if __name__ == '__main__':` guard to prevent recursion: without it, the child, while copying the parent's code, would create a child-process object of its own and copy again.
from multiprocessing import Process
import time

def task(name):
    print("%s start" % name)
    time.sleep(2)
    print("%s end" % name)

if __name__ == '__main__':
    obj = Process(target=task, args=("child process",))
    obj.start()
    print("parent process")
from multiprocessing import Process
import time

class MyProcess(Process):
    def run(self):
        print("%s start" % self.name)
        time.sleep(2)
        print("%s end" % self.name)

if __name__ == '__main__':
    obj = MyProcess()
    obj.start()
    print("parent process")
from multiprocessing import Process

x = 100

def task():
    global x
    x = 0
    print("child process %s" % x)  # x = 0

if __name__ == '__main__':
    obj = Process(target=task)
    obj.start()
    print("parent process %s" % x)  # x = 100
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self, name):
        self.name = name
        super().__init__()

    def run(self):
        print('%s is piaoing' % self.name)
        time.sleep(random.randrange(1, 3))
        print('%s is piao end' % self.name)

if __name__ == '__main__':
    obj = Piao('egon')
    obj.start()
    obj.join()  # Wait for the child process to finish before running the next line
    print('start')
from multiprocessing import Process
import time
import random

def piao(name):
    print('%s is piaoing' % name)
    time.sleep(random.randint(1, 3))
    print('%s is piao end' % name)

if __name__ == '__main__':
    p1 = Process(target=piao, args=('egon',))
    p2 = Process(target=piao, args=('alex',))
    p3 = Process(target=piao, args=('yuanhao',))
    p4 = Process(target=piao, args=('wupeiqi',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
The start and join calls above can be abbreviated as:
    p_l = [p1, p2, p3, p4]
    for p in p_l:
        p.start()
    for p in p_l:
        p.join()
The detailed analysis is as follows:
# A process starts running as soon as start() is called, so after p1-p4.start() there are already four processes running concurrently in the system.
# p1.join() waits for p1 to end: the main thread stays stuck there as long as p1 has not finished. This is the key to the problem:
# join only makes the main thread wait, while p1-p4 keep executing concurrently. During p1.join(), p2, p3 and p4 are still running; by the time p1.join() returns, p2, p3 and p4 may already have ended, in which case p2.join(), p3.join() and p4.join() pass through immediately without waiting.
Conclusion: the total time spent on the four joins is therefore roughly the running time of the longest-running process.
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self, name):
        # self.name = name
        # super().__init__()  # Process's __init__ sets self.name = 'Piao-1', so calling it here would override our self.name = name
        # The correct way to name the process we start:
        super().__init__()
        self.name = name

    def run(self):
        print('%s is piaoing' % self.name)
        time.sleep(random.randrange(1, 3))
        print('%s is piao end' % self.name)

if __name__ == '__main__':
    p = Piao('egon')
    p.start()
    print('start')
    print(p.pid)  # View the child process's pid
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self, name):
        self.name = name
        super().__init__()

    def run(self):
        print('%s is piaoing' % self.name)
        time.sleep(random.randrange(1, 5))
        print('%s is piao end' % self.name)

if __name__ == '__main__':
    p1 = Piao('egon1')
    p1.start()
    p1.terminate()  # Closes the process, but not immediately, so checking is_alive() right away may still report alive
    print(p1.is_alive())  # Result is True
    print('start')
    print(p1.is_alive())  # Result is False
from multiprocessing import Process
import time
import os

def task():
    print('child process id:%s parent process id:%s' % (os.getpid(), os.getppid()))
    time.sleep(200)

if __name__ == '__main__':
    p1 = Process(target=task)
    p1.start()
    print('parent', os.getpid(), os.getppid())
When we open a .py file from cmd and then close that cmd window, you would expect the window to close completely; in practice it lingers, because we only killed the cmd process, and the leftover Python process still needs the cmd console to print its messages, so the console is not released.
When we call join, it performs a wait to reclaim the child's OS-level PID. But that only concerns the operating system: even after the parent has finished join, we can still read the child's pid number from the Process object.
A daemon process is also a child process, but "daemon" here carries the sense of accompanying: a daemon process dies as soon as the parent process's code has finished running.
There are just two keywords:
Process:
When a parent needs a task executed concurrently, it puts that task into a child process.
Daemon:
When a child's code has no reason to exist once the parent's code has finished, the child should be made a daemon process, so that it dies when the parent's code ends.
from multiprocessing import Process
import time

def task(name):
    print("start%s" % name)

if __name__ == '__main__':
    p = Process(target=task, args=('守护',))
    p1 = Process(target=task, args=('正常',))
    p.daemon = True
    p.start()
    p1.start()
    print('parent')

# Output:
# parent
# start正常
from multiprocessing import Process
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)
    p1.daemon = True
    p1.start()
    p2.start()
    print("main-------")

'''
Possible output 1:
main-------
456
end456
'''
'''
Possible output 2:
main-------
123
456
end456
'''
'''
Possible output 3:
123
main-------
456
end456
'''
A mutex makes only part of a task's code serial (just the code that modifies shared data), sacrificing efficiency to guarantee data safety.
def task():
    check()
    with mutex:
        get()
join serializes all of a task's code as a whole;
a mutex serializes only a local section.
from multiprocessing import Process, Lock
import random
import time
import json
import os

dic = {"count": 1}
with open('互斥锁测试文件.txt', 'wt', encoding='utf8') as f:
    json.dump(dic, f)

def check():
    time.sleep(random.random())
    with open('互斥锁测试文件.txt', 'rt', encoding='utf8') as f:
        data = json.load(f)
    print("%s现在还有余票%s张" % (os.getpid(), data['count']))

def purchase():
    time.sleep(random.random())
    with open('互斥锁测试文件.txt', 'rt', encoding='utf8') as f:
        data = json.load(f)
    if data['count'] > 0:
        data['count'] -= 1
        time.sleep(random.random())
        with open('互斥锁测试文件.txt', 'wt', encoding='utf8') as f:
            json.dump(data, f)
        print("%s购票成功" % os.getpid())
    else:
        print("%s购票失败" % os.getpid())

def task(mutex):
    check()
    with mutex:
        purchase()
    # mutex.acquire()  # A mutex cannot be acquired twice in a row; it must be released before it can be acquired again
    # get()
    # mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=(mutex,))
        p.start()
Interprocess communication (IPC)
IPC puts the shared data in memory, instead of fetching it from the hard disk.
pipe
queue: a pipe plus a lock
1. The IPC mechanism is first in, first out.
2. A queue occupies memory space.
3. Don't put large data into a queue; it should hold only small messages.
4. block=False in get and put is equivalent to no-wait: if there is nothing in the pipe to get, or no room to put anything in, an exception is raised immediately.
Since I already wrote this as a producer-consumer model, I'm not going to rewrite something this simple from scratch; I've pasted the teacher's version directly.
from multiprocessing import Queue

# Must-know usage
# q = Queue()
# q.put('first')
# q.put({'k': 'second'})
# q.put(['third', ])
# # q.put(4)
#
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())

# Good-to-know usage
# q = Queue(3)  # first in, first out
# q.put('first', block=True, timeout=3)
# q.put({'k': 'second'}, block=True, timeout=3)
# q.put(['third', ], block=True, timeout=3)
# print('===>')
# # q.put(4, block=True, timeout=3)
#
# print(q.get(block=True, timeout=3))
# print(q.get(block=True, timeout=3))
# print(q.get(block=True, timeout=3))
# print(q.get(block=True, timeout=3))

# q = Queue(3)  # first in, first out
# q.put('first', block=False)
# q.put({'k': 'second'}, block=False)
# q.put(['third', ], block=False)
# print('===>')
# # q.put(4, block=False)  # raises immediately when the queue is full, instead of blocking
#
# print(q.get(block=False))
# print(q.get(block=False))
# print(q.get(block=False))
# print('get over')
# print(q.get(block=False))

q = Queue(3)  # first in, first out
q.put_nowait('first')  # same as q.put('first', block=False)
q.put_nowait(2)
q.put_nowait(3)
# q.put_nowait(4)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())  # raises queue.Empty once the queue has been drained
Producer: a metaphor for the tasks in a program that produce data.
Consumer: a metaphor for the tasks in a program that process data.
producer -> shared medium (a queue) <- consumer
This decouples producers from consumers: producers can keep producing and consumers can keep consuming,
balancing the producers' capacity to generate data against the consumers' capacity to process it, and improving the overall efficiency of the program.
Whenever a program clearly contains two kinds of tasks, one producing data and the other processing it, consider the producer-consumer model to improve efficiency.
from multiprocessing import Process, JoinableQueue
import time
import random

def consumer(name, q):
    while True:
        food = q.get()
        time.sleep(random.random())
        if food == "满汉全席":
            print("%s成为了天选之人" % name)
        else:
            print("\033[46m%s 吃了 %s\033[0m" % (name, food))
        q.task_done()

def producer(name, food, q):
    for i in range(3):
        time.sleep(random.random())
        if food == "满汉全席":
            print("%s 完成了惊天之举%s" % (name, food))
        else:
            print("\033[41m%s 做了 %s%s\033[0m" % (name, food, i + 1))
        q.put(food)

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=("陈有德", "满汉全席", q))
    p2 = Process(target=producer, args=("小福贵", "东坡肉", q))
    p3 = Process(target=producer, args=("街头乞丐", "小青菜", q))
    c1 = Process(target=consumer, args=("小猪崽", q))
    c2 = Process(target=consumer, args=('狂暴野猪', q))
    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.join()
    # def producer_process(name, food, queue=q):
    #     p = Process(target=producer, args=(name, food, queue))
    #     p.start()
    #     p.join()
    # def consumer_process(name, queue=q):
    #     c = Process(target=consumer, args=(name, queue))
    #     c.daemon = True
    #     c.start()
    #
    # producer_process("陈有德", "满汉全席")
    # producer_process("小福贵", "东坡肉")
    # producer_process("街头乞丐", "小青菜")
    # consumer_process("小猪崽")
    # consumer_process("狂暴野猪")
    # q.join()  # Wait until the queue is drained. When it returns: the main process's code has finished -> (the producers have finished) + the queue has also been emptied -> the consumers have no reason to exist
A single machine's performance is limited: putting 10,000 producers or consumers on one computer is nearly impossible, so instead we can spread the 10,000 producers (or 10,000 consumers) across 100 machines, improving both performance and stability. This is distributed operation.
The catch: once producers and consumers span many computers, a network-based queue (a socket-based queue) should be used, so a dedicated machine is added just to host the queue. Producers put messages into the queue saying the data has been downloaded successfully and where it is stored; consumers then fetch those messages and go to the given address to get the data.
A process is not really a unit of execution but a unit of resources; every process comes with one thread, and the thread is the CPU's unit of execution. If the operating system is a factory, then a process is a workshop, and a thread is an assembly line inside the workshop; a workshop can have more than one line.
Think of running Python as a process: it requests a block of memory, while the Python code is a thread, running top to bottom against that memory space. The code is executed by the CPU, because what the CPU executes is code (some computation), not memory resources.
Both process and thread are abstract concepts that let us describe an execution process more concisely.
Now imagine how a text editor works: one task accepts input and writes it to the file in memory, another outputs it, and a third periodically flushes the contents from memory to disk.
If these three functions lived in three processes, passing the data back and forth between processes would be cumbersome; with three threads, the data is shared, and the feature becomes easy to implement.
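A minimal sketch of the shared-memory point, using three illustrative functions (the names and the shared list are assumptions, not from the original): all three threads touch the very same list, with no copying between them.

```python
from threading import Thread

buffer = []  # one list, shared by every thread in the process

def receive():
    buffer.append("hello")  # simulated user input

def display():
    print("display:", buffer)  # sees the very same list

def flush():
    print("flushed %d item(s) to disk" % len(buffer))  # simulated periodic flush

for t in [Thread(target=receive), Thread(target=display), Thread(target=flush)]:
    t.start()
    t.join()  # run one after another here only to keep the output deterministic
```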
A daemon thread ends only after every non-daemon thread in its process has ended; that is, it guards the whole process's running cycle (until all non-daemon threads have finished).
# from threading import Thread, current_thread
# import time
#
# def task():
#     print('%s is running' % current_thread().name)
#     time.sleep(3)
#     print('%s is done' % current_thread().name)
#
# if __name__ == '__main__':
#     t = Thread(target=task, name='守护线程')
#     t.daemon = True
#     t.start()
#     print('主')
from threading import Thread
import time

def foo():
    print(123)
    time.sleep(3)
    print("end123")

def bar():
    print(456)
    time.sleep(1)
    print("end456")

t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main-------")
'''
Output:
123
456
main-------
end456
'''
Threads run so fast that test results can be misleading, so people often assume threads come with a built-in mutex effect. In fact they do not: like processes, threads need a mutex to keep shared data safe, again at some cost in efficiency. A mutex turns multiple tasks' concurrent modification of shared data into serial modification.
In the example below, no join() is used, so the print(x) under main runs first, which produces the misleading output of 100 (*****).
from threading import Thread, Lock
import time

x = 100
mutex = Lock()

def task():
    global x
    temp = x
    time.sleep(1)
    x = temp - 1

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    print(x)  # prints 100: the thread is still sleeping when this runs
# Running too fast creates an illusion that the shared data is safe, so slow the task down
from threading import Thread
import time

x = 100

def task():
    global x
    temp = x
    time.sleep(1)
    temp -= 1
    x = temp

if __name__ == '__main__':
    tlst = []
    for i in range(100):
        t = Thread(target=task)
        tlst.append(t)
        t.start()
    for i in range(len(tlst)):
        tlst[i].join()
    print(x)  # 99: all 100 threads read x == 100 before any of them wrote back
# Version with a mutex
from threading import Thread, Lock

x = 100
mutex = Lock()

def task():
    global x
    with mutex:
        x -= 1

if __name__ == '__main__':
    for i in range(100):
        t = Thread(target=task)
        t.start()
    print(x)
Running the code already creates one thread: the main thread.
However many other threads there are, they are all child threads, branches under the main thread.
Threads do not need the __main__ guard, because threads share the same process's memory space (no re-import of the module happens).
from threading import Thread
import time

def task(name):
    print("%s running" % name)
    time.sleep(1)
    print("%s end" % name)

if __name__ == '__main__':
    t = Thread(target=task, args=('subthread',))
    t.start()
    print("parent")
from threading import Thread
import time

class MyThread(Thread):
    def run(self):
        print("%s running" % self.name)
        print("%s end" % self.name)

if __name__ == '__main__':
    t1 = MyThread()
    t2 = MyThread()
    t3 = MyThread()
    t4 = MyThread()
    t5 = MyThread()
    t6 = MyThread()
    t6.start()
    t5.start()
    t4.start()
    t3.start()
    t1.start()
    t2.start()
    print("parent")
Main methods: is_alive, getName, setName
Imported from the module: active_count, enumerate, current_thread
from threading import Thread, current_thread, active_count, enumerate
import time

def task():
    print("%s 我是子线程的原名" % current_thread().name)
    current_thread().setName("子线程")
    time.sleep(1)
    print("%s 我是修改后的子线程名" % current_thread().name)

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    print("我能判断线程是否存活", t.is_alive())
    print("我能把所有线程放入列表", enumerate())
    print("我能显示存活的线程数", active_count())
    print("这是我主线程的原名", current_thread().getName())
    current_thread().setName("主线程")  # Rename the main thread
    print("parent", current_thread().name, "我原名MainThread,现名主线程")
# Create two locks and multiple threads: one thread holds lock B while waiting for another thread to release lock A, and that thread holds lock A while waiting for lock B to be released. In short, each thread holds the lock the other needs — deadlock.
from threading import Thread, Lock, active_count
import time

mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print("A")
        mutexB.acquire()
        print("B")
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print("B")
        time.sleep(1)
        mutexA.acquire()
        print("A")
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
    print(active_count())
A recursive lock is really one lock used as many: each time the holding thread acquires it, the lock's reference count goes up by 1; each release decreases it by 1; only when the count reaches 0 do all threads regain a fair chance to grab the lock. In short, a recursive lock lets the holding thread call acquire multiple times.
from threading import Thread, active_count, RLock
import time

obj = RLock()
mutexA = obj
mutexB = obj

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print("A")
        mutexB.acquire()
        print("B")
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print("B")
        time.sleep(1)
        mutexA.acquire()
        print("A")
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
    print(active_count())
A semaphore controls how many threads can hold the lock at the same time, like using several mutexes at once; as soon as one thread releases, another waiting thread immediately acquires.
from threading import Thread, Semaphore, current_thread
import time
import random

sm = Semaphore(5)

def task():
    with sm:
        print("%s正在舒服" % current_thread().name)
        time.sleep(random.random())

if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task)
        t.start()
Timer starts a thread object with a built-in delay: only after the given time has elapsed does the thread's function run.
from threading import Timer, current_thread

def task(x):
    print('%s run....' % x)
    print(current_thread().name)

if __name__ == '__main__':
    t = Timer(3, task, args=(10,))
    t.start()
    print('主')
# First in, first out (FIFO)
import queue
q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
Result (FIFO):
first
second
third
'''
# Last in, first out (LIFO)
import queue
q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
Result (LIFO):
third
second
first
'''
# Priority queue: higher priority dequeues first; the lower the number, the higher the priority
import queue
q = queue.PriorityQueue()
# put takes a tuple whose first element is the priority (usually a number, though any mutually comparable values work); the smaller the number, the higher the priority
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get())
print(q.get())
print(q.get())
'''
Result (the smaller the number, the higher the priority; higher priority dequeues first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
Process: the unit whose resources are requested and reclaimed.
Thread: purely the execution of code.
Processes: the memory spaces of multiple processes are isolated from one another.
Threads: multiple threads within one process share that process's memory.
from threading import Thread

x = 100

def task():
    global x
    x = 0

if __name__ == '__main__':
    t = Thread(target=task)  # the thread runs before 'parent' is printed
    t.start()
    print("parent", x)  # prints 0
Creating a thread is far faster than creating a process: a new process needs its own memory space, whereas creating a thread is just a request asking the operating system to run a task. In the factory metaphor: building a workshop certainly takes longer than adding an assembly line.
from threading import Thread
from multiprocessing import Process
import time

def task(name):
    print("%s running" % name)
    time.sleep(1)
    print("%s end" % name)

if __name__ == '__main__':
    t = Thread(target=task, args=('subthread',))    # runs before 'parent' is printed
    p = Process(target=task, args=('subprocess',))  # runs after 'parent' is printed
    p.start()
    t.start()
    print("parent")
Global Interpreter Lock (GIL)
The GIL is essentially a mutex, and the principle of every mutex is the same: it turns concurrent threads into serial execution, so that only one runs at a time. With the GIL, only one of the threads inside a process can run at any given moment, which means that in the CPython interpreter the multiple threads of one process cannot run in parallel -> the multi-core advantage cannot be exploited; concurrency, however, is unaffected. Think of the GIL as an execution permit: every thread must grab the permit before it can run.
The reason: CPython has a built-in garbage collector, which is effectively a thread of its own. If an application thread and the GC thread truly ran at the same time, the application thread might have just created a value not yet bound to a variable name while the GC runs, and the value could be reclaimed because its reference count is 0. So the GIL guarantees this kind of interpreter-level thread safety.
The GIL acts as the execution permit, and the interpreter forcibly releases it whenever a thread enters an I/O operation or a similar blocked state;
a user-defined mutex is never released automatically, even if the holder cannot proceed; it is released only when the code inside the lock finishes running.
CPU-bound programs are those dominated by computation; they generally use multiprocessing, because heavy computation rarely hits I/O switching and multiprocessing can push the multi-core advantage to the maximum. Also, each process has just one main thread, so four processes never need to fight over one GIL.
I/O-bound programs are those dominated by I/O operations; with lots of I/O the CPU mostly waits, so parallelism (which requires multiple cores) shows no benefit, and multithreading is generally used instead. I/O-bound work switches many times, and switching within threads is faster than switching between processes; processes are also slower to start than threads. So for I/O-heavy work multiprocessing gains little, while multithreading still delivers the concurrency effect.
Check how many CPU cores the computer has:
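With the standard library, both of the following report the logical CPU count:

```python
import os
import multiprocessing

print(os.cpu_count())               # logical CPU count
print(multiprocessing.cpu_count())  # the same number, via the multiprocessing module
```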
A pool limits the number of processes or threads started. When the number of concurrent tasks far exceeds what the computer can bear, i.e., we cannot open that many processes or threads at once, the pool concept caps them at what the machine can handle. Once the pool's size is fixed, however many tasks arrive, only the pool's processes/threads do the work: with a process pool of 4 and 10 tasks, those 10 tasks can only be completed by the 4 processes.
Synchronous submission: submit a task and wait in place until it finishes and its return value is obtained, then continue with the next task.
This differs from serial execution: synchronous describes how tasks are submitted, serial describes how they are executed.
Asynchronous submission: submit a task (binding a callback) and do not wait in place at all; execution moves straight to the next line, and once the task has a return value the callback fires automatically. Asynchronous is usually used together with callbacks.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    futures = []
    for i in range(11):
        future = executor.submit(task, i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
Notes on the example above:
submit: asynchronous task submission.
shutdown: equivalent to the process pool's pool.close() + pool.join().
  wait=True: continue only after every task in the pool has finished and its resources are reclaimed.
  wait=False: return immediately, without waiting for the pool's tasks to finish.
  Whatever the value of wait, the whole program still waits until all tasks have finished.
  submit and map must come before shutdown.
  After shutdown, no more tasks may be submitted, and the next line runs only once the pool has ended.
result: fetch the task's return value.
map: replaces the for + submit loop.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # for i in range(11):
    #     future = executor.submit(task, i)
    executor.map(task, range(1, 12))  # map replaces for + submit
This is the usage of map.
Callback: after a task is submitted asynchronously, a future object is returned. Registering a callback on it with add_done_callback means that when the task completes, the future object is passed into the callback as its argument; inside the callback we call result() on it to get the task's return value and process it.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(res):
    res = res.result()
    print('<进程%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls = [
    ]
    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()
    p = ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page, url).add_done_callback(parse_page)  # parse_page receives a future object; use obj.result() inside it to get the value
The callbacks of a process pool run in the main process; the callbacks of a thread pool run in whichever pool thread happens to be free.
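A small sketch of the callback mechanism with a thread pool (the task and callback names here are illustrative, not from the original):

```python
from concurrent.futures import ThreadPoolExecutor

results = []

def task(n):
    return n ** 2

def on_done(future):
    # the finished future is handed to the callback; result() returns immediately
    results.append(future.result())

pool = ThreadPoolExecutor(max_workers=2)
for i in range(5):
    pool.submit(task, i).add_done_callback(on_done)
pool.shutdown(wait=True)  # wait for all tasks (and thus their callbacks) to finish
print(sorted(results))    # [0, 1, 4, 9, 16]
```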
Achieving concurrency in a single thread = switching + saving state.
The application itself controls the switching between multiple tasks and the saving of their state.
Switching inside the application is far faster than switching by the operating system.
But if any one of the tasks hits an I/O operation without switching, the whole thread blocks, i.e., every other task in that thread blocks too. So once coroutines are introduced, all I/O in the single thread must be detected, so that a switch happens as soon as an I/O operation is met.
Only switching on I/O improves efficiency; that is a meaningful coroutine.
Switching when no I/O is involved does not improve efficiency; it is still a coroutine, just a pointless one.
gevent notes:
spawn: asynchronous submission.
Simulating an I/O operation: calling time.sleep() directly would make execution serial;
monkey patching turns time.sleep() (and other blocking calls) into simulated I/O that gevent can detect.
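Before reaching for gevent, the "switching + saving state" idea can be sketched with plain generators (illustrative only: yield is the switch point, and gevent's switching on real I/O is far more sophisticated):

```python
log = []

def task(name, steps):
    for i in range(steps):
        log.append("%s step %s" % (name, i))
        yield  # switch point: control is given up, and the local state is saved here

def run(tasks):
    # a tiny round-robin scheduler: one thread, many tasks, explicit switching
    tasks = list(tasks)
    while tasks:
        for t in tasks[:]:
            try:
                next(t)  # resume the task exactly where it last yielded
            except StopIteration:
                tasks.remove(t)

run([task("A", 2), task("B", 2)])
print(log)  # the two tasks interleave: A, B, A, B
```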
wait data: client produces data => the client's operating system receives it => network transfer => the server's operating system buffers the data.
copy data: the data is then copied locally from the operating system into the application's memory; copying is far faster than waiting.
Blocking I/O: wait in place whenever an I/O operation is met; the earliest socket server and client we wrote are blocking I/O.
Non-blocking I/O: during wait data, the application keeps sending requests to the operating system and the OS answers immediately. Adding server.setblocking(False) to that earliest socket server turns blocking I/O into non-blocking I/O.
Drawbacks:
High CPU usage (busy polling).
If the list looped over grows too large, the concurrency effect disappears.
If data arrives just as the program has switched to another task, the user does not receive a reply promptly.
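The busy-polling behavior can be seen on a local socket pair (an illustrative sketch; socket.socketpair stands in for a real client/server connection):

```python
import socket
import time

a, b = socket.socketpair()  # a connected local pair, standing in for client and server
a.setblocking(False)        # non-blocking: recv returns immediately instead of waiting

got_blocking_error = False
try:
    a.recv(1024)            # nothing has arrived yet
except BlockingIOError:
    got_blocking_error = True  # a real server would now do other work and poll again
print("no data yet:", got_blocking_error)

b.send(b"hi")
time.sleep(0.1)             # give the bytes time to land in a's buffer
data = a.recv(1024)
print(data)                 # b'hi'
```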
from gevent import spawn, monkey; monkey.patch_all()
from socket import *

def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip, port, backlog=5):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(backlog)
    print('starting...')
    while True:
        conn, addr = server.accept()
        spawn(talk, conn)

if __name__ == '__main__':
    g = spawn(server, '127.0.0.1', 8080)
    g.join()
Calling select takes over the non-blocking model's busy loop of repeatedly asking the operating system whether data has arrived: the sockets to watch are handed to select in lists, select loops and waits on the OS, and as soon as the OS reports data ready, select immediately returns the ready sockets to the communication loop.
# Server
from socket import *
import select

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1', 8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist = [server, ]
wlist = []
wdata = {}
while True:
    rl, wl, xl = select.select(rlist, wlist, [], 0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn, addr = sock.accept()
            rlist.append(conn)
        else:
            try:
                data = sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock] = data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)
# Client
from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8093))
while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
client.close()
The select network I/O model.
epoll is in the same family as multiplexed I/O; it mainly solves the problem that, when the watched list grows large, users cannot receive replies promptly, by providing a more efficient interface between the operating system and the application layer.
Internally, epoll registers something like a callback for every connection waiting for data: when data reaches the operating system, the callback fires, so the ready connections are reported to the application automatically instead of being scanned one by one.
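Python exposes epoll (or the best mechanism the platform offers) through the stdlib selectors module; a minimal event-notification sketch on a local socket pair:

```python
import selectors
import socket

sel = selectors.DefaultSelector()      # picks epoll on Linux, the best available elsewhere
a, b = socket.socketpair()
sel.register(a, selectors.EVENT_READ)  # "tell me when a becomes readable"

b.send(b"ping")
events = sel.select(timeout=1)         # returns only the connections that are ready
for key, mask in events:
    print(key.fileobj.recv(1024))      # b'ping'
sel.close()
```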
Asynchronous I/O: hand both the wait and the copy phases over entirely to other facilities, and do only the computation yourself.
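Python's stdlib realizes this idea with asyncio: while one task awaits (simulated) I/O, the event loop runs the others (an illustrative sketch, with asyncio.sleep standing in for real I/O):

```python
import asyncio

results = []

async def task(name, delay):
    await asyncio.sleep(delay)  # the "wait" phase is handed off to the event loop
    results.append(name)

async def main():
    # both tasks run concurrently in one thread: total ~0.2s, not 0.3s
    await asyncio.gather(task("slow", 0.2), task("fast", 0.1))

asyncio.run(main())
print(results)  # ['fast', 'slow']: the shorter wait finishes first
```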
To sum up: third-generation computers introduced the concept of multi-channel technology. Wanting efficient concurrency, we got multiprocessing, but multiprocessing suits CPU-bound tasks, and once a task has lots of I/O operations it no longer meets our needs, hence multithreading. Take our client-server socket program as an example: one server thread per client means that too many clients can easily freeze the machine, hence the concept of process pools and thread pools. But a pool treats the symptom, not the cause: it only keeps the machine from freezing under a flood of client connections, and with too many users the concurrency effect still disappears, hence the concept of coroutines, where one thread runs many tasks concurrently, pushing the concurrency effect to the maximum. That is why the I/O models were introduced at the end.