介绍
Python的multiprocessing模块不但支持多进程,其中的managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,把任务分发到其他多台机器上的多个进程中,进程之间依靠网络通信。
由此想到,是否可以使用这个模块来实现一个简单的作业调度系统。
实现
Job
首先创建一个Job类。为了便于测试,它只包含一个job_id属性,将来可以再封装作业状态、作业命令、执行用户等属性。
job.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-


class Job:
    """A unit of work that the master dispatches to slaves.

    For simplicity a job currently carries only its identifier; attributes
    such as status, command or the executing user can be added later.
    Jobs are put on manager-backed queues, so they must stay picklable.
    """

    def __init__(self, job_id):
        # Identifier assigned by the master when the job is created.
        self.job_id = job_id

    def __repr__(self):
        # Readable form for printing/logging while debugging.
        return 'Job(job_id=%r)' % (self.job_id,)
Master
Master用来派发作业,并显示已运行完成的作业信息。
master.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

try:
    from queue import Queue, Empty  # Python 3
except ImportError:
    from Queue import Queue, Empty  # Python 2
from multiprocessing.managers import BaseManager
from job import Job


class Master:
    """Dispatch jobs to slaves over the network and report finished ones.

    Two queues are shared through a ``BaseManager`` server: slaves take jobs
    from the dispatched-job queue and put them on the finished-job queue
    once they have run.
    """

    # Number of jobs dispatched per round.
    BATCH_SIZE = 10
    # Seconds to wait for a finished job before printing a progress notice.
    WAIT_TIMEOUT = 60

    def __init__(self):
        # Queue of jobs handed out to slaves.
        self.dispatched_job_queue = Queue()
        # Queue of jobs the slaves have finished.
        self.finished_job_queue = Queue()

    def get_dispatched_job_queue(self):
        """Return the queue of dispatched jobs (served to remote clients)."""
        return self.dispatched_job_queue

    def get_finished_job_queue(self):
        """Return the queue of finished jobs (served to remote clients)."""
        return self.finished_job_queue

    def start(self):
        """Serve the queues on port 8888 and dispatch jobs in rounds forever.

        Each round dispatches ``BATCH_SIZE`` jobs and then waits until that
        many finished jobs have come back before starting the next round.
        The manager server is shut down whenever the loop exits (e.g. on
        Ctrl+C).
        """
        # Expose both queues to remote processes under these type ids.
        # NOTE: the bound methods are handed to the manager's server process.
        # This relies on the 'fork' start method (the Linux default); under
        # 'spawn' (Windows, macOS on Python 3.8+) they would need to be
        # pickled together with the Queue objects, which is expected to fail.
        BaseManager.register('get_dispatched_job_queue',
                             callable=self.get_dispatched_job_queue)
        BaseManager.register('get_finished_job_queue',
                             callable=self.get_finished_job_queue)

        # Listen on all interfaces and start the manager server.
        # authkey must be bytes on Python 3; b'' literals also work on Python 2.
        manager = BaseManager(address=('0.0.0.0', 8888), authkey=b'jobs')
        manager.start()
        try:
            # Proxies for the queues registered above.
            dispatched_jobs = manager.get_dispatched_job_queue()
            finished_jobs = manager.get_finished_job_queue()

            job_id = 0
            while True:
                for _ in range(self.BATCH_SIZE):
                    job_id += 1
                    job = Job(job_id)
                    print('Dispatch job: %s' % job.job_id)
                    dispatched_jobs.put(job)

                # Count finished jobs rather than polling
                # dispatched_jobs.empty(): the dispatched queue becomes empty
                # as soon as slaves *take* the jobs, long before they finish.
                finished = 0
                while finished < self.BATCH_SIZE:
                    try:
                        # timeout must be passed by keyword: the first
                        # positional argument of Queue.get is `block`.
                        job = finished_jobs.get(timeout=self.WAIT_TIMEOUT)
                    except Empty:
                        print('No job finished in the last %d seconds, '
                              'still waiting...' % self.WAIT_TIMEOUT)
                        continue
                    finished += 1
                    print('Finished Job: %s' % job.job_id)
        finally:
            # Runs on any exit from the loop (e.g. KeyboardInterrupt), so the
            # manager server is always stopped.
            manager.shutdown()


if __name__ == "__main__":
    master = Master()
    master.start()
Slave
Slave用来运行master派发的作业,并将结果返回给master。
slave.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import time
try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2
from multiprocessing.managers import BaseManager
# Not referenced directly, but unpickling the Job objects received from the
# master requires the job module to be importable.
from job import Job


class Slave:
    """Fetch jobs from the master, "run" them, and report them as finished."""

    def __init__(self, server='127.0.0.1', port=8888):
        # Address of the master's manager server.
        self.server = server
        self.port = port

    def start(self):
        """Connect to the master and process jobs until interrupted."""
        # Only the type ids are registered here; the queues themselves are
        # provided by the master's manager server.
        BaseManager.register('get_dispatched_job_queue')
        BaseManager.register('get_finished_job_queue')

        print('Connect to server %s...' % self.server)
        # authkey must match the master's and be bytes on Python 3.
        manager = BaseManager(address=(self.server, self.port), authkey=b'jobs')
        manager.connect()

        # Proxies for the master's queues.
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # Simulate running each job: sleep, then hand back the same job.
        while True:
            try:
                job = dispatched_jobs.get(timeout=1)
            except Empty:
                # No job available yet (e.g. the master is still waiting for
                # the current round to finish): keep polling instead of dying.
                continue
            print('Run job: %s ' % job.job_id)
            time.sleep(1)
            finished_jobs.put(job)


if __name__ == "__main__":
    # Optional first argument: host of the master (defaults to localhost).
    slave = Slave(sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1')
    slave.start()
测试
分别打开三个Linux终端,第一个终端运行master,第二个和第三个终端用来运行slave,运行结果如下:
master
$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30
slave1
$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23
slave2
$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24