AcitveMQ springboot 实现

1.pom文件

 <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>1.5.11.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.3.15.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

2.重要的类

ConcectionFactory : 连接工厂,提供连接池,有俩个实现类,分别是SingleConnectionFactory和CacheConnectionFactory

JmsTemplate: 用于接收和发送消息的模板类,线程安全的。

MessageListerner: 消息监听器,用来监听消息,只有一个OnMessage方法,参数为Message类型。

3.代码实现

进行连接工厂的配置以及创建destination:

@Configuration
public class JmsConfig {

    @Bean
    public ConnectionFactory getConnectionFactory(){
        ConnectionFactory factory = new  ActiveMQConnectionFactory("tcp://localhost:61616");
        //需要添加可信任序列化的列表,否则不能向消息中添加对象
        ((ActiveMQConnectionFactory) factory).setTrustAllPackages(true);
        return factory;
    }

    //创建一个queue
    @Bean()
    public Queue getQueue(){
        return new ActiveMQQueue("test-queue");
    }
    //创建一个topic

    @Bean
    public Topic getTopic(){
        return new ActiveMQTopic("test-topic");
    }
}

消息的容器,Person类:

@Data
@AllArgsConstructor
@NoArgsConstructor
//这里必须实现序列化,因为ActiveMQ是通过Java序列化将存储对象的,不实现会抛异常
public class Person implements Serializable {

    //保险起见,我们创建序列化ID
    private static final long serialVersionUID = -5809782578272943999L;

    private String name;

    private Integer age;

}

进行生产者的创建:


@Service
public class Produce {

    @Autowired
    private JmsMessagingTemplate template;
    
    //p2p模式
    public void sendQueue() throws JMSException {
        for (int i = 1; i< 100;i++)
        //有俩个参数,第一个为destination的名字,第二个为消息内容
        template.convertAndSend("test-queue", new Person("name" + i, i));
    }
        
    //topic模式
    public void sendTopic() throws JMSException, InterruptedException {
        for (int i = 1; i< 100;i++)
        template.convertAndSend("test-topic", new Person("name" + i, i));
    }
}

生产者:

@Service
public class Customer {
    @Autowired
    JmsMessagingTemplate template;

    //SpringBoot启动扫描到这个注解会帮助我们创建一个监听器,来监听该destination的消息,方法参数为Message。如果不使用这个注解的话,我们可以自己定义一个定时任务来进行消息的接收,然后使用receiveQueue这个方法的重载来获取某个目标的消息
    @JmsListener(destination = "test-queue")
    public void receiveQueue(Message<Person> message){
        System.out.println(message.getPayload());
    }

    @JmsListener(destination = "test-topic")
    public void reciveTopic(Message<Person> message){
        System.out.println(message.getPayload());
    }

}

测试类:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Test01 {

    @Autowired
    private Produce produce;

    @Autowired
    private Customer customer;

    @Test
    public void testSendQueue() throws JMSException {
        produce.sendQueue();
    }

    @Test
    public void testSendTopic() throws JMSException, InterruptedException {
        produce.sendTopic();
    }
}

可以从控制台上看到输出的内容。

源码:github地址

猜你喜欢

转载自blog.csdn.net/qq_39158142/article/details/81416679