1. 如何创建进程
1.1 Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在
父进程和子进程内返回。
1.2 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所
以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
1.3 父进程和子进程, 如果父进程结束, 子进程也随之结束,先有父进程, 再有子进程.
1.4 Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:
常用函数:
os.fork() #创建一个子进程
os.getpid() # 获取当前进程的pid (process id)
os.getppid() # 获取当前进程的父进程pid (parent process id)
2. 实例化对象实现多进程
2.1 # 通过类的实例化实现
p1 = multiprocessing.Process(target=job, name="我的第一个子进程")
p1.start()
# 通过类的实例化实现
p2 = multiprocessing.Process(target=job, name="我的第二个子进程")
p2.start()
# join方法, 等待所有的子进程执行结束, 再执行主进程
p1.join()
p2.join()
3. 重写run方法实现多进程
3.1 class MyProcess(multiprocessing.Process):
# 重写run方法(start方法默认执行run方法)
def run(self):
print("当前子进程的名称%s....."%(multiprocessing.current_process()))
p1 = MyProcess(name="first")
p1.start()
p2 = MyProcess(name="second")
p2.start()
p1.join()
p2.join()
print("all finish.....")
4. 协程
4.1 什么是协程:
协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然
后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似
像多线程,*然而协程只有一个线程执行*。
4.2 协程的优势
- 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多
线程相比,线程的数量越多,协程性能的优势越明显。
- 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资
源时也不需要加锁,因此执行效率高很多。
- 说明:协程可以处理 IO 密集型程序的效率, 但是处理 CPU 密集型不是它的长处,如
要充分发挥 CPU 利用率可以结合多进程 + 协程。
4.3 协程实现方法1(通过yield方法实现):
def job():
for i in range(10):
print(i)
yield "result: %s" %(i)
j = job()
j.__next__()
print(next(j))
- 函数里面包含yield关键字, 调用函数返回的是生成器对象;
- yield工作原理: 如果遇到yield就停止运行,调用next方法, 从yield停止的地方继续
运行;
4.4 协程实现方法2(通过gevent库实现):
4.4.1 什么是gevent:gevent是第三方库,通过greenlet实现协程,其基本思想:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO
操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序
处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是
等待IO。
4.4.2 Gevent使用说明:
monkey: 可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切
换,手动切换可以用gevent.sleep()或者yield;
gevent.sleep(0): (将爬虫代码换成这个,效果一样可以达到切换上下文)
gevent.spawn: 启动协程,参数为函数名称,参数名称
gevent.join(): 等待某个协程执行结束
gevent.joinall: 等待所有的协程执行结束;
4.4.3 实现:
引入第三方模块:
import threading
import gevent
from gevent import monkey
monkey.patch_all() //自动切换使用monkey.patch_all()方法
def job(n):
for i in range(n):
print(gevent.getcurrent(), i, n)
print("当前线程数:", threading.active_count())
gevent.sleep(1)
def main():
# 创建协程, 分配任务;
g1 = gevent.spawn(job, 1)
g2 = gevent.spawn(job, 2)
g3 = gevent.spawn(job, 3)
# #
# g1.join()
# g2.join()
# g3.join()
print("当前线程数:", threading.active_count())
gevent.joinall([g1, g2, g3])
print("任务执行结束.....")
if __name__ == '__main__':
main()
5. 分布式进程
5.1 什么是分布式进程:
将所要完成的任务通过master主机分配给多个slave主机进行处理,当slave主机处理完后将结果返回
给master主机。
5.2 分布式进程的实现:
5.2.1 设置master:
import random
from queue import Queue
# BaseManager: 提供了不同机器之间共享数据的一种方法(ip:port)
from multiprocessing.managers import BaseManager
# 1. 创建存储任务需要的队列
task_queue = Queue()
# 2. 存储任务执行结果的队列
result_queue = Queue()
# 3. 将队列注册到网上(使得其他主机也可以访问)
BaseManager.register('get_task_queue', callable=lambda : task_queue)
BaseManager.register('get_result_queue', callable=lambda : result_queue)
# 绑定ip和端口, 并且设置一个暗号;
manager = BaseManager(address=('172.25.254.250', 4000), authkey=b'westos')
# 4. 启动manager对象, 开始共享队列
manager.start()
# 5. 通过网络访问共享的Queue对象;
# BaseManager.register会注册一个方法, 当调用方法时, 执行函数lambda : task_queue;
task = manager.get_task_queue()
result = manager.get_result_queue()
# 6. 往队列里面放执行任务需要的数据;
for i in range(1000):
# 模拟有1000个数字;
n = random.randint(1, 100)
task.put(n)
print("任务列表中加入任务: %d" %(n))
# 7. 从result队列中读取各个机器中任务执行的结果;
for i in range(1000):
res = result.get()
print("队列任务执行的result: %s" %(res))
# 8. 关闭manager对象, 取消共享的队列
manager.shutdown()
5.2.2 设置slave:
import time
from multiprocessing.managers import BaseManager
# 1. 连接Master端, 获取共享的队列;ip是master端的ip, port'也是master端manager进程绑定的端口;
slave = BaseManager(address=('172.25.254.250', 4000), authkey=b'westos')
# 2. 注册队列, 获取共享的队列内容;
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# 3. 连接master端;
slave.connect()
# 4. 通过网络访问共享的队列;
task = slave.get_task_queue()
result = slave.get_result_queue()
# 5. 读取管理端共享的任务, 并依次执行;
for i in range(500):
n = task.get()
print("slave1 运行任务 %d ** 2: " % (n))
res = "slave1: %d ** 2 = %d" % (n, n ** 2)
time.sleep(1)
# 将任务的运行结果放入队列中;
result.put(res)
print("执行结束........")