相关文章
1.《MQTT协议分析总结(一)》
2.《MQTT协议分析总结(二)》
3.《【IoT】如何连接到百度IoT云》
4.《【FreeRTOS】基于STM32移植LWIP 2.1.2之MQTT》
1.前言
MQTT协议工作在TCP之上,端和代理之间通过交换预先定义的控制报文来完成通信。因为 MQTT 是应用层协议,所以它需要运行在LwIP协议上,我们就使用 Socket API来进行移植。
物接入(IoT Hub)是面向物联网领域开发者的全托管云服务,通过主流的物联网协议MQTT通讯,可以在智能设备与云端之间建立安全的双向连接,快速实现物联网项目。
2.下载MQTT源码
MQTT源码下载地址:https://github.com/eclipse/paho.mqtt.embedded-c
这个存储库包含用于嵌入式平台的Eclipse Paho MQTT C/ c++客户端库的源代码。有三个子项目:
- MQTTPacket:对MQTT数据包进行简单的反/序列化
- MQTTClient:高级C++客户端
- MQTTClient-C:高级C客户端(几乎是C++客户端的克隆)
我们这里是STM32平台,选择的是MQTTPacket项目代码。
3.移植MQTT协议
- 创建一个
MQTT
文件夹,再将MQTTPacket\src
目录下的文件添加到工程目录MQTT
文件夹。
- 再将
MQTTPacket\samples
目录下的transport.c
、transport.h
添加到这个文件夹下。
- 添加MQTT头文件路径
4. 修改transport.c文件
transport.c
文件是MQTT协议调用底层socket接口的API,基本上可以沿用sample code里面transport.c
,这里有一些微调。如下:
#include "transport.h"
#include "lwip/opt.h"
#include "lwip/arch.h"
#include "lwip/api.h"
#include "lwip/inet.h"
#include "lwip/sockets.h"
#include "string.h"
static int mysock;
/************************************************************************
** 函数名称: transport_sendPacketBuffer
** 函数功能: 以TCP方式发送数据
** 入口参数: unsigned char* buf:数据缓冲区
** int buflen:数据长度
** 出口参数: <0发送数据失败
************************************************************************/
int32_t transport_sendPacketBuffer( uint8_t* buf, int32_t buflen)
{
int32_t rc;
rc = write(mysock, buf, buflen);
return rc;
}
/************************************************************************
** 函数名称: transport_getdata
** 函数功能: 以阻塞的方式接收TCP数据
** 入口参数: unsigned char* buf:数据缓冲区
** int count:数据长度
** 出口参数: <=0接收数据失败
************************************************************************/
int32_t transport_getdata(uint8_t* buf, int32_t count)
{
int32_t rc;
//这个函数在这里不阻塞
rc = recv(mysock, buf, count, 0);
return rc;
}
/************************************************************************
** 函数名称: transport_open
** 函数功能: 打开一个接口,并且和服务器 建立连接
** 入口参数: char* servip:服务器域名
** int port:端口号
** 出口参数: <0打开连接失败
************************************************************************/
int32_t transport_open(int8_t* servip, int32_t port)
{
int32_t *sock = &mysock;
int32_t ret;
// int32_t opt;
struct sockaddr_in addr;
//初始换服务器信息
memset(&addr,0,sizeof(addr));
addr.sin_len = sizeof(addr);
addr.sin_family = AF_INET;
//填写服务器端口号
addr.sin_port = PP_HTONS(port);
//填写服务器IP地址
addr.sin_addr.s_addr = inet_addr((const char*)servip);
//创建SOCK
*sock = socket(AF_INET,SOCK_STREAM,0);
//连接服务器
ret = connect(*sock,(struct sockaddr*)&addr,sizeof(addr));
if(ret != 0)
{
//关闭链接
close(*sock);
//连接失败
return -1;
}
//连接成功,设置超时时间1000ms
// opt = 1000;
// setsockopt(*sock,SOL_SOCKET,SO_RCVTIMEO,&opt,sizeof(int));
//返回套接字
return *sock;
}
/************************************************************************
** 函数名称: transport_close
** 函数功能: 关闭套接字
** 入口参数: unsigned char* buf:数据缓冲区
** int buflen:数据长度
** 出口参数: <0发送数据失败
************************************************************************/
int transport_close(void)
{
int rc;
// rc = close(mysock);
rc = shutdown(mysock, SHUT_WR);
rc = recv(mysock, NULL, (size_t)0, 0);
rc = close(mysock);
return rc;
}
5. 添加mqttclient.c文件
mqttclient.c
文件主要是创建了2个线程:Receive Thread和Send Thread。专门负责接收并且解析MQTT服务器发送的消息和发送消息给MQTT服务器。
-
Receive Thread
Receive Thread主要的流程图如下:
Receive Thread代码如下:void Client_Connect(void) { char* host_ip; #if LWIP_DNS ip4_addr_t dns_ip; netconn_gethostbyname(HOST_NAME, &dns_ip); host_ip = ip_ntoa(&dns_ip); printf("host name : %s , host_ip : %s\r\n",HOST_NAME,host_ip); #else host_ip = HOST_NAME; #endif MQTT_START: //创建网络连接 printf("1.开始连接对应云平台的服务器...\r\n"); printf("服务器IP地址:%s,端口号:%0d!\r\n",host_ip,HOST_PORT); while(1) { //连接服务器 MQTT_Socket = transport_open((int8_t*)host_ip,HOST_PORT); //如果连接服务器成功 if(MQTT_Socket >= 0) { printf("连接云平台服务器成功!\r\n"); break; } printf("连接云平台服务器失败,等待3秒再尝试重新连接!\r\n"); //等待3秒 vTaskDelay(3000); } printf("2.MQTT用户名与密钥验证登录...\r\n"); //MQTT用户名与密钥验证登录 if(MQTT_Connect() != Connect_OK) { //重连服务器 printf("MQTT用户名与密钥验证登录失败...\r\n"); //关闭链接 transport_close(); goto MQTT_START; } //订阅消息 printf("3.开始订阅消息...\r\n"); //订阅消息 if(MQTTSubscribe(MQTT_Socket,(char *)TOPIC,QOS1) < 0) { //重连服务器 printf("客户端订阅消息失败...\r\n"); //关闭链接 transport_close(); goto MQTT_START; } //无限循环 printf("4.开始循环接收订阅的消息...\r\n"); } void mqtt_recv_thread(void *pvParameters) { uint32_t curtick; uint8_t no_mqtt_msg_exchange = 1; uint8_t buf[MSG_MAX_LEN]; int32_t buflen = sizeof(buf); int32_t type; fd_set readfd; struct timeval tv; //等待时间 tv.tv_sec = 0; tv.tv_usec = 10; MQTT_START: //开始连接 Client_Connect(); //获取当前滴答,作为心跳包起始时间 curtick = xTaskGetTickCount(); while(1) { //表明无数据交换 no_mqtt_msg_exchange = 1; FD_ZERO(&readfd); FD_SET(MQTT_Socket,&readfd); //等待可读事件 select(MQTT_Socket+1,&readfd,NULL,NULL,&tv); //判断MQTT服务器是否有数据 if(FD_ISSET(MQTT_Socket,&readfd) != 0) { //读取数据包--注意这里参数为0,不阻塞 type = ReadPacketTimeout(MQTT_Socket,buf,buflen,0); if(type != -1) { mqtt_pktype_ctl(type,buf,buflen); //表明有数据交换 no_mqtt_msg_exchange = 0; //获取当前滴答,作为心跳包起始时间 curtick = xTaskGetTickCount(); } } //这里主要目的是定时向服务器发送PING保活命令 if((xTaskGetTickCount() - curtick) >(KEEPLIVE_TIME/2*1000)) { curtick = xTaskGetTickCount(); //判断是否有数据交换 if(no_mqtt_msg_exchange == 0) { //如果有数据交换,这次就不需要发送PING消息 continue; } if(MQTT_PingReq(MQTT_Socket) < 0) { //重连服务器 printf("发送保持活性ping失败....\r\n"); goto CLOSE; } //心跳成功 printf("发送保持活性ping作为心跳成功....\r\n"); //表明有数据交换 no_mqtt_msg_exchange = 0; } } CLOSE: //关闭链接 transport_close(); //重新链接服务器 goto MQTT_START; }
-
Send Thread
Send Thread主要的流程图如下:
-
Send Thread
Send Thread代码如下:void mqtt_send_thread(void *pvParameters) { uint8_t res; /* 定义一个创建信息返回值,默认为pdTRUE */ BaseType_t xReturn = pdTRUE; /* 定义一个接收消息的变量 */ Stu_Sys_Data_TypeDef* recv_data; //初始化json数据 cJSON* cJSON_Data = NULL; cJSON_Data = cJSON_Data_Init(); while(1) { xReturn = xQueueReceive( MQTT_Data_Queue, /* 消息队列的句柄 */ &recv_data, /* 发送的消息内容 */ portMAX_DELAY); /* 等待时间 3000ms */ if(xReturn == pdTRUE) { //更新数据 res = cJSON_Update(cJSON_Data, STUDENT_ID, &recv_data->sid); res = cJSON_Update(cJSON_Data, HEIGHT_NUM, &recv_data->heightNum); res = cJSON_Update(cJSON_Data, WEIGHT_NUM, &recv_data->weightNum); if(UPDATE_SUCCESS == res) { //更新数据成功, char* p = cJSON_Print(cJSON_Data); //发布消息 MQTTMsgPublish(MQTT_Socket,(char*)TOPIC,QOS0,(uint8_t*)p); vPortFree(p); p = NULL; } else printf("update fail\r\n"); } } }
6.验证测试
最后验证测试成功,如下:
7.资料下载地址
移植成功的完整代码下载地址如下:
https://download.csdn.net/download/ZHONGCAI0901/14023611