建立测试项目
test-rabbitmq-producer/consumer
在pom的文件中添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xc-framework-parent</artifactId>
<groupId>com.xuecheng</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../xc-framework-parent/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>test-rabbitmq-consumer</artifactId>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-amqp</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-test</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
生产者
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname Producer01
* @Description rabbitmqd的入门程序
* @Date 2020/2/20 18:20
* @Created by mmz
*/
public class Producer01 {
//队列常量
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
//通过连接工厂,创建新的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("127.0.0.1");//ip地址
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mqf服务可以设置多个虚拟机,每个虚拟机就相当于独立的mq
connectionFactory.setVirtualHost("/");
//生产者和mq建立连接
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//创建会话通道,所有生产者和mq服务都在channel通道中
Channel channel = connection.createChannel();
//声明队列
//参数
channel.queueDeclare(QUEUE,true,false,false,null);
//发送消息
String message = "helloworld mmz";
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("send to mq" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
消费者
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname Consumer01
* @Description 消费者
* @Date 2020/2/21 15:02
* @Created by mmz
*/
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
//通过连接工厂,创建新的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("127.0.0.1");//ip地址
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mqf服务可以设置多个虚拟机,每个虚拟机就相当于独立的mq
connectionFactory.setVirtualHost("/");
//生产者和mq建立连接
Connection connection = null;
connection = connectionFactory.newConnection();
//创建会话通道,所有生产者和mq服务都在channel通道中
Channel channel = connection.createChannel();
//消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//当接收到消息后,此方法将被调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消费的id,可以用于确认消息已经接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String message = new String(body,"utf-8");
System.out.println("receive message:" + message);
}
};
//监听队列
channel.queueDeclare(QUEUE,true,false,false,null);
channel.basicConsume(QUEUE,true,defaultConsumer);
}
}
消费者需要写一个接受函数