pyzmq提升REQ/REP模式可靠性(1)

  (原文在zguide,有兴趣慢慢看)

    提升REQ/REP模式的客户端侧可靠性,由于在此模式下,客户端与服务端严格遵循你一下我一下的乒乓规则,当然现实中不可能这么配合,比如服务器突然挂了,客户端就会阻塞在socket.recv(),然后过段时间,服务器又ok了,客户端也自动重连。但是有的场景客户端需要及时知道服务端状况而不是在那一直等,需要返回个错误之类。

    手册提供这种方法:

   Client-side Reliability (Lazy Pirate Pattern)

    不知道作者起这名字有啥含义,海盗?客户端代码:

import sys
import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print "I: Connecting to server…"
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence)
    print "I: Sending (%s)" % request
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print "I: Server replied OK (%s)" % reply
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print "E: Malformed reply from server: %s" % reply

        else:
            print "W: No response from server, retrying…"
            # Socket is confused. Close and remove it.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                print "E: Server seems to be offline, abandoning"
                break
            print "I: Reconnecting and resending (%s)" % request
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()

    重点是:使用poll.poll(REQUEST_TIMEOUT),来执行一个限时读操作,如果时间段内该REQ socket没收到数据,视作服务端故障,这时就尝试先关闭当前socket,然后重连服务端,重发request。

  另外,注意client.setsockopt(zmq.LINGER, 0)设置linger为0,即马上丢掉socket中未处理的消息,否则context,term()啥时能执行完就不知道了。

  模拟一个会出错的服务端:

#coding=utf-8

from random import randint
import time
import zmq

context = zmq.Context(1)
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

cycles = 0
while True:
    request = server.recv()
    cycles += 1

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        print "I: Simulating a crash"
        break
    elif cycles > 3 and randint(0, 3) == 0:
        print "I: Simulating CPU overload"
        time.sleep(2)
        break

    print "I: Normal request (%s)" % request
    time.sleep(1) # Do some heavy work
    server.send(request)

server.close()
context.term()

   看下运行结果:

服务端:
I: Normal request (1)
I: Normal request (2)
I: Normal request (3)
I: Simulating a crash
[Finished in 5.5s]
客户端:
I: Connecting to server…
I: Sending (1)
I: Server replied OK (1)
I: Sending (2)
I: Server replied OK (2)
I: Sending (3)
I: Server replied OK (3)
I: Sending (4)
No response from server, retrying
Reconnecting and resending (4)
No response from server, retrying
Reconnecting and resending (4)
No response from server, retrying
Server seems to be offline, abandoning
[Finished in 10.7s]

  其实这个也不是提高可靠性吧,只是强化客户端的感知能力。 

猜你喜欢

转载自pjwqq.iteye.com/blog/2261402