前言
同学面试大疆音视频驱动,问了很多Linux驱动细节和MQTT底层实现,被问麻了。我根据韦东山老师讲解MQTT的笔记补充和拓展了很多实现代码。(侵删)
参考资料
-
kawaii-mqtt源码:
-
博客
-
APP
- https://mosquitto.org/download/
- https://mqttx.app/zh
1.使用
几条代码使用MQTT:
void my_message_handler_t(void* client, message_data_t* msg)
{
}
int main(void)
{
int err;
mqtt_client_t *client = NULL;
err = mqtt_connect(client);
err = mqtt_subscribe(client, "100ask-topic", QOS0, my_message_handler_t);
while (1);
}
从上述代码中,提2个问题:
答案:
2.mqtt内部实现
数据储存
mqtt_client结构体里有2个链表,放着处理消息的函数和ack包的处理函数。
有mqtt_read_buf和mqtt_write_buf两个buf放着需要处理的mqtt_massage_t数据
包的解析、发送ack回应
-
在mqtt_connect中创建了一个线程mqtt_yield_thread,它会使用mqtt_get_client_state时刻检测客户端的状态,如果客户端没连接上服务器,则暂停本线程并打印错误信息。如果连接上了,那就使用mqtt_yield来处理。
-
在mqtt_yield中不断确定有没有连接上服务器,用mqtt_packet_handle来处理数据包信息。 启动定时器。mqtt_ack_list_scan扫描ack列表,销毁已经超时的ack处理程序或重新发送它们 (如果超时没回应则订阅失败,在mqtt_subscribe中构造的消息处理函数在这销毁)
-
mqtt_packet_handle会辨识别包头的信息判断出是那种类型的包。调用各自包的处理函数。最后发送心跳包。
-
对发布消息包的解析,
-
构造设置mqtt_message_t结构体。
-
发送回应包给服务器,使用将数据传递给用户定义的函数
-
-
调用过程
mqtt_deliver_message
mqtt_get_msg_handler
/* 遍历msg_handler_list以找到匹配的消息处理程序 */
LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
mqtt_new_message_data(&md, topic_name, message); /* make a message data */
msg_handler->handler(c, &md); /* deliver the message,调用用户定义的处理函数 */
订阅主题消息发送
- 使用mqtt_subscribe订阅主题
- 收到回应(ACK)包的处理
MQTT Client向Broker发出某些数据包时,期待得到回应(ACK):会启动一个定时器。如果定时器超时表示没有收到ACK:
- 要么重发
- 要么出错
- 对于ACK包,一般无需提供处理函数
要订阅某个主题时,MQTT Client会发出SUBCRIBE包,期待得到回应的数据包:SUBACK包。代码如下:
mqtt_subscribe
msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler);
/* create a ack handler node */
ack_handler = mqtt_ack_handler_create(c, type, packet_id, payload_len, handler);
platform_timer_cutdown(&ack_handler->timer, c->mqtt_cmd_timeout);
mqtt_list_add_tail(&ack_handler->list, &c->mqtt_ack_handler_list);
如果在指定时间里没有收到SUBACK包,那么就会在mqtt_ack_handler_list中删除该handler。
如果收到队列SUBACK包,那么要做两件事:
- 在mqtt_ack_handler_list中删除该handler
- 把该handler放到mqtt_msg_handler_list中:以后收到PUBLISH数据包时这个handler被调用
发布消息
- 在主函数里创建发布线程。发布线程构造消息调用mqtt_publish发布。
- mqtt_publish根据MQTT协议构造数据包 根据平台相关的函数发送数据包。
调用过程:
main
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
mqtt_publish_thread
mqtt_publish(client, "topic1", &msg);
// 1. 构造消息
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf;
msg.payloadlen = xxx;
mqtt_publish(client, "topic1", &msg);
// 1.1 根据MQTT协议构造数据包
MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (uint8_t*)msg->payload, msg->payloadlen);
// 1.2 根据平台相关的函数发送数据包
mqtt_send_packet
network_write
nettype_tcp_write
platform_net_socket_write_timeout
发送接收心跳包
函数调用流程:
mqtt_yield_thread
mqtt_yield
mqtt_packet_handle(c, &timer);
mqtt_keep_alive(c);
- mqtt_keep_alive构造MQTT心跳包消息,并发送,client中的mqtt_ping_outstanding计时器加一;
- 判断mqtt_ping_outstanding是否等于0,如果不是那就是断开连接了。
- 当收到服务器PINGRESP推送包时(表示成功保持心跳),mqtt_ping_outstanding计时器清零。
3.流程图
结语
如果有出错,恳请网友指出,多谢!