【全民Python】使用ZeroMQ在Python中发布-订阅模式商用框架

目录

一.前言

二.适用场景

三.服务端(发布)代码逻辑

3.1 初始化发布服务器

3.2 发布消息

3.3 接收消息

3.4 添加消息到队列

3.5 解析处理消息

四.客户端(订阅)代码逻辑

五.运行结果

六.源码地址


一.前言

        当使用发布-订阅模式时,一个进程(发布者)可以向多个进程(订阅者)广播消息,而这些订阅者可以选择性地接收感兴趣的消息。这种模式通常用于实现消息传递系统,其中发布者产生事件或消息,而订阅者可以选择订阅特定类型的消息。

在实际开发中我有多个端需要同步一些数据,所以我选用发布和订阅模式来处理数据的同步问题。所以这里完善并记录了我在使用ZMQ作为通讯的框架。

二.适用场景

  • 消息广播系统: 发布者向多个订阅者广播消息,订阅者可以根据自己的兴趣选择性地接收消息。

  • 实时数据更新: 当系统中的某些数据发生变化时,通过发布-订阅模式可以实现实时的数据更新通知。

  • 多端(设备)数据同步:当某端数据发生变化时,需要发布给其他端进行后续处理

三.服务端(发布)代码逻辑

  3.1 初始化发布服务器

        设置发布和订阅端口

  def __init__(self, putport: int, subport: int, donewmsg: Callable[[str], None]):
        if self.is_port_open(subport):
            print(subport, '订阅端口被占用,服务端启动失败!')
            return
        if donewmsg is None:
            print('接收到数据后的处理过程不能为空!')
            return
         
        threading.Thread.__init__(self)
        self.SendIng = False
        self.donewmsg = donewmsg
      
        self.context = zmq.Context()
        self.pub_socket = self.context.socket(zmq.PUB)
        self.pub_socket.bind("tcp://*:" + str(putport))

        self.sub_socket = self.context.socket(zmq.SUB)
        self.sub_socket.bind("tcp://*:" + str(subport))  # 绑定到本地地址
        self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')  # 订阅所有频道
        print(f'主机服务已启动!   订阅端口:{subport}  发布端口:{putport}.......')

3.2 发布消息

消息使用拼接的方式: topic + '##' + data,消息类型和消息内容

 def sendMsg(self,topic:str, msg:str) -> None:
        if self.SendIng:
            return
        self.SendIng = True
        try:   
            message = [topic.encode("utf-8"), msg.encode("utf-8")]       
            self.pub_socket.send_multipart(message)
            print("【S_SendMesg】: "+topic+"/"+msg)   
        finally :
            self.SendIng = False

3.3 接收消息

接收到消息后并不会立即进行处理,而是直接把消息放进接收消息的队列里面。然后通过对消息队列的维护进行消息解析处理

 def run(self) -> None:
        print(f'主机服务! 开启"接收订阅"线程',end='\n')
        try: 
            while True: 
                msg:List[byte] = self.sub_socket.recv_multipart()
                if len(msg) <2:
                    print("协议不正确!"+msg[0].decode("utf-8"),len(msg))
                    continue
                topic = msg[0].decode("utf-8")
                rdata = msg[1].decode("utf-8")
                recvmsg:str = topic + '##' + rdata
                print("【S_RecMsg】: " +topic+"/"+ rdata)

                self.donewmsg(recvmsg)
                
                ServerMsgAnalysis.instance().TryDoAnalysis()
                getMsg = ServerMsgAnalysis.instance().GetSendQueueStr()
                if getMsg == "": break
                ls = getMsg.split("##")
                self.sendMsg(ls[0], ls[1])
        except Exception as e:
            print(e)
        finally: 
            self.context.term()
            print('关闭“接收订”阅线程')
            self.sub_socket.close()
            self.pub_socket.close()

3.4 添加消息到队列

接收到消息后会把消息添加到接收的消息队列里面

  def AddMsg(self, sMsg: str) -> None:
        print('【S_AddMsgToQueue】' +sMsg.split("##")[0]+"/"+sMsg.split("##")[1], end='\n')
        if self.LastReceiveMsg == sMsg:
            # print('不处理:',sMsg)
            return

        if self.__recv_queue_str.full():
            _ = self.__recv_queue_str.get()  # 如果队列满,删除一条数据
            # print("_",_)
        self.__recv_queue_str.put(sMsg)  # 添加一条新数据

        self.LastReceiveMsg = sMsg

3.5 解析处理消息

从消息队列获取到消息后根据消息协议类型进行分类处理

def TryDoAnalysis(self) -> bool:
        if self.__recv_queue_str.qsize() <= 0:
            return True
        sMsg = ''
        try:
            "get() 默认情况下队列中没有数据可供获取,则该方法会一直阻塞,直到有数据可供获取为止。"
            sMsg = self.__recv_queue_str.get()
            # print("开始处理数据",sMsg)
            ls = sMsg.split("##")
            if len(ls) < 2:
                return False
            protocolCn = ls[0]
            data = json.loads(ls[1])
           
        except:
            print('str to json error!!!', sMsg)
            return False

      
        self.__AllocTask(protocolCn, data)
        return True
def __AllocTask(self, protocol: str, data: dict):
        if protocol == P_MSG_1:
            message = protocol + "##" + json.dumps(data)
            print("【S_Handle】"+message, end='\n')
            self.__send_queue_str.put(message)
        elif protocol == P_MSG_2:
            message = protocol + "##" + json.dumps(data)
            username = data["username"]   
            print("【S_Handle】" +message+ username, end='\n')   
            self.__send_queue_str.put(message)
        else:
            print('未设置当前协议:'+protocol, end='\n')
        pass

四.客户端(订阅)代码逻辑

客户端逻辑和服务端逻辑差不多这里直接贴代码

from ast import List
import socket
from typing import Callable
from numpy import byte
import zmq
import json
import threading
import queue

from ProtocolDefine import *
from ZMQClient.ClientMsgAnalysis import ClientMsgAnalysis
class ZmqClient(threading.Thread):
    def __init__(self, server_ip:str, putport: int, subport: int, username:str, donewmsg: Callable[[str], None]):
       
        if donewmsg is None:
            print('接收到数据后的处理过程不能为空!')
            return
        threading.Thread.__init__(self)
        self.SendIng = False
        self.donewmsg = donewmsg
        self.username = username
        ClientMsgAnalysis.instance().clentName = username
        self.password = "123-321"
        
        
        self.context = zmq.Context()
        self.pub_socket = self.context.socket(zmq.PUB)
        self.pub_socket.connect(f"tcp://{server_ip}:{subport}")

        self.sub_socket = self.context.socket(zmq.SUB)
        self.sub_socket.connect(f"tcp://{server_ip}:{putport}")
        self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')
        
        self.sub_socket.subscribe(P_MSG_1)
        self.sub_socket.subscribe(P_MSG_2)
        
        print(f'客户端{ self.username} 启动!   订阅端口:{str(subport)} 发布端口:{ str(putport)}.......', end='\n')
        
        
 
    def run(self) -> None:
        print(f'客户端{ self.username} 开启"接收订阅"线程', end='\n')
        try: 
            while True: 
                msg:List[byte] = self.sub_socket.recv_multipart()
                if len(msg) <2:
                    print("协议不正确!"+msg[0].decode("utf-8"),len(msg))
                    continue
                topic = msg[0].decode("utf-8")
                rdata = msg[1].decode("utf-8")
                recvmsg:str = topic + '##' + rdata
                print("【"+self.username+" _RecMsg】: " +topic+"/"+ rdata, end='\n')

                self.donewmsg(recvmsg)
                ClientMsgAnalysis .instance().TryDoAnalysis()
                getMsg = ClientMsgAnalysis.instance().GetSendQueueStr()
                if getMsg == "":
                    break
                else:
                    ls = getMsg.split("##")
                    self.sendMsg(ls[0], ls[1])
        except Exception as e:
            print(e)
        finally: 
            self.context.term()
            print('关闭“接收订”阅线程')
            self.sub_socket.close()
            self.pub_socket.close()
            
    def sendMsg(self,topic:str, msg:str) -> None:
        if self.SendIng:
            return
        self.SendIng = True
        try:   
            message = [topic.encode("utf-8"), msg.encode("utf-8")]       
            self.pub_socket.send_multipart(message)
            print("【"+self.username+"_SendMesg】: "+topic+"/"+msg, end='\n')   
        finally :
            self.SendIng = False

    def close(self):
        self.sub_socket.close()


    def is_port_open(self, port: int) -> bool:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(('127.0.0.1', port)) == 0
        
    def __toJson(self):
        js = {"username": self.username,"password": self.password, }
        return js

    def toString(self):
        return json.dumps(self.__toJson())
'''
@File    :   ClientMsgAnalysis.py
@Time    :   2023/11/28 11:00:01
@Author  :   幻世界 
'''
import json
import queue
import threading

from ProtocolDefine import *


class ClientMsgAnalysis:
    __instance = None
    __lock = threading.Lock()

    @classmethod
    def instance(cls):
        if not cls.__instance:
            with cls.__lock:
                if not cls.__instance:
                    cls.__instance = cls()
        return cls.__instance

    def __init__(self):
        self.__recv_queue_str = queue.Queue(3)
        self.LastReceiveMsg = ''

        self.__send_queue_str = queue.Queue()

        self.CalaSSpeedThread = None

    def __clear_queue(self):
        with self.__send_queue_str.mutex:  # 获取锁
            self.__send_queue_str.queue.clear()  # 清空队列

        with self.__recv_queue_str.mutex:  # 获取锁
            self.__recv_queue_str.queue.clear()  # 清空队列

    def ReSetting(self):
        self.__clear_queue()
        self.LastReceiveMsg = ''

    """:AddMsg
       当接收到数据后,尝试向接收队列中增加,如果与上一包数据相同时,不向接收队列中增加
    """

    def AddMsg(self, sMsg: str) -> None:
        print("【C_AddMsgToQueue】: " +sMsg.split("##")[0]+"/"+sMsg.split("##")[1], end='\n')
        if self.LastReceiveMsg == sMsg:
            # print('不处理:',sMsg)
            return

        if self.__recv_queue_str.full():
            _ = self.__recv_queue_str.get()  # 如果队列满,删除一条数据
            # print("_",_)
        self.__recv_queue_str.put(sMsg)  # 添加一条新数据

        self.LastReceiveMsg = sMsg

    """:TryDoAnalysis
        分析数据后->分配任务->处理任务->加入发送队列中
        由LKJMgr线程类调用该方法
    """

    def TryDoAnalysis(self) -> bool:
        if self.__recv_queue_str.qsize() <= 0:
            return True
        sMsg = ''
        try:
            "get() 默认情况下队列中没有数据可供获取,则该方法会一直阻塞,直到有数据可供获取为止。"
            sMsg = self.__recv_queue_str.get()
            # print("开始处理数据",sMsg)
            ls = sMsg.split("##")
            if len(ls) < 2:
                return False
            protocolCn = ls[0]
            data = json.loads(ls[1])
           
        except:
            print('str to json error!!!', sMsg)
            return False

        self.__AllocTask(protocolCn, data)
        return True


    def GetSendQueueStr(self) -> str:
        if self.__send_queue_str.qsize() <= 0:
            return ""

        try:
            "get(block=False) 不阻塞获取数据,如果为空时,为报错。"
            message_str = self.__send_queue_str.get(block=False)

            return message_str
        except:
            return ""


    def __AllocTask(self, protocol: str, data: dict):
        message = protocol + "##" + json.dumps(data)
        if protocol == P_MSG_1:
            print("【C_Handle】"+ message, end='\n')
        elif protocol == P_MSG_2:
            username = data["username"]      
            print("【C_Handle】"+ message+ '/'+username, end='\n')
        else:
            print('未设置当前协议:'+protocol, end='\n')
        pass

五.运行结果

我们启动两个客户端

def main():
   server =ZmqServer(30011,30012, ServerMsgAnalysis.instance().AddMsg)
   server.start()

   client =ZmqClient("127.0.0.1",30011,30012,"C1", ClientMsgAnalysis.instance().AddMsg)
   client.start()
   client.sendMsg(P_MSG_2, '{"username":"C1","password":"123"}')
   time.sleep(1)
   client =ZmqClient("127.0.0.1",30011,30012, "C2",ClientMsgAnalysis.instance().AddMsg) 
   client.start()
   client.sendMsg(P_MSG_2, '{"username":"C2","password":"123"}')
 

if __name__ == '__main__':
     main()

六.源码地址

https://download.csdn.net/download/qq_37310110/88599199 

猜你喜欢

转载自blog.csdn.net/qq_37310110/article/details/134800790