AcitveMQ+spring整合

项目结构

添加依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<!-- <version>5.7.0</version> -->
</dependency>

配置文件:

server.port=8080
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
#如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
#<dependency>
#<groupId>org.apache.activemq</groupId>
#<artifactId>activemq-pool</artifactId>
#</dependency>
#这里我没开启了连接池, 默认是不开的
spring.activemq.pool.enabled=false
#spring.activemq.pool.max-connections=10

点对点模式:

生产者:

@Component
public class Producer {
    @Autowired
    // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    private JmsMessagingTemplate jmsMessagingTemplate;

    // 发送消息,destination是发送到的队列,message是待发送的消息
    public void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination,message);
    }
    //接受comsumer返回的消息
    @JmsListener(destination = "out.queue1")
    public void consumereMessage(String text){
        System.out.println("从consumer队列返回消息:"+text);
    }
}

消费者:

@Component
public class Consumer {
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue1")
    //注解@SendTo("out.queue"),该注解的意思是将return回的值,再发送到"out.queue"队列中,该队列中的消息,就是我们返回的值!
    @SendTo("out.queue1")
    public String receiveMessage(String text){
        System.out.println("Consumer收到的报文"+text);
        return "return message" + text;
    }
}

测试类:

public class ProducerTest extends DemoApplicationTests{
    @Autowired
    private Producer producer;
    @Test
    public void sendMessage() throws InterruptedException {
        Destination destination = new ActiveMQQueue("mytest.queue");
        for (int i=0 ;i<10 ;i++){
            producer.sendMessage(destination,"你好,我是傻逼");
            Thread.sleep(2000);
        }
    }
}

Topic模式:只需要修改配置文件和测试类

添加配置:

#activeMQ默认提供ptp模式,若要使用topic模式需要假如最后一个配置为true
spring.jms.pub-sub-domain=true

修改测试类:

Topic destination = new ActiveMQTopic("mytest.topic");

猜你喜欢

转载自blog.csdn.net/qq_26857649/article/details/83057557