前言
最近一个物联网项目需要用到MQTT, 写一个Demo测试下~
一、环境介绍
- JAVA
Intellij IDEA 2022.3.3 (Ultimate Edition)
jdk-1.8.0_77
- Ubuntu
Linux version 5.10.110 (root@seven-HP-ZHAN-99-Pro-G1-MT) (aarch64-none-linux-gnu-gcc (GNU Toolchain for the A-profile Architecture 10.3-2021.07 (arm-10.29)) 10.3.1 20210621, GNU ld (GNU Toolchain for the A-profile Architecture 10.3-2021.07 (arm-10.29)) 2.36.1.20210621) #11 SMP Fri Feb 10 18:15:24 CST 2023
openjdk version "11.0.18" 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode)
二、设备介绍
三、前期准备
1.部署MQTT服务器(有MQTT服务器的可以跳过)
1.1下载开源免费的MQTT服务
我这里用的是EMQX,EMQX官网有更详细的教程,这里就不赘述了下载 EMQXhttps://www.emqx.io/zh/downloads?os=Windows
1.2网页登录控制台
浏览器输入 http://127.0.0.1:18083/即可进入控制台
1.2.1浏览器无法进入http://127.0.0.1 解决办法:
打开控制面板--程序和功能--启动和关闭windows功能--安装IIS服务
点击确定后重启设备
重启后浏览器输入 http://127.0.0.1 , 能看到这样的就可以了,再次输入MQTT控制台网址即可
1.3测试发送消息
进入webSocket , 可以用来调试mqtt . 也可以用Easy-to-Use Online MQTT Client调试Easy-to-Use Online MQTT Client | Try NowOnline MQTT 5.0 client on the web, using MQTT over WebSocket to connect to the MQTT Broker and test message publishing and receiving in the browser.http://www.emqx.io/online-mqtt-client#/
Easy-to-Use Online MQTT Client截图:
能收到消息回复说明服务器已经搭建成功, 可以编写自己的程序测试了~
2.创建JAVA程序, 并引入依赖库
pom.xml中加入依赖:
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.6</version>
</dependency>
</dependencies>
四、编写java程序
代码如下(示例):
编写工具类MQTTUtils.java
需要根据自己环境调整 /** * mqtt服务器的地址和端口号 */ private String serviceURI = "tcp://127.0.0.1:1883"; /** * 客户端Id */ private final String clientId = "ARMT" + (int) (Math.random() * 1000000);
完整代码:
package org.example;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQTTUtils {
private MqttClient mqttClient;
//TODO 需要根据自己环境调整
/**
* mqtt服务器的地址和端口号
*/
private String serviceURI = "tcp://127.0.0.1:1883";
/**
* 客户端Id
*/
private final String clientId = "ARMT" + (int) (Math.random() * 1000000);
/**
* 客户端connect连接mqtt服务器
*
* @param user 用户名
* @param pwd 密码
* @param mqttCallback 回调函数 , 不为null
**/
public void mqttClientConnect(String user, String pwd, MqttCallback mqttCallback) throws MqttException {
MqttConnectOptions options = mqttConnectOptions(user, pwd);
if (mqttCallback == null) {
throw new RuntimeException("MqttCallback is null");
}
mqttClient.setCallback(mqttCallback);
mqttClient.connect(options);
}
/**
* MQTT设置参数设置
*
* @param user 用户名
* @param pwd 密码
* @return MqttConnectOptions
*/
private MqttConnectOptions mqttConnectOptions(String user, String pwd) throws MqttException {
mqttClient = new MqttClient(serviceURI, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(user);
options.setPassword(pwd.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(30);
//自动重连,也可以自定义自动重连
options.setAutomaticReconnect(true);//默认:false
//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(false);
//设置心跳时间
options.setKeepAliveInterval(60);
return options;
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
mqttClient.close();
mqttClient.disconnect();
}
/**
* 往主题发布消息 默认qos==0 “至多一次”
*
* @param topic:发布的主题
* @param msg:发布的消息
*/
public void publish(String topic, String msg) throws MqttException {
publish(topic, msg, 0);
}
/**
* 往主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos:0、“至多一次”,会发生消息丢失
* 1、“至少一次”,确保消息到达,但消息重复可能会发生
* 2、“只有一次”,确保消息到达一次
*/
public void publish(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅主题 ,默认的的Qos等级为:0
*
* @param topic 主题
*/
public void subscribe(String topic) throws MqttException {
subscribe(topic, 0);
}
/**
* 订阅某一个主题,可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量:0、1、2
*/
public void subscribe(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
}
mian.java中调用:
//初始化
mqttUtils = new MQTTUtils();
//TODO 我自己部署的服务器没改账号密码, 默认就是admin public ,
mqttUtils.mqttClientConnect("admin", "public", new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器断开连接,可重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
label.setText("<html>主题: "+topic+" Qos: "+mqttMessage.getQos()+" 内容:"+new String(mqttMessage.getPayload())+" </html>");
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId()+" 发布消息成功!");
}
});
订阅主题:
//订阅主题testtopic mqttUtils.subscribe("testtopic");
发送消息:
//往主题"testtopic"发送一串随机字符 mqttUtils.publish("testtopic", "ARMT.user" + (int) (Math.random() * 1000000));
完整main.java代码:
package org.example;
import org.eclipse.paho.client.mqttv3.*;
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
public class Main {
private static MQTTUtils mqttUtils;
private static JLabel label;
public static void main(String[] args) throws MqttException {
mqttUtils = new MQTTUtils();
//TODO 我自己部署的服务器没改账号密码, 默认就是admin public ,
mqttUtils.mqttClientConnect("admin", "public", new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器断开连接,可重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
label.setText("<html>主题: "+topic+" Qos: "+mqttMessage.getQos()+" 内容:"+new String(mqttMessage.getPayload())+" </html>");
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId()+" 发布消息成功!");
}
});
//订阅主题
mqttUtils.subscribe("testtopic");
createAndShowGUI();
}
/**
* 往主题发送消息
* @throws MqttException
*/
private static void publishMsg() throws MqttException {
//往主题"testtopic"发送一串随机字符
mqttUtils.publish("testtopic", "ARMT.user" + (int) (Math.random() * 1000000));
}
/**
* 创建UI
*/
private static void createAndShowGUI() {
JFrame.setDefaultLookAndFeelDecorated(true);
JFrame frame = new JFrame("Mqtt测试");
//设置弹窗大小
frame.setLocation(200, 200);
JPanel panel = new JPanel();
panel.setPreferredSize(new Dimension(600, 300));
panel.setLayout(null);
// 添加面板
JButton addBtn = new JButton("往主题发送消息");
addBtn.addActionListener(e -> {
try {
label.setText("");
publishMsg();
} catch (MqttException ex) {
ex.printStackTrace();
}
});
addBtn.setBounds(35, 25, 175, 30);
panel.add(addBtn);
label = new JLabel();
label.setLocation(35, 80);
label.setSize(535, 300);
label.setVerticalAlignment(SwingConstants.TOP);
label.setBackground(Color.WHITE);
panel.add(label);
frame.add(panel);
// 显示窗口
frame.pack();
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setVisible(true);
}
}
完整源码:
总结:
以上是纯java代码,在各种操作系统上都能运行