POM.XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.1</version>
</dependency>
RabbitMQProduct.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* 生产者 向RabbitMQ发送消息
*/
public class RabbitMQProduct {
private static final String QUEUE_NAME="demo.queue";
private static final String EXCHANGE_NAME="demo.exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); //连接工厂
//配置连接参数 信息
factory.setUsername("rabbitstudy");
factory.setPassword("123456");
factory.setHost("192.168.0.1");// 自己的RabbitMQ的ip地址
factory.setPort(5672);
Connection connection = factory.newConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道 在信道上传递消息
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //创建交换器
channel.queueDeclare(QUEUE_NAME, false, false, false, null); //创建队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "com.cdsn.test"); //通过路由键绑定交换器与队列
//发送消息给RabbitMQ
channel.basicPublish(EXCHANGE_NAME, "com.cdsn.test",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"Hello World!".getBytes());
//关闭资源
channel.close();
connection.close();
}
}
RabbitMQConsumer.java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 消费者 处理RabbitMQ中的消息
*/
public class RabbitMQConsumer {
private static final String QUEUE_NAME="demo.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); //连接工厂
//配置连接参数 信息
factory.setUsername("rabbitstudy");
factory.setPassword("123456");
factory.setHost("192.168.0.1");// 自己RabbitMQ的连接地址
factory.setPort(5672);
Connection connection = factory.newConnection(); //创建连接
final Channel channel = connection.createChannel(); //创建信道 在信道上传递消息
//告诉RabbitMQ 我可以接收消息了
channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消费者接收到:"+new String(body));
//告诉服务器, 我收到消息了
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
TimeUnit.SECONDS.sleep(1);
//关闭
channel.close();
connection.close();
}
}