This is the PUSH/PULL pattern, also called the pipeline pattern. The name captures its one-way nature: one side pushes, the other pulls, and the data flows steadily forward, never turning back.
The original intent of this socket pair is to hand data off to a group of workers. PUSH distributes tasks evenly among the downstream workers (fair distribution seems to be something of a ZeroMQ trademark), so everyone has work to do. Diagram:
Producer: generates the data to be processed and pushes it out
Consumer (worker): pulls data from the producer, processes it, and pushes the result onward
Result collector: pulls the processed results from the consumers
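The "even distribution" mentioned above is round-robin: a PUSH socket deals outgoing messages to its connected peers in turn. As a toy model (plain Python, no ZeroMQ; the worker names are made up for illustration):

```python
from itertools import cycle
from collections import Counter

# Toy model of how a PUSH socket fair-queues: outgoing messages
# are dealt to connected peers in round-robin order.
workers = ["worker-a", "worker-b", "worker-c"]
rr = cycle(workers)
assignments = Counter(next(rr) for _ in range(9))
print(assignments)  # each worker gets 3 of the 9 tasks
```

In the real library the scheduling happens inside libzmq, and a slow or late-joining peer can skew the split, as the results below show.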
Producer:
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start resultcollector and the consumers before starting the producer
    for num in range(2000):
        work_message = {'num': num}
        zmq_socket.send_json(work_message)

producer()
Consumer:
import random
import zmq

def consumer():
    consumer_id = random.randrange(1, 10005)
    print("I am consumer #%s" % consumer_id)
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send results
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = {'consumer': consumer_id, 'num': data}
        if data % 2 == 0:
            consumer_sender.send_json(result)

consumer()
Result collector:
import pprint
import zmq

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    collector_data = {}
    # expect 1000 results: the even numbers among 0..1999
    for x in range(1000):
        result = results_receiver.recv_json()
        if result['consumer'] in collector_data:
            collector_data[result['consumer']] += 1
        else:
            collector_data[result['consumer']] = 1
    pprint.pprint(collector_data)

result_collector()
Run it:
python resultcollector.py
python consumer.py
python consumer.py
python producer.py
The producer's 2000 numbers are handed to the consumers for processing (here, simply filtering out the odd ones), and the results converge at the resultcollector. The output:
{ 3362: 233, 9312: 767 }
Consumer 3362 processed 233 results and consumer 9312 processed 767. Note that 233 + 767 = 1000, exactly the number of even values in 0..1999, so nothing was lost; the split is uneven, presumably because the two consumers started or ran at different speeds. Good enough.
One thing to note: the downstream side must be started before the upstream side, otherwise data is lost. These socket abstractions in ZeroMQ fit certain scenarios quite well, and they can be combined freely. Worth trying out when the chance arises.
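One way to handle that start-up ordering problem is a small handshake: the producer does not push anything until each worker has confirmed, over a separate REQ/REP channel, that its PULL socket is connected (a technique the ZeroMQ guide uses for its pipeline examples). A minimal self-contained sketch, using made-up `inproc://` endpoint names and a single worker thread for illustration:

```python
import threading
import zmq

def synced_pipeline(n=10):
    """PUSH/PULL plus a REQ/REP handshake so nothing is pushed
    before the worker's PULL socket is connected."""
    ctx = zmq.Context()

    # Upstream side binds both the task pipe and the sync channel first.
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://tasks")
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://sync")

    results = []

    def worker():
        pull = ctx.socket(zmq.PULL)
        pull.connect("inproc://tasks")
        req = ctx.socket(zmq.REQ)
        req.connect("inproc://sync")
        req.send(b"ready")   # signal: PULL socket is now connected
        req.recv()           # wait for the go-ahead
        for _ in range(n):
            results.append(pull.recv_json()["num"])
        pull.close()
        req.close()

    t = threading.Thread(target=worker)
    t.start()
    rep.recv()               # block until the worker says it is ready
    rep.send(b"go")
    for num in range(n):
        push.send_json({"num": num})
    t.join()
    push.close()
    rep.close()
    ctx.term()
    return sum(results)

print(synced_pipeline())  # sum of 0..9 -> 45
```

With the handshake in place, the start order of the processes no longer matters for correctness, at the cost of one extra socket per worker.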