MQTT简介:
MQTT是一种基于发布/订阅模式的轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。
下面主要介绍如何在 Flutter 项目中使用 MQTT,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
1.添加MQTT依赖库
//首先在pubspec.yaml文件中添加MQTT依赖库
mqtt_client: ^9.6.6 #mqtt
//在页面中引入
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
2.配置
// 连接状态
bool connectStatus = false;
final sendMeassage = MqttClientPayloadBuilder();
String payload = "";
// 服务器地址,端口号
final client = MqttServerClient('服务器地址', '端口号');
// 设置默认主题
String topic = "Visitor";
3.参数配置
// 心跳时间 保持一分钟连接
client.keepAlivePeriod = 60;
// 设置协议版本,默认是3.1,根据服务器需要的版本来设置
client.setProtocolV311();
// 是否打印mqtt日志信息
client.logging(on: false);
// 连接断开回调
client.onDisconnected = onDisconnected;
// 订阅成功回调
client.onSubscribed = onSubscribed;
// 取消订阅回调
client.onUnsubscribed = _onUnSubscribed;
// 默认clientID
client.clientIdentifier = clientId;
final connMess = MqttConnectMessage();
//设置will消息的qos模式
connMess.withWillQos(MqttQos.atLeastOnce);
4.连接
//连接MQTT服务器
try {
await client.connect();
connectMessage = "你好";
} on Exception catch (e) {
print('---------连接失败: - ${e.toString()}');
client.disconnect();
connectMessage = e.toString();
}
/// 检测连接情况
if (client.connectionStatus!.state == MqttConnectionState.connected) {
connectStatus = true;
// 连接成功的时候订阅下自己
subscribe(clientId);
} else {
client.disconnect();
}
}
5.发送和接收消息
// 订阅和监听
subscribe(String topic) {
print("订阅主题:${topic}+_sub,Qos=${MqttQos.exactlyOnce}");
client.subscribe('${topic}_sub', MqttQos.exactlyOnce);
client.updates!.listen((dynamic event) {
var recvMessage = event[0].payload as MqttPublishMessage;
//二进制格式的消息
// bytes = recvMessage.payload.message;
// 字符串格式的消息
var message = Utf8Decoder().convert(recvMessage.payload.message);
if (message != '') {
setState(() {
messageList.insert(0, jsonDecode(message));
// 上拉
autoSlide();
});
}
// print('订阅主题 <${c[0].topic}>, 收到消息 <-- $receive -->');
//如果app处于后台运行之后
if (!pageState) {
countMessage++;
Get.snackbar("提示", "未读消息" + countMessage.toString() + "条",
duration: Duration(hours: 1000));
}
});
}
// 发布消息
publish(String send, topic) async {
// 使用MqttClientPayloadBuilder构造消息内容,然后使用MqttServerClient.publishMessage()发送消息
var builder = MqttClientPayloadBuilder();
//发送JSON数据
builder.addUTF8String(send);
print("发布主题:${topic},Qos=${MqttQos.exactlyOnce},payload=${send}");
setState(() {
messageList.insert(0, jsonDecode(send));
autoSlide();
});
await client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload!);
}
//点击发送按钮发布消息
onPressed: () {
if (_controller.text == '') return;
Map sendObj = {};
sendObj['Content'] = _controller.text;
sendObj['ContentType'] = "string";
sendObj['ClientId'] = clientId;
sendObj['Date'] = formatDate(DateTime.now(), [
yyyy,"-",mm,"-",dd," ",HH,":",nn,":",ss]);
sendObj['MessageType'] = "chat";
sendObj['UserName'] = box.read("appCreate");
//发布消息
publish(json.encode(sendObj), clientId + "_pub");
setState(() {
_controller.text = '';
});
},
6.其它回调
// 取消订阅回调
unsubscribe(String topic) {
client.unsubscribe(topic);
}
/// 订阅成功回调
void onSubscribed(String topic) {
print('订阅主题成功------ $topic');
}
// 连接断开回调
void onDisconnected() {
print('取消连接成功');
}
// 取消订阅回调
_onUnSubscribed(String? topic) {
print("_取消订阅主题成功---topic:$topic");
}
7.页面初始化和销毁
@override
void initState() {
// TODO: implement initState
super.initState();
//1.连接注册mqtt
// print("打开页面visitorRegister.connectStatus=${connectStatus}");
if (!connectStatus) {
getVisitorRegister();
messageList.insert(0, {
'Content': '您好,这里是xxx智能客服',
'ContentType': 'string',
// 'ClientId': clientId,
'MessageType': 'chat',
'UserName': '客服',
'Date': formatDate(
DateTime.now(), [yyyy, "-", mm, "-", dd, " ", HH, ":", nn, ":", ss])
});
// autoSlide();
}
//2.页面初始化的时候,添加一个状态的监听者
WidgetsBinding.instance?.addObserver(this);
}
@override
void dispose() {
super.dispose();
//3. 页面销毁时,移出监听者
WidgetsBinding.instance?.removeObserver(this);
if (client != null) {
client.disconnect();
}
}
订阅和发布的主题要跟后端沟通好,多测试几次基本就能通