(The original is in the zguide; worth a slow read if you're interested.)
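Improving client-side reliability in the REQ/REP pattern. In this pattern the client and server strictly follow a ping-pong rule: one request, then one reply, then the next request. Reality is rarely that cooperative. If the server suddenly dies, the client blocks in socket.recv(). When the server comes back later, ZeroMQ re-establishes the connection automatically, but if the server crashed while handling the request, that request is gone and the client can end up waiting forever. In some scenarios the client needs to learn the server's state promptly instead of waiting indefinitely, for example so it can return an error.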
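To make the blocking problem concrete, here is a small pyzmq sketch (Python 3). It sends one request on a REQ socket and bounds the wait for the reply. Note that it uses the socket option zmq.RCVTIMEO, a different way to put a time limit on a read than the poller the guide uses, purely so the demo terminates; without that option recv() would block forever. The function name and the endpoint tcp://127.0.0.1:5598 are assumptions for illustration, and the endpoint is assumed to have no server listening.

```python
import zmq


def recv_or_timeout(endpoint, timeout_ms):
    """Send one request on a REQ socket and wait at most timeout_ms for a reply.

    Returns the reply bytes, or None if nothing arrived in time.
    (Hypothetical helper for illustration.)
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # Without a receive timeout, recv() below would block indefinitely
    # when the server is down.
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    # Discard the undelivered request on close so ctx.term() returns promptly.
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(endpoint)
    try:
        sock.send(b"ping")  # queued locally even though no server is up
        return sock.recv()
    except zmq.Again:       # raised when RCVTIMEO expires
        return None
    finally:
        sock.close()
        ctx.term()


if __name__ == "__main__":
    print(recv_or_timeout("tcp://127.0.0.1:5598", 500))
```

With no server on that port the call returns None after about half a second instead of hanging.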
The guide offers this approach:
Client-side Reliability (Lazy Pirate Pattern)
I'm not sure what the author meant by the name. Pirates? Here is the client code:
```python
import zmq

REQUEST_TIMEOUT = 2500  # milliseconds
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()  # pyzmq sends bytes in Python 3
    print("I: Sending (%s)" % request.decode())
    client.send(request)

    expect_reply = True
    while expect_reply:
        # Wait at most REQUEST_TIMEOUT ms for the reply
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply.decode())
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)
        else:
            print("W: No response from server, retrying…")
            # Socket is confused. Close and remove it.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                print("E: Server seems to be offline, abandoning")
                break
            print("I: Reconnecting and resending (%s)" % request.decode())
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
```
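The key idea: poll.poll(REQUEST_TIMEOUT) performs a read with a time limit (the timeout is in milliseconds, so 2500 means 2.5 s). If the REQ socket receives nothing within that window, the server is treated as failed. The client then closes the current socket, reconnects to the server, and resends the request. The socket has to be closed because a REQ socket that has sent a request must receive a reply before it will accept another send, so the old socket cannot simply resend.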
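The retry logic can be separated from ZeroMQ itself. The sketch below is a hypothetical helper, not part of the guide: `lazy_pirate_request` and its three callables (`send`, `wait_reply`, `reconnect`) are assumptions chosen so the control flow can be read and tested on its own. It mirrors the client above: with retries=3 it makes three attempts in total, reconnecting before each resend. Unlike the guide's client, it skips the sequence-number check on the reply.

```python
from typing import Callable, Optional


def lazy_pirate_request(
    payload: bytes,
    send: Callable[[bytes], None],
    wait_reply: Callable[[int], Optional[bytes]],
    reconnect: Callable[[], None],
    timeout_ms: int = 2500,
    retries: int = 3,
) -> Optional[bytes]:
    """Send payload and wait for a reply; on timeout, reconnect and resend.

    wait_reply(timeout_ms) must return the reply, or None on timeout.
    Returns the reply, or None once every attempt has timed out.
    """
    for attempt in range(retries):
        if attempt > 0:
            # The old REQ socket is stuck waiting for a reply,
            # so replace it before resending.
            reconnect()
        send(payload)
        reply = wait_reply(timeout_ms)
        if reply is not None:
            return reply
    return None  # caller decides how to report "server offline"
```

With pyzmq, `wait_reply` could call `Socket.poll(timeout_ms)` (it returns 0 when nothing arrived) and then `recv()`, and `reconnect` would close the old socket with LINGER set to 0 and open a fresh REQ socket, as the client above does.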
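Also note client.setsockopt(zmq.LINGER, 0). A linger of 0 means any messages still queued on the socket are discarded as soon as it is closed. Otherwise context.term() waits for those pending messages to be delivered, and with the server down there is no telling when it will finish.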
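A small pyzmq experiment (Python 3) shows the effect of linger on shutdown. It queues a request on a REQ socket whose server never exists, then times how long `close()` plus `context.term()` takes for a given linger value. The helper name and the endpoint tcp://127.0.0.1:5599 are assumptions for illustration; the endpoint is assumed to have nothing listening.

```python
import time

import zmq


def close_and_term(linger_ms):
    """Queue an undeliverable request, then shut down with the given linger.

    Returns the seconds spent in socket.close() + context.term().
    (Hypothetical helper for illustration.)
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.connect("tcp://127.0.0.1:5599")  # assumed: no server on this port
    sock.send(b"hello")                   # sits in the outgoing queue
    sock.setsockopt(zmq.LINGER, linger_ms)
    start = time.monotonic()
    sock.close()
    ctx.term()  # waits up to linger_ms for the queued message to be sent
    return time.monotonic() - start


if __name__ == "__main__":
    print("linger=0:   %.2f s" % close_and_term(0))
    print("linger=300: %.2f s" % close_and_term(300))
```

With linger 0 the shutdown is essentially immediate; with 300 ms it waits roughly that long; with -1 (infinite) it would never return while the server is down.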
A simulated server that misbehaves:
```python
from random import randint
import time

import zmq

context = zmq.Context(1)
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

cycles = 0
while True:
    request = server.recv()
    cycles += 1

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating CPU overload")
        # 2 s here plus the 1 s below exceeds the client's 2.5 s timeout
        time.sleep(2)

    print("I: Normal request (%s)" % request.decode())
    time.sleep(1)  # Do some heavy work
    server.send(request)

server.close()
context.term()
```
Here are the results:
Server:

```
I: Normal request (1)
I: Normal request (2)
I: Normal request (3)
I: Simulating a crash
[Finished in 5.5s]
```

Client:

```
I: Connecting to server…
I: Sending (1)
I: Server replied OK (1)
I: Sending (2)
I: Server replied OK (2)
I: Sending (3)
I: Server replied OK (3)
I: Sending (4)
No response from server, retrying
Reconnecting and resending (4)
No response from server, retrying
Reconnecting and resending (4)
No response from server, retrying
Server seems to be offline, abandoning
[Finished in 10.7s]
```
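Strictly speaking, this doesn't really improve reliability; it just makes the client better at noticing when the server is gone.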