本报文格式不能处理粘包问题,因为处理粘包问题的成本太高,会极大的降低服务端的处理效率以及增加内存消耗,如果传输速度很高,建议使用UDP,UDP传输速度快,并且应用层做响应,增加重传机制可以有效的保证数据的可靠性,目前我做的RTU升级等功能都是使用UDP完成,服务器端开销小,并且不会粘包。
通讯建议:不要做什么握手机制,直接发送数据,服务器收到了就响应,收到响应后就结束通讯,握手增加耗电又增加流量消耗,并且在GPRS通讯上是非常浪费资源的,本来网络就不可靠,直接发送数据,服务器收到了就响应通讯就结束了,如果弄成握手方式,就是发送请求,握手成功,发送数据,等待响应,结束通讯,就增加了1个来回,特别是之前用过MQTT多了几个来回,太浪费通讯资源了。
使用HEX格式的通讯协议非常高效,不管是发送打包还是接收拆包,1个结构体就能搞定,无需一个字节一个字节解析,如果用字符串json方式,使用单片机作为客户端就很为难了,一般内存不会很多,并且解析json需要的内存会非常多。
报文全部使用小端模式,便于C语言处理。
注:由于我之前做了一个支持10W个设备的数据解析软件,其实占用的内存非常少,并且对CPU消耗很低,我们使用了一个很低端的服务器就能做到每秒1K条以上的数据处理,并且每条数据都在1秒内得到响应,刚开始的时候还好,后面因为设备越来越多,导致服务器搜索设备变得很漫长(我的设备数据都是放到内存中,使用了一个指针数组进行管理),由于设备会动态的增加,不能对序列号进行排序,因此查找设备的索引时间将会不可控,解析一条数据非常快,存储是异步的,有1W条的缓存,但是唯独查找设备这个看似简单的过程变得非常复杂,由于之前的协议我没有增加索引,我只能对设备进行分组,我建了1W个指针数组,根据设备尾号后4位进行分组,这样会增加几十兆的内存消耗,但是查找却变得非常迅速。通过这个事情之后,我决定在协议中直接增加索引,服务端收到索引后直接取出当前设备的配置与SN进行对比,如果相同就直接解析存储数据,如果不相同就进行查找即可,没有就进行新建,最后建议将设备分组与协议中增加索引进行整合,集各自的优点,可以最大限度的提高数据的解析能力,当每个设备都通讯一次之后,解析任何一个设备上传的数据的时间都是一样的,并且是最小的。
禁忌:解析数据是千万不要把配置什么的都放到数据库,数据库相比内存实在是慢太多了,而且时间不可控,数据库本身就不是非常可靠,建议将数据库操作做异步处理,中间用FIFO通讯,这样数据解析线程可以以最高效的方式运行(一个线程轻松解析成千上万条数据,通过增加一个线程可以提高1倍的处理能力)。
存储设备实时信息方式
我存储设备相关信息的方式是,每个设备有一个自己的统一的结构体,比如我最大支持10000个设备,我就会先申请1个指针数组,大小为10000,占用40000B,也就不到40KB内存。然后每添加一个设备,就给这个指针申请一个内存,存放设备的配置信息,同时会使用托管的线程池异步向数据库中增加一个设备。
但是如果有几千上万个设备的时候,寻找这个设备的配置就变得很吃力了(相对来说,内存中遍历都会比数据库快),这个时候我就会对设备进行分组,比如分100组,最好的情况下每个组有100个设备,但是不排除有一种可能,一个组中有超过100个设备的呀,我就要做一个二维指针数组,分100个数组,每个数组中1000个设备指针,这样内存占用就会是100*1000*4,也还好400K,但是如果一个组超过了1000个就不好办了,同样如果是10W个设备的时候,这个分组就很吃内存了,我的做法是每个分组用个变量进行记录,每个组给50个设备指针,当这个组超过50个之后,我重新对这个指针进行申请内存,大小为上一次的+50个,然后把之前的数据拷贝过来,释放掉之前的数据,这样就可以动态的进行分组控制,实现效率的提升与内存的最小消耗。
//查找指定SN的设备索引 //返回索引;-1:没有找到 //2017-12-27 : 会使用分组进行快速查找索引,只能在单线程中使用,不要跨线程搜索 int FindSN(char pSN[16]) { DWORD Hash; BYTE temp[3]; int GroupIndex; ONE_DEVICE_DATA_TYPE *pDeviceData; if (pSN == nullptr) return -1; pSN[15] = 0; if(strlen(pSN) != 15) return-1; //检查最后3位是否为000-999 temp[0] = pSN[12] - '0'; //字符转换为数字 temp[1] = pSN[13] - '0'; //字符转换为数字 temp[2] = pSN[14] - '0'; //字符转换为数字 if (temp[0] > 9 || temp[1] > 9 || temp[2] > 9) { return -1; } GroupIndex = temp[0] * 100 + temp[1] * 10 + temp[2];//计算当前设备的分组索引 if (GroupIndex >= DEVICE_GROUP_CNT) return -1; //分组索引无效 Hash = USER_LIB.BKDRHash(pSN); //计算哈希结果 //在指定的分组内去搜索当前设备 for (DWORD i = 0; i < this->GroupDeviceCnt[GroupIndex]; i++) { pDeviceData = (ONE_DEVICE_DATA_TYPE *)this->pDeviceDataGroupPointerBuff[GroupIndex][i]; //遍历当前分组 if (pDeviceData->Config.Hash == Hash) //先判断哈希结果是否正确 { if (strcmp(pSN, pDeviceData->Config.SN) == 0) return pDeviceData->Index; //找到了,返回索引 } } return -1; }
//添加一个设备到配置缓冲区,返回索引,<0:添加失败 int AddDevice(DEVICE_CONFIG_TYPE *pConfig, char **pError) { ONE_DEVICE_DATA_TYPE *p; BYTE temp[4]; int GroupIndex; if (pConfig == nullptr) { *pError = "无效的配置缓冲区"; return -1; } if (this->DeviceCnt >= DEVICE_MAX_CNT) //设备数量超出范围 { *pError = "设备配置超出范围了"; return -1; } pConfig->SN[15] = 0; if (strlen(pConfig->SN) != 15) { *pError = "设备序列号必须为15位"; return -1; } //检查最后3位是否为000-999 temp[0] = pConfig->SN[12] - '0'; //字符转换为数字 temp[1] = pConfig->SN[13] - '0'; //字符转换为数字 temp[2] = pConfig->SN[14] - '0'; //字符转换为数字 if (temp[0] > 9 || temp[1] > 9 || temp[2] > 9) { return -1; } GroupIndex = temp[0] * 100 + temp[1] * 10 + temp[2]; //计算当前设备的分组索引 if (GroupIndex >= DEVICE_GROUP_CNT) { *pError = "无效的分组索引"; return -1; } if (FindSN(pConfig->SN) >= 0) //寻找指定地址的设备是否存在 { *pError = "当前设备地址已经存在"; return -1; } p = new ONE_DEVICE_DATA_TYPE; //申请内存 memset(p, 0, sizeof(ONE_DEVICE_DATA_TYPE)); //清控配置区域 memcpy(&p->Config, pConfig, sizeof(DEVICE_CONFIG_TYPE)); //拷贝数据 p->Config.Hash = USER_LIB.BKDRHash(p->Config.SN); //生成SN的哈希值 p->DbStatus.ReadHistCongifCnt = p->DbStatus.ReadHistStatusCnt = p->DbStatus.WriteHistCongifCnt = -1; //将历史记录条数置为-1无效值 p->Index = this->DeviceCnt; //记录当前设备的索引 //将当前设备编号进行分组 if (this->AddDeviceToGroup(GroupIndex, (DWORD)p) == false) { *pError = "添加设备到分组失败!"; return -1; } U32_DeviceDataPointerBuff[this->DeviceCnt] = (DWORD)p; //存放指针 pDeviceDataPointerBuff[this->DeviceCnt++] = p; //存储指针 return (this->DeviceCnt - 1); //返回当前的索引 }
//添加一个设备到指定分组中,如果分组慢,将会申请扩容 //不会进行编号等检查,只能在AddDevice中被调用 bool AddDeviceToGroup(int GroupIndex, DWORD DeviceDataPointer) { if (GroupIndex < 0 || GroupIndex >= DEVICE_GROUP_CNT) return false; if (this->GroupSize[GroupIndex] >= DEVICE_MAX_CNT) return false; //分组大小不能超过总设备限制 if (this->GroupDeviceCnt[GroupIndex] > this->GroupSize[GroupIndex]) return false; if (this->GroupDeviceCnt[GroupIndex] == this->GroupSize[GroupIndex]) //需要扩容 { if (DeviceGroupExpansion(GroupIndex) == false) //扩容失败,返回 { SYS_LOG.Write(__FILE__ + __LINE__ + "分组扩容失败,分组索引" + GroupIndex+"\r\n"); return false; } SYS_LOG.Write("分组" + GroupIndex + "扩容成功\r\n"); } this->pDeviceDataGroupPointerBuff[GroupIndex][this->GroupDeviceCnt[GroupIndex]] = DeviceDataPointer; //保存当前设备指针 this->GroupDeviceCnt[GroupIndex] ++; //分组内的设备数量增加 return true; }
//为指定的分组进行扩容,会先复制缓冲区,然后重新申请,再释放之前的缓冲区(注意:使用分组索引只能在一个线程中使用) //必须分组已经满了再调用 bool DeviceGroupExpansion(int GroupIndex) { if (GroupIndex < 0 || GroupIndex >= DEVICE_GROUP_CNT) return false; if (this->GroupSize[GroupIndex] >= DEVICE_MAX_CNT) return false; //分组大小不能超过总设备限制 DWORD *p = this->pDeviceDataGroupPointerBuff[GroupIndex]; //先复制之前的指针 DWORD Size = this->GroupSize[GroupIndex]; //记录之前分组容量 this->GroupSize[GroupIndex] += GROUP_DEVICE_INC; //当前分组容量增加 this->pDeviceDataGroupPointerBuff[GroupIndex] = new DWORD[this->GroupSize[GroupIndex]]; //重新为当前分组申请内存 memcpy(this->pDeviceDataGroupPointerBuff[GroupIndex], p, Size*sizeof(DWORD)); //将之前的分组设备信息拷贝到新缓冲区中 USER_DELTE(p); //释放掉之前的旧缓冲区 return true; }