匿名协议通过串口进行传送,但在python中,没有办法一个字节一个字节的读取串口缓存区内容,就算可以也效率低下。所以在python中解析匿名协议需要使用buf的方式实现。
匿名的协议
帧头:0xAA 0xAA
消息类型字节:0x02(固定为02)
数据区域长度:0x12(共9通道18个字节,此位固定为0x12)
数据区域:S0到S7通道,电压,单位为mV。
校验字节:为前面所有字节之和(包括帧头,消息类型,数据长度)取低八位。
c语言解析函数
每次接收一个字节进行解析状态的转换
void Data_Receive_Pre(u8 data , CSerialPort* serialport)
{
static u8 DT_RxBuffer[256],DT_data_cnt = 0;
static u8 _data_len = 0;
static u8 state = 0;
if(state==0&&data==0xAA) //帧头0xAA
{
state=1;
DT_RxBuffer[0]=data;
}
else if(state==1&&data==0xAA) //帧头0xAA
{
state=2;
DT_RxBuffer[1]=data;
}
else if(state==2) //当前帧计数
{
state=4;
DT_RxBuffer[2]=data;
}
else if(state==4) //数据区域长度
{
state = 5;
DT_RxBuffer[3]=data;
_data_len = data;
DT_data_cnt = 0;
}
else if(state==5&&_data_len>0)
{
_data_len--;
DT_RxBuffer[4+DT_data_cnt++]=data;
if(_data_len==0)
state = 6;
}
else if(state==6)
{
state = 0;
DT_RxBuffer[4+DT_data_cnt++]=data;
Data_Receive_Anl(DT_RxBuffer,DT_data_cnt+4,serialport);
}
else
state = 0;
}
收到一帧有效帧后进行后续处理
void Data_Receive_Anl(u8 *data_buf, u8 num, CSerialPort* serialport)
{
u8 sum = 0;
for(u8 i=0;i<(num-1);i++)
sum += *(data_buf+i);
if(!(sum==*(data_buf+num-1))) return; //判断sum
if(!(*(data_buf)==0xAA && *(data_buf+1)==0xAA)) return; //判断帧头
if(*(data_buf+3)==0X12)
{
u16 datatemp;
for(int i = 0;i<9;i++)
{
datatemp = (int16_t)((*(data_buf+4+i*2)<<8)|*(data_buf+4+i*2+1));
}
}
}
可以看到Data_Receive_Pre函数是每次接收一个字节,然后进行解析状态的判断。
python解析实现
#coding:utf-8
import numpy as np
import serial
class SerialDecoder(object):
"""
初始化参数:
bufsize:缓存区长度,可根据需要解析的包大小调节
com:串口号,是字符串,例如'COM9'
band:波特率,
debug:是否输出调试信息
功能:
实现对串口协议的解析,协议内容:
帧头:
0xAA 0xAA, 两个字节
功能字:
0x02, 一个字节
数据长度:
0x0A, 一个字节,代表数据内容长度,必定为偶数
数据内容:
字节个数不确定,每两个字节组成一个16位无符号整形数
高位在前,地位在后
校验和:
一个字节,为前面所以字节的和取低八位
"""
def __init__(self, bufsize=200, com='COM1', band=9600, debug=False):
"""
输入参数:
bufsize:缓存区长度,可根据需要解析的包大小调节
com:串口号,是字符串,例如'COM9'
band:波特率,
debug:是否输出调试信息
"""
self.current = 0 # 当前buf中数据长度
self.bufsize = bufsize
self.buf = [0 for i in range(bufsize)] # 保存数据buf,必须按照长度初始化
self.data_signal = [] # 解析到的有效数据
self.debug = debug # 是否输出调试信息
self.newdata = False # 自上次取获取数据后是否解析出新的的数据
self.ser = serial.Serial(com, band)
def getdata(self): # 获取解析到的数据
serialbuf = self.ser.read_all()
self.append(serialbuf)
newdata = self.newdata
self.newdata = False
return newdata, self.data_signal
def append(self, serialbuf): # 把串口数据加入到buf中,加入完成后尝试进行解析
filebuf = np.fromstring(serialbuf, dtype=np.uint8)
if self.current + filebuf.size > self.bufsize:
self.buf[self.current:self.bufsize] = \
filebuf[0:self.bufsize - self.current - filebuf.size]
else:
self.buf[self.current:self.current + filebuf.size] = filebuf
self.current += filebuf.size
self.process()
def process(self): # 进行数据解析
while self.current > 35: # 如果队列中数据多于2包数据
if self.buf[0] == 170 and self.buf[1] == 170:
if self.debug:
print("检测到帧头,功能字是" + str(self.buf[2]))
datalength = self.buf[3] # 有效数据长度
framelength = datalength + 5 # 帧长度
datasum = np.sum(self.buf[0:framelength - 1]) % 256
if datasum == self.buf[framelength - 1]: # 校验通过
self.data_signal = self.buf[4:4 + datalength]
self.data_signal = np.array(
self.data_signal, dtype='uint8')
self.data_signal = self.data_signal.reshape(-1, 2)
self.data_signal = self.data_signal[:, 0] * 256 + \
self.data_signal[:, 1]
self.newdata = True
if self.debug:
print(self.data_signal)
self.buf = np.roll(self.buf, -framelength)
self.current -= framelength
if self.debug:
print("解析到一帧数据")
else: # 校验失败
if self.debug:
print("校验和错误")
if 170 in self.buf[2:self.current]: # 帧头对,但是校验和错误
temparray = self.buf[2:self.current]
if not isinstance(temparray, list):
temparray = temparray.tolist()
offset = temparray.index(170)
self.buf = np.roll(self.buf, -offset)
self.current -= offset
# 如果解析不到,舍弃前面的数据,直到data[0] == 170
elif 170 in self.buf[0:self.current]:
if self.debug:
print("接收到无效数据")
temparray = self.buf[0:self.current]
if not isinstance(temparray, list):
temparray = temparray.tolist()
offset = temparray.index(170)
self.buf = np.roll(self.buf, -offset)
self.current -= offset
if __name__ == '__main__':
serialdecoder = SerialDecoder(100, 'COM3', 115200, False)
count = 0
while 1:
newdata, signal = serialdecoder.getdata()
if newdata:
count += 1
# print(str(count) + ":" + str(signal))
print(str(count) + ":" + str(signal))