一、业务场景
提示:对于Broker(MQTT服务器)来说,不论我们是发布方,还是订阅方,都是属于客户端
硬件方面将采集的数据上报至MQTT服务器,我们平台(自己的WEB服务)将订阅到的消息存储数据库,如何用JAVA作为客户端订阅消息在我的上一篇文章中有讲解如何使用JAVA编写MQTT客户端连接MQTT服务器。
好多朋友在看官网的时候对于这一点理解有点偏差,官网上面说开源版本不支持消息存储数据库,主要表达的意思是MQTT服务端不会自动将收到的消息存储数据库(当企业版的是可以配置后,MQTT服务端自动将收到的消息存储数据库),但是我们可以自己编写代码去实现这个功能,并不麻烦的。
二、解决方案
基于以上问题,针对开源版本,有两种方式将订阅到的消息存储数据库,如下:
方式一、我们可以自己编写插件,采用EMQX支持自定义插件扩展,但是个人不建议采用自定义插件,不灵活,比较麻烦,我是自己在回调函数中将订阅收到的消息存储数据库。
方式二、下面主要讲解如何在回调函数中调用我们的service服务,将订阅到的数据存储数据库
1、首先,采用Springboot搭建项目框架,编写MQTT客户端,在我的上一章节中有详细代码介绍JAVA编写MQTT客户端连接MQTT服务端。
2、如何在回调函数中调用我们的service服务存储数据,可能好多朋友直接在回调函数中使用注解形式@Autowired,就像我们在controller中那样直接调用service服务,但是实际用的时候发现注解注入的service服务是空的,给你报java.lang.NullException,然后MQTT就断开连接了。
其实采用注解形式是可以获取到service服务的,那是因为我们的编码不当造成的。下面给出一种采用ApplicationContext获取Spring是上下文的方式可以得到我们的service服务,然后就是对应的service调用操作数据库的方法了。
package com.siborui.am.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* Spring工具类,获取Spring上下文对象等
*
* @author Mr.Qu
* @since 2020/1/9 16:26
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringContextUtil.applicationContext == null){
SpringContextUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}
以上是我编写的一个操作ApplicationContext获取spring的上下文工具类,可以直接在回调函数中调用这个工具类的getBean()方法,例如:
package com.siborui.dc.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* MQTT回调函数
*
* @author Mr.Qu
* @since 2020/1/9 16:26
*/
@Slf4j
public class Callback implements MqttCallback {
/**
* MQTT 断开连接会执行此方法
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("断开了MQTT连接 :{}", throwable.getMessage());
log.error(throwable.getMessage(), throwable);
}
/**
* publish发布成功后会执行到这里
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
/**
* subscribe订阅后得到的消息会执行到这里
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO 此处可以将订阅得到的消息进行业务处理、数据存储
log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
MqttDataService mqttDataService=SpringContextUtil.getBean(MqttDataService.class);
mqttDataService.save(new String(message.getPayload()));
}
}
主要是:MqttDataService mqttDataService = SpringContextUtil.getBean(MqttDataService.class);获取到我们的service服务。
三、结尾
有不懂的地方随时留言,感觉这篇文章对你有用的小伙伴,可以打赏个奶茶辛苦钱