第一种:简单模式
结构:1个生产者+1个队列+1个消费者
图解:
pom配置文件代码:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
生产者代码:
package com.itheima.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue: 队列名称
* durable:是否持久化,重启之后还在
* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列
* autoDelete 是否自动删除
* arguments 参数
*/
// 如果不存在自动创建,如果存在使用原先的队列
channel.queueDeclare("hello", true, false, false, null);
// 6.发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称,简单模式下使用默认的
* routingKey:路由名称
* props:配置信息
* body:发送消息数据
*/
String body = "Hello,World ~~~~~~~~~~~~~~~~~~~~";
channel.basicPublish("", "hello",null, body.getBytes());
// 7.释放资源
channel.close();
connection.close();
}
}
重要信息:
注意:channel通道在发布消息时的routingKey是队列名称匹配正则,也就是说routingKey决定向哪个队列发送消息。
当资源不关闭时,在rabbitmq管理界面,可以看到连接在线、通道信息、队列中的信息数。
连接在线:
通道信息
队列中5条消息未消费
消费者代码
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Hello {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+ consumerTag);
System.out.println("Exchange:"+ envelope.getExchange());
System.out.println("RoutingKey:"+ envelope.getRoutingKey());
System.out.println("properties:"+ properties);
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume("hello", true, consumer);
// 关闭资源?不要
}
}
第二种:工作队列
结构:1个生产者+1个队列+多个消费者
在工人之间分配任务(竞争消费者模式)
类比:在简单模式的基础上,增加了消费者的数量
图解:
应用场景:任务过多提高处理任务的速度
pom依然拥有简单模式的配置代码。
生产者代码
package com.itheima.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue: 队列名称
* durable:是否持久化,重启之后还在
* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列
* autoDelete 是否自动删除
* arguments 参数
*/
// 如果不存在自动创建,如果存在使用原先的队列
channel.queueDeclare("work_queues", true, false, false, null);
// 6.发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称,简单模式下使用默认的
* routingKey:路由名称
* props:配置信息
* body:发送消息数据
*/
// 发10遍消息
for (int i=1; i<=10; i++){
String body = i+"Hello,workqueues ~~~~~~~~~~~~~~~~~~~~";
channel.basicPublish("", "work_queues",null, body.getBytes());
}
// 7.释放资源
channel.close();
connection.close();
}
}
消费者代码
第一位:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
channel.queueDeclare("work_queues", true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume("work_queues", true, consumer);
// 关闭资源?不要
}
}
第二位:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
channel.queueDeclare("work_queues", true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume("work_queues", true, consumer);
// 关闭资源?不要
}
}
重要信息:必须先启动消费者。申明消费者时,在创建通道后,申明队列。消费者申明的队列要和生产者一样。
第三种:发布订阅模式
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:比工作队列增加了交换机、队列。每个消费者消费相同的消息,没有竞争关系。
图解:
重要信息:
Exchange:交换机(X)。一方面,接收生产者发送的消息,一方面路由处理消息。例如,递交给某个特别队列、递交给所有队列、或是消息丢弃。
交换机常见3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic通配符,把消息交割符合routing pattern(路由模式)的队列
Exchange交换机只负责转发消息,不具备存储消息的能力,因此如何没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者代码
package com.itheima.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建交换机
/**
* String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
* exchange: 交换机名称
* type: 枚举或字符串,交换机类型
* DIRECT("direct"), 定向
* FANOUT("fanout"), 广播(扇形),发送给每个队列
* TOPIC("topic"), 通配符
* HEADERS("headers"); 参数匹配
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用。一般false
* arguments:参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 6.创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 7.绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* queue: 队列名称
* exchange: 交换机名称
* routingKey: 路由键,绑定规则
* 如果交换机类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
// 8.发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*/
String body = "日志消息:张三调用了findAll方法~~~~~~~~~~";
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
消费者代码
消费者1:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台~~~~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
// 关闭资源?不要
}
}
消费者2:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.64.129");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息保存到数据库~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue2Name, true, consumer);
// 关闭资源?不要
}
}
第四种:路由模式
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:发布订阅模式下,交换机模式是广播,改成直接,增加routingkey匹配
图解:
生产者代码:
package com.itheima.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建交换机
/**
* String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
* exchange: 交换机名称
* type: 枚举或字符串,交换机类型
* DIRECT("direct"), 定向
* FANOUT("fanout"), 广播(扇形),发送给每个队列
* TOPIC("topic"), 通配符
* HEADERS("headers"); 参数匹配
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用。一般false
* arguments:参数
*/
String exchangeName = "test_routing";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 6.创建队列
String queue1Name = "test_routing_queue1";
String queue2Name = "test_routing_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 7.绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* queue: 队列名称
* exchange: 交换机名称
* routingKey: 路由键,绑定规则
* 如果交换机类型为fanout,routingKey设置为""
*/
// 第一个绑定通道和routingkey
channel.queueBind(queue1Name,exchangeName,"error");
// 第二个绑定通道和routingkey
channel.queueBind(queue2Name,exchangeName,"warning");
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
// 8.发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*/
String body = "日志消息:info级别消息只有队列二打印~~~~~~~~~~";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
消费者1代码:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_routing_queue1";
String queue2Name = "test_routing_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台~~~~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
// 关闭资源?不要
}
}
消费者2代码:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_routing_queue1";
String queue2Name = "test_routing_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台~~~~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue2Name, true, consumer);
// 关闭资源?不要
}
}
第五种通配符模式
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:路由模式下,交换机类型是direct,通配符模式的交换机是topic。而且routingKey可以使用星号和#。星号描述1个单词,#表示任意个单词
图解:
生产者代码:
package com.itheima.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Topic {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建交换机
/**
* String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
* exchange: 交换机名称
* type: 枚举或字符串,交换机类型
* DIRECT("direct"), 定向
* FANOUT("fanout"), 广播(扇形),发送给每个队列
* TOPIC("topic"), 通配符
* HEADERS("headers"); 参数匹配
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用。一般false
* arguments:参数
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
// 6.创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 7.绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* queue: 队列名称
* exchange: 交换机名称
* routingKey: 路由键,绑定规则
* 如果交换机类型为fanout,routingKey设置为""
*/
// 第一个绑定通道和routingkey
channel.queueBind(queue1Name,exchangeName,"*.orange.*");
// 第二个绑定通道和routingkey
channel.queueBind(queue2Name,exchangeName,"*.*.rabbit");
channel.queueBind(queue2Name,exchangeName,"Lazy.#");
// 8.发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*/
String body = "apple系统日志存入数据库";
channel.basicPublish(exchangeName,"apple.good.rabbit",null,body.getBytes());
String body2 = "orange系统日志打印控制台";
channel.basicPublish(exchangeName,"gooad.orange.good",null,body2.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
消费者1代码:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台~~~~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
// 关闭资源?不要
}
}
消费者2代码:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.新建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
connectionFactory.setHost("192.168.163.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 3.获取连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.如果不存在自动创建,如果存在使用原先的队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
// 6.接收消息
/**
* String queue, boolean autoAck, Consumer callback
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
// 当收到消息后,会自动执行该方法
/**
*
* @param consumerTag 标识
* @param envelope 获取对应信息,比如交换机、路由key。。
* @param properties 配置
* @param body 内容
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息存入数据库~~~~~~~~~~~");
System.out.println("body:"+ new String(body));
}
};
channel.basicConsume(queue2Name, true, consumer);
// 关闭资源?不要
}
}