1.模型
3个对象:
**p:**生产者
红色:队列
c:消费者
2.创建一个maven项目,我使用的软件是IDEAL
3.导入相关依赖
这里提供一个查找maven依赖的网址
Maven Repository
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
4.先获取MQ的连接
ConnectionUtil .java
package com.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
/**
* 获取MQ的连接
*/
public static Connection getConnection() throws IOException, TimeoutException {
//定义- -个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("/vhost_1");
//用户名
factory.setUsername("user_1");
//密码
factory.setPassword("user_1");
return factory.newConnection();
}
}
5.生产者:也就是消息发送者
send.java类
package com.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.utils.ConnectionUtil;
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello world!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("--send msg" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
6.消费者接收消息
Recv.java
package com.simple;
import com.rabbitmq.client.*;
import com.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者获取消息
* */
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
/*
true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其
发送消息,直到该消费者反馈。
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());
System.out.println("recv.msg:"+msgString);
}
}
}
点击运行后可发送消息
7.登录MQ查看发送的消息
简单队列的不足。
耦合性高,生产者----对应消费者(如果我想有多个消费者消费队列中消息,这时候就不行了)
队列名变更这时候得同时变更。