Overview
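Redis, a typical example of an in-memory database, is already used in many applications. This post looks only at Redis's pub/sub feature and shows how it can be used to build a simple job scheduling system. The goal is to illustrate a simple idea, so much of what a real system needs, such as error handling and persistence, is left out of this example.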
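The design is as follows:
MyMaster: the master node program of the cluster. It generates jobs, dispatches them, and collects their results.
MySlave: the compute node program, one instance per compute node. It receives jobs, runs them, and sends the results back to the master.
channel CHANNEL_DISPATCH: each slave node subscribes to its own channel, e.g. "CHANNEL_DISPATCH_[index or hostname]", and the master publishes each dispatched job to the channel of the chosen node.
channel CHANNEL_RESULT: the channel that carries job results, shared by the master and all slaves. The master subscribes to it to receive job results, and each slave publishes the results of the jobs it has run to it. Pub/sub does not store messages, so a result published while the master is not subscribed is lost.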
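The design relies on two properties of Redis pub/sub. First, PUBLISH returns the number of subscribers that received the message. Second, a message is delivered only to the clients subscribed at that moment and is never stored. The sketch below is a toy in-process model of just those two properties. It is not Redis and uses no Redis API; ToyBroker and dispatch_channel are hypothetical names introduced for illustration.

```python
from collections import defaultdict
from queue import Queue

CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'


class ToyBroker:
    """Toy in-process model of Redis pub/sub delivery (hypothetical, not Redis)."""

    def __init__(self):
        # channel name -> list of subscriber inboxes
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        """Return an inbox that receives messages published to `channel` from now on."""
        inbox = Queue()
        self._subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        """Deliver to current subscribers only and return how many received it."""
        receivers = self._subscribers.get(channel, [])
        for inbox in receivers:
            inbox.put(message)
        return len(receivers)


def dispatch_channel(node_id):
    """Per-slave channel name, e.g. CHANNEL_DISPATCH_1."""
    return '%s_%s' % (CHANNEL_DISPATCH, node_id)


broker = ToyBroker()
print(broker.publish(dispatch_channel(1), '1'))  # 0: no subscriber, job '1' is dropped
inbox = broker.subscribe(dispatch_channel(1))
print(broker.publish(dispatch_channel(1), '2'))  # 1
print(inbox.get_nowait())                         # 2 (job '1' was never stored)
```

Because every subscriber gets its own copy, CHANNEL_RESULT can be shared. CHANNEL_DISPATCH, however, needs one channel per slave so that each job reaches exactly one node.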
Master code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'


class MyMaster:
    def __init__(self):
        pass

    def start(self):
        # Start the result listener before the dispatcher
        MyServerResultHandleThread().start()
        MyServerDispatchThread().start()


class MyServerDispatchThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB,
                              decode_responses=True)
        for i in range(1, 100):
            # Pick one of the three slave channels at random
            channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
            print("Dispatch job %s to %s" % (i, channel))
            # publish() returns the number of subscribers that received the message
            ret = r.publish(channel, str(i))
            if ret == 0:
                print("Dispatch job %s failed." % i)
            time.sleep(5)


class MyServerResultHandleThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB,
                              decode_responses=True)
        p = r.pubsub()
        p.subscribe(CHANNEL_RESULT)
        for message in p.listen():
            # Skip the subscription confirmation; handle only real messages
            if message['type'] != 'message':
                continue
            print("Received finished job %s" % message['data'])


if __name__ == "__main__":
    MyMaster().start()
    time.sleep(10000)
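Notes
MyMaster class: the master's main program. It starts the dispatch thread and the result-handling thread.
MyServerDispatchThread class: the dispatch thread. It generates jobs and dispatches each one to a compute node, reporting a failure when publish() says no subscriber received it.
MyServerResultHandleThread class: the result-handling thread. It reads job results from CHANNEL_RESULT and prints them.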
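The dispatch loop can be pulled out into a function that takes the publish operation as a parameter. This makes the "failed" check testable without a Redis server. It is a sketch under several assumptions:

* dispatch_jobs is a hypothetical name.
* The 5-second sleep is left out.
* publish is any callable that behaves like redis-py's publish, returning the number of receivers.

```python
import random

CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'


def dispatch_jobs(job_ids, publish, node_count=3, rng=random):
    """Send each job to a randomly chosen slave channel (hypothetical helper).

    `publish(channel, message)` must return the number of receivers,
    as redis-py's publish does. Returns the jobs that no subscriber received.
    """
    failed = []
    for job_id in job_ids:
        channel = '%s_%d' % (CHANNEL_DISPATCH, rng.randint(1, node_count))
        print("Dispatch job %s to %s" % (job_id, channel))
        if publish(channel, str(job_id)) == 0:
            # Zero receivers: the message was dropped, not queued
            print("Dispatch job %s failed." % job_id)
            failed.append(job_id)
    return failed
```

With a real server, one would pass r.publish, where r is a redis.StrictRedis instance. The returned list then holds the jobs that no slave ever saw, and the master could re-dispatch them.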
Slave code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import redis

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'


class MySlave:
    def __init__(self):
        pass

    def start(self):
        # One worker thread per dispatch channel (CHANNEL_DISPATCH_1..3),
        # each standing in for one compute node
        for i in range(1, 4):
            MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()


class MyJobWorkerThread(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.channel = channel

    def run(self):
        r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB,
                              decode_responses=True)
        p = r.pubsub()
        p.subscribe(self.channel)
        for message in p.listen():
            # Skip the subscription confirmation; handle only real messages
            if message['type'] != 'message':
                continue
            job = message['data']
            print("%s: Received dispatched job %s" % (self.channel, job))
            print("%s: Run dispatched job %s" % (self.channel, job))
            time.sleep(2)  # placeholder for the real work
            print("%s: Send finished job %s" % (self.channel, job))
            # r.publish uses a regular connection; the pubsub one is in subscribe mode
            ret = r.publish(CHANNEL_RESULT, job)
            if ret == 0:
                print("%s: Send finished job %s failed." % (self.channel, job))


if __name__ == "__main__":
    MySlave().start()
    time.sleep(10000)
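Notes
MySlave class: the slave node's main program. It starts the MyJobWorkerThread threads. In this example, a single process starts three workers, one for each of CHANNEL_DISPATCH_1 through CHANNEL_DISPATCH_3, so it stands in for three compute nodes.
MyJobWorkerThread class: receives dispatched jobs from its channel, runs them, and publishes the results back to the master on CHANNEL_RESULT.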
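The worker loop can be written the same way against plain message dicts. These use the shape that redis-py's PubSub.listen() yields, with the keys 'type', 'pattern', 'channel' and 'data'. In this sketch:

* handle_messages is a hypothetical name.
* run_job stands in for the time.sleep(2) placeholder work.
* bytes payloads are decoded, so the code works whether or not decode_responses is set.

```python
CHANNEL_RESULT = 'CHANNEL_RESULT'


def handle_messages(channel, messages, publish, run_job):
    """Process listen()-style message dicts (hypothetical helper).

    Skips subscription confirmations, runs each job, and publishes the job id
    to CHANNEL_RESULT. Returns the job ids whose result nobody received.
    """
    unsent = []
    for message in messages:
        if message['type'] != 'message':
            continue  # e.g. the initial {'type': 'subscribe', ...} confirmation
        data = message['data']
        job_id = data.decode('utf-8') if isinstance(data, bytes) else data
        print("%s: Run dispatched job %s" % (channel, job_id))
        run_job(job_id)
        if publish(CHANNEL_RESULT, job_id) == 0:
            print("%s: Send finished job %s failed." % (channel, job_id))
            unsent.append(job_id)
    return unsent
```

In MyJobWorkerThread.run this would be called as handle_messages(self.channel, p.listen(), r.publish, some_job_function).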
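Testing
First start MySlave, so that every dispatch channel already has a subscriber. Redis pub/sub does not buffer messages, so a job published to a channel with no subscriber is simply dropped.
Then start MyMaster. It dispatches the jobs and prints each result as it comes back.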
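The startup order can also be checked programmatically. redis-py exposes the PUBSUB NUMSUB command as pubsub_numsub(), which returns (channel, subscriber count) pairs. The hypothetical helper below lists the dispatch channels that nobody is listening on yet, so the master could warn or wait before dispatching. It takes the numsub function as a parameter so that it can be exercised with a fake.

```python
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'


def missing_slave_channels(numsub, node_count=3):
    """Return dispatch channels that currently have no subscriber (hypothetical helper).

    `numsub` is expected to behave like redis-py's pubsub_numsub: called with
    channel names, it returns (channel, count) pairs; channel may be bytes.
    """
    channels = ['%s_%d' % (CHANNEL_DISPATCH, i) for i in range(1, node_count + 1)]
    missing = []
    for channel, count in numsub(*channels):
        if isinstance(channel, bytes):
            channel = channel.decode('utf-8')
        if count == 0:
            missing.append(channel)
    return missing
```

Against a live server this could be called as missing_slave_channels(redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT).pubsub_numsub). An empty list means all three slave channels have a subscriber.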