消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
1、简单队列”Hello RabbitMQ”
下面有一幅图,其中P表示生产者,C表示消费者,红色部分为消息队列
2、项目的创建
2.1、引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
2.2、创建连接
public class ConnectionUtils {
public static Connection getConnection() throws IOException,TimeoutException{
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置端口,AMQP
factory.setPort(5672);
//设置vhost
factory.setVirtualHost("/vhost_huanghr");
//设置用户名
factory.setUsername("huanghe");
//设置密码
factory.setPassword("ab2453282");
Connection connection = factory.newConnection();
return connection;
}
}
2.3、生产者
public class Send {
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg="Hello simple";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("--send msg:"+msg);
channel.close();
connection.close();
}
}
2.3、消费者
public class Recv {
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//定义队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
3、简单队列的不足
- 耦合性高,生产者意义对应消费者(如果我想有多个消费者消费队列中的消息,这时候就不行了)
- 队列名变更,这时候得同时变更
4、work queue(工作队列)
模型是一个生产者对应多个消费者
为什么会出现工作队列?
simple队列是意义对应的,而我们实际开发中,生产者发送消息是毫不费力的,而消费者一般是需要和业务相结合的,消费者接受到消息是需要进行处理的,可能需要花费时间,这个时候队列就会积压很多的消息。一个消费者不够用就可以使用多个消费者处理消息
4.1、生产者
public class Send {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取channel
Channel channel = connection.createChannel();
//声明队列
//queue:队列的名称
//durable:boolean
//exclusive:boolean
//atuoDelete:boolean
//argument
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发消息
for (int i = 0; i <50 ; i++) {
String msg = "hello" + i;
System.out.println("[work] send:"+msg);
//channel发送消息
//exchange:路由
//routingkey:
//prop:
//body:
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}
4.2、消费者1
public class Recv1 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//消息达到就会触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Rece msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};
//设置监听
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
4.3、消费者2
public class Recv2 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//消息达到就会触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Rece msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};
//设置监听
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
4.4、现象
消费者1和消费者2处理的消息是一样的,
消费者1:都是偶数
消费者2:都是奇数
这种方式叫做轮询分发(round-robin):不管谁忙活着谁清闲都不会多给一个消息,任务总是你一个我一个;