springBoot整合activeMQ,并绑定生产者ip

前言

因为物联网项目的毕业设计需要,最好使用mqtt协议通信。
有两个需求:
1.获得物联网设备的ip(生产者ip)
2.使用通配符订阅Queue(因为设备很多,而且是动态新增,主题是不确定的)

springBoot整合

1.依赖

 <dependency>
     <groupId>org.messaginghub</groupId>
     <artifactId>pooled-jms</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-activemq</artifactId>
 </dependency>
 <!--消息队列连接池-->
 <dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
 </dependency>

2.配置参数

#activeMQ
spring.activemq.broker-url=tcp://localhost:61616
#true 表示使用内置的MQ,false则连接服务器
spring.activemq.in-memory=false
#true表示使用连接池;false时,每发送一条数据创建一个连接,内存销毁巨大
spring.activemq.pool.enabled=true
#连接池最大连接数
spring.activemq.pool.max-connections=10

3.配置转化器

springBoot的自动配置默认使用的转化器是在监听到消息时转化为字符串,为了拿到更多信息,我们可以自己定义转化器,转化器需要实现MessageConverter 接口,toMessage方法在发送前会执行,fromMessage方法在监听回调前会执行

@Component
public class IOTMQConverter implements MessageConverter {
    @Autowired
    private ObjectMapper objectMapper;
    @Override
    public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException {
        try {
            String ret = objectMapper.writeValueAsString(o);
            return session.createTextMessage(ret);
        }catch (Exception e){
            return null;
        }
    }

    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
        return textMessage;
    }
}

然后使用自己定义的convert创建一个DefaultJmsListenerContainerFactory的bean对象就可以了

@Configuration
@AutoConfigureAfter(ActiveMQAutoConfiguration.class)
public class MQTTConfig{
    @Bean
    public JmsListenerContainerFactory queueContainer(ConnectionFactory connectionFactory, IOTMQConverter iotmqConverter){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(Boolean.FALSE);//支持queue
        factory.setMessageConverter(iotmqConverter);
        return  factory;
    }
}

4.生产消息

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    public void solve(String queueName, String message){
        jmsMessagingTemplate.convertAndSend(queueName,message);
    }

只需要注入自动生成的jmsMessagingTemplate,使用convertAndSend方法即可完成生产

5.通配符订阅消息

activemq支持使用相同前缀,以.结束加上通配符的名称作为主题名来批量订阅(实现原理是他的虚拟主题)。如下订阅 A.*的所有主题(A.1,A.b,A.2,A.ss,A.xxx…)。此处的方法的参数,就是convert的fromMessage最终返回的对象,方法的参数类型需要兼容该对象(是其类型或父类),否则此条消息不会被消费

@Component
public class JmsListener{
    @JmsListener(destination = "A.*")
    public void solve(ActiveMQTextMessage message){
        //todo
    }
}

绑定生产者ip

activeMQ提供通过plugin扩展开发的方式,来在mq服务端做一些开发扩展。其使用的是责任链的设计模式,需要先定义一个实现BrokerPlugin的类,其只有一个 installPlugin方法等待重写,在里面使用自己定义的broker过滤器包装当前broker。
自己定义的broker过滤器需要继承BrokerFilter,然后可以重写其中的方法,在消息的不同阶段进行回调处理。创建一个新的maven工程,添加activemq的依赖(可以直接使用all),我这里因为自己要用序列化还用了阿里巴巴的fastjson

插件工程依赖

 <dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-all</artifactId>
     <version>5.15.12</version>
 </dependency>
 <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.61</version>
 </dependency>
public interface BrokerPlugin {
    Broker installPlugin(Broker broker) throws Exception;
}
public interface Broker extends Region, Service {

    Broker getAdaptor(Class<?> type);

    BrokerId getBrokerId();

    String getBrokerName();

    void addBroker(Connection connection, BrokerInfo info);

    void removeBroker(Connection connection, BrokerInfo info);

    void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;

    void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;

    void addSession(ConnectionContext context, SessionInfo info) throws Exception;

    void removeSession(ConnectionContext context, SessionInfo info) throws Exception;

    @Override
    void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;

    @Override
    void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
   	.....
}

public class BrokerFilter implements Broker {

    protected final Broker next;

    public BrokerFilter(Broker next) {
        this.next = next;
    }

    public Broker getNext() {
        return next;
    }
    .....
}

因为我们要完成对ip的绑定,主要在生产的时候,所以重写他的send方法

public class IpPlugin implements BrokerPlugin {
    static class IpBindFilter extends BrokerFilter {

        public IpBindFilter(Broker next) {
            super(next);
        }

        @Override
        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
            if(!(messageSend instanceof ActiveMQTextMessage)){
                throw new RuntimeException("not allow,must be json string message");
            }
            String remoteAddr = producerExchange.getConnectionContext().getConnection().getRemoteAddress();//tcp://127.0.0.1:61468
            String ip = remoteAddr.substring(6,remoteAddr.lastIndexOf(':'));
            ActiveMQTextMessage textMessage = (ActiveMQTextMessage) messageSend;
            String content = textMessage.getText();
            JSONObject jsonObj = JSON.parseObject(content);
            jsonObj.put("producerIp",ip);
            textMessage.setText(jsonObj.toJSONString());
            super.send(producerExchange, textMessage);
        }
    }
    public Broker installPlugin(Broker broker) throws Exception {
        return new IpBindFilter(broker);
    }
}

这样就完成了插件的开发,接下来需要通过maven打包,将jar包,以及插件除了activeMQ的其他依赖的jar包,都放到activemq的lib目录下(比如:本工程用到fastjson),然后修改activemq的conf目录中的activemq.xml,在broker标签中的plugins标签中,加入以下内容(plugins标签不存在就手动创建),bean的id随便写,class包名不要错就行

activemq.xml

<plugins>
	<bean xmlns="http://www.springframework.org/schema/beans" id="ipPlugin" class="iot.plugin.IpPlugin"/>
</plugins>

至此就完成了插件的安装和配置,重启activemq就可以了。
附生产测试代码:

public class Main {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("myQueue");
        //6、使用会话对象创建生产者对象
        MessageProducer producer = session.createProducer(queue);
        //7、使用会话对象创建一个消息对象
        TextMessage textMessage = session.createTextMessage("{author:\"eetal\"}");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

测试结果,把ip带过来了
测试结果
这里我上传了一份到gitee上——gitee上的demo仓库
欢迎找歪歪梯聊骚

发布了35 篇原创文章 · 获赞 12 · 访问量 5010

猜你喜欢

转载自blog.csdn.net/weixin_44627989/article/details/105370251