最近得到这样一个测试需求,测试客户端延迟收到服务端数据时的处理逻辑,具体实现属下:
1、分析:
客户端存在多个,需要内部维持一个路由表,收到客户端数据时,将更新路由表,同时将数据转发给服务器;转发节点接收到服务器数据时,记录数据接收时间,在满足设定的延迟时间后,将数据发送给客户端。思维导图如下:
2、实现
根据分析,程序包含如下几个模块:
1、main.py 主程序,主要功能:读取配置文件,创建监听socket监听客户端请求,创建控制器Control并启动
2、control.py 定义控制器Control线程,Control中维持一个路由表,接收到主程序的客户端请求后,更新路由表,并向后端服务器发送数据
3、base_udp.py 定义BaseUDP类,提供基础的udp发送、接收服务
4、udp_tool.py 定义UDPTool类,继承BaseUDP,同时记录接收到的服务端数据及接收时间。
4、代码
4.1 main.py
import json
import socket
from src.control import Control
if __name__ == "__main__":
# 读取配置文件
f_obj = open("conf/config.json")
config = json.load(f_obj)
f_obj.close()
# 创建监听端socket udp
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定监听端口
s.bind((config["ip"], config["port"]))
# 创建控制器
control = Control(s, config["dstip"], config["dstport"], config["delay"])
control.setDaemon(True)
control.start()
while True:
data, addr = s.recvfrom(2048)
control.add_client(addr, data)
4.2 control.py
import threading
import time
from src.udp_tool import UDPTool
class Control(threading.Thread):
def __init__(self, srcsock, dstip, dstport, delay=0):
"""
初始化
:param srcsock:
:param dstip:
:param dstport:
:param delay:
"""
threading.Thread.__init__(self)
self.clients = {} # 路由表
self.o_srcsock = srcsock
self.dstip = dstip
self.dstport = dstport
self.delay = delay
def add_client(self, clientaddr, data):
"""
主进程接收到客户端请求,则客户端创建路由表
:param clientaddr:
:param data:
:return:
"""
if clientaddr not in self.clients:
self.clients[clientaddr] = UDPTool(self.dstip, self.dstport)
self.clients[clientaddr].send(data) # 向后端服务器发送数据
def run(self):
while True:
for cl in self.clients:
ut = self.clients[cl]
# 获取每个缓冲区满足时间延迟的数据,发送给客户端
data_list = ut.pop(time.time() - self.delay / 1000)
for dat in data_list:
self.o_srcsock.sendto(dat, cl)
time.sleep(0.001)
4.3 base_udp.py
import socket
import threading
class BaseUDP(object):
def __init__(self, ip, port, func=None):
self.ip = ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.o_recv = ThRecv(self.sock, func)
self.o_recv.setDaemon(True)
self.flag = False
def send(self, value):
if len(value) > 0:
self.sock.sendto(value, (self.ip, self.port))
if not self.flag:
self.flag = True
self.o_recv.start()
class ThRecv(threading.Thread):
"""
独立线程接收服务端数据
"""
def __init__(self, sock, func=None):
threading.Thread.__init__(self)
self._sock = sock
self.func = func
def stop(self):
exit(0)
def run(self):
while True:
data, addr = self._sock.recvfrom(20480)
if self.func:
self.func(data)
4.4 udp_tool.py
import time
from src.base_udp import BaseUDP
class UDPTool(BaseUDP):
def __init__(self, ip, port, func=None):
"""
继承自BaseUDP类,记录接收到的数据,
:param ip:
:param port:
:param func:
"""
BaseUDP.__init__(self, ip, port, self.recv_data)
self.buflist = []
self.func = func
self.end = time.time()
def recv_data(self, data):
"""
为BaseUDP提供的回调函数,结合时间戳,记录到buf中
:param data:
:return:
"""
self.buflist.append((time.time(), data))
def pop(self, _t):
"""
将大于指定时间的数据从数据缓冲区导出
:param _t:
:return: 数据list
"""
re = []
for dat in self.buflist:
if dat[0] > _t:
break
re.append(dat[1])
self.buflist.pop(0)
return re
5、扩展功能
5.1 乱序控制
代码结构可同步实现乱序功能,需要重写UDPTool中的recv_data函数,添加判断条件,满足条件的,将时间戳增加一定值,如:服务端数据间隔为2秒,将接收到服务端每10个数据包,将其中一个包延迟3秒发送。代码如下:
def recv_data(self, data):
"""
为BaseUDP提供的回调函数,结合时间戳,记录到buf中
每10包数据将1包数据延迟3秒
:param data:
:return:
"""
self.index += 1
if self.index == 10:
self.buflist.append((time.time() + 3, data))
self.index = 0
else:
self.buflist.append((time.time(), data))
self.buflist.sort(key=lambda x: x[0]) # 重新排序
5.2 tcp通信
实现tcp转发节点,只需将代码中的udp连接修改为tcp连接即可,这里不再给出实例。