上一篇文章中《ActiveMQ系列(一)Linux环境下载与安装》,我们已经在linux环境中部署好了ActiveMQ,本文开始实现消息队列的生产和消费,订阅类型的,放在后文中。
maven依赖
新建springboot的过程不再赘述,这里在pom文件中直接以用maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
yml配置
spring:
activemq:
broker-url: tcp://192.168.17.101:61616
user: admin
password: admin
jms:
pub-sub-domain: false #默认false = Queue true = Topic
#自己定义的队列名称
myqueue: boot-activemq-queue
配置bean
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
@Component
@EnableJms
public class ActiveMqConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
生产者
@Component
public class QueueProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(queue, "*****" + UUID.randomUUID().toString().substring(0, 6));
}
}
消费者
消费者,应该新建一个单独的项目,类似生产者,这里不再赘述,然后编写消费者,
@Component
public class QueueConsumer {
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("消费者消费消息:" + textMessage.getText());
}
}
单元测试
1.编写生产者的单元测试方法,运行后回向队列中生产一条消息,
@SpringBootTest(classes = DemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class QueueProduceTest {
@Resource
private QueueProduce queueProduce;
@Test
public void produceMsg() {
queueProduce.produceMsg();
}
}
可以发现,有一条带消费的消息在队列中,其中表格中每列的含义如下:
- name :队列名称
- Number Of Pending Messages :队列中待消费的消息数目,=总接受数 - 出队数 ;
- Number Of Consumers :消费者端的消费者数目
- Messages Enqueued :已经入队的消息数目,包括已经出队的,数目只增不减;
- Messages Dequeued :消费者已经消费的消息数目
2.然后运行消费者的项目,
可以发现,
因为消费者服务还在运行,所以消费者数目为1;
总入队的消息数目是1;
有1条消息已经被消费;