前言:前面两章我们已经开始了从安装到编写简单的demo的过程:
学习本章,需要了解的是RabbitMQ的交换机是什么,以及RabbitMQ的相关流程,以下是从网上找来的图示,看了下其实很容易理解(关于RabbitMQ的详细原理,我们到最后学会了RabbitMQ的使用,我们再去研究,这样理解起来就相当简单了):
详细看下上面的图,简单理解下rabbitmq的流程:
- 红色虚线方块代表着RabbitMQ的服务,里面会包含有交换机exchange,还有路由键routingKey和队列等;
- 生产者产生了消息,然后发布到 RabbitMQ 服务器,发布之前肯定要先连接上服务器,也就是要在应用程序和rabbitmq 服务器之间建立一条 TCP 连接,一旦连接建立,应用程序就可以创建一条 AMQP 信道。 蓝色方框就代表着这个连接,在现有知识的条件下,直接理解成上一章看到的通道即可。
- 客户端A,B发送消息并不是直接发送到队列,而是由RabbitMQ 的交换器进行接收,然后通过指定的路由键,发送到(专业名词:绑定)指定的队列中;
- 然后客户端1,2,3通过路由键,队列名称等,指定从哪个队列中获取消息,当队列有消息时,会发送到相应的客户端。
RabbitMQ有四种类型的交换机direct、fanout、topic、headers,中文分别为:直连,扇形,主题和头部。
以上仅作为一个简单的了解,以下分别是各个交换机的例子。
直连交换机(direct exchange)**
生产者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 直连交换机生产者测试
* 直连交换机,顾名思义就是从交换机-路由键-队列三者进行绑定,然后获取准确的队列信息
* @author fei
*
*/
public class Direct_Producer_demo2 {
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="direct.exchange.name";
public final static String ROUTING_KEY1="rk1";
public final static String ROUTING_KEY2="rk2";
public static void main(String[] args) throws IOException {
producer();
}
private static void producer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, "这里是客户端,这是由交换机和路由键1绑定的队列信息".getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, "这里是客户端,这是由交换机和路由键2绑定的队列信息".getBytes());
channel.close();
connection.close();
System.out.println("客户端信息发送完毕!");
}
}
消费者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Direct_Consumer_demo2 {
public static void main(String[] args) throws IOException {
consumer();
}
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="direct.exchange.name";
public final static String ROUTING_KEY1="rk1";
public final static String ROUTING_KEY2="rk2";
private static void consumer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();//获取队列名称
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//绑定队列名。交换器和路由键
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY2);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("路由键" + envelope.getRoutingKey() + "收到消息: " + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
fanout-扇形交换机:
生产者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 扇形交换机生产者测试
* 扇形交换机简单概念:
* fanout类型交换机会将接收到的消息广播给所有与之绑定的队列,也就是和路由键没有很大关系了
* @author fei
*
*/
public class Fanout_Producer_demo3 {
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="fanout.exchange.name";
public final static String EXCHANGE_NAME2="fanout.exchange.name2";
public final static String ROUTING_KEY1="rk1";
public final static String ROUTING_KEY2="rk2";
public static void main(String[] args) throws IOException {
producer();
}
private static void producer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机1
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, "这里是客户端,这是由交换机1绑定的队列信息1(fanout)".getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, "这里是客户端,这是由交换机1绑定的队列信息2(fanout)".getBytes());
//绑定交换机2
channel.exchangeDeclare(EXCHANGE_NAME2, "fanout");
channel.basicPublish(EXCHANGE_NAME2, ROUTING_KEY1, null, "这里是客户端,这是由交换机2绑定的队列信息3(fanout)".getBytes());
channel.basicPublish(EXCHANGE_NAME2, ROUTING_KEY2, null, "这里是客户端,这是由交换机2绑定的队列信息4(fanout)".getBytes());
channel.close();
connection.close();
System.out.println("客户端信息发送完毕!");
}
}
消费者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Fanout_Consumer_demo3 {
public static void main(String[] args) throws IOException {
consumer();
}
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME2="fanout.exchange.name2";
public final static String ROUTING_KEY1="rk1";
public final static String ROUTING_KEY2="rk2";
private static void consumer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();//获取队列名称
channel.exchangeDeclare(EXCHANGE_NAME2, "fanout");
//绑定队列名。交换器和路由键
// channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
channel.queueBind(queueName, EXCHANGE_NAME2, ROUTING_KEY2);//路由键随便写了,fanout跟路由键无关
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("路由键" + envelope.getRoutingKey() + "收到消息: " + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消费端收到信息:
可以看出,生产者这边通过两个交换机发送数据,消费者那边通过接收第二个交换机可以接收到数据。同时就算使用了路由键2进行绑定,但还是能接收到路由键1发送的数据,说明faout交换机和路由键其实是没什么关系的。这就是扇形交换机。
topic交换机-主题交换机
生产者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* TOPIC交换机生产者测试
* TOPIC交换机简单概念:
* TOPIC-主题交换机,用于匹配一个或多个路由键的,可以用*代替
* @author fei
*
*/
public class Topic_Producer_demo4 {
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="topic.exchange.name";
public final static String ROUTING_KEY1="apple.rk1";
public final static String ROUTING_KEY2="apple.rk2";
public final static String ROUTING_KEY3="apple.rk3";
public final static String ROUTING_KEY4="banana.rk1";
public final static String ROUTING_KEY5="banana.rk2";
public final static String ROUTING_KEY6="banana.rk3";
public static void main(String[] args) throws IOException {
producer();
}
private static void producer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//往多个路由键绑定消息
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, (ROUTING_KEY1+"路由键-客户端发送").getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, (ROUTING_KEY2+"路由键-客户端发送").getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY3, null, (ROUTING_KEY3+"路由键-客户端发送").getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY4, null, (ROUTING_KEY4+"路由键-客户端发送").getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY5, null, (ROUTING_KEY5+"路由键-客户端发送").getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY6, null, (ROUTING_KEY6+"路由键-客户端发送").getBytes());
channel.close();
connection.close();
System.out.println("客户端信息发送完毕!");
}
}
消费者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Topic_Consumer_demo4 {
public static void main(String[] args) throws IOException {
consumer();
}
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="topic.exchange.name";
private static void consumer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();//获取队列名称
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//绑定队列名。交换器和路由键
channel.queueBind(queueName, EXCHANGE_NAME, "banana.*");//路由键用通配符匹配
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("路由键" + envelope.getRoutingKey() + "收到消息: " + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
打印消息:
可以看到,直接匹配banana.会获得绑定前缀为banana的路由键的消息。同理如果获取apple.,就会获取apple前缀的路由键发送的消息。
头部(header)交换机
头部交换机其实还是和其他交换机有所区别而且一开始难以理解,其实可以参考下这个解释:
看图示说明基本上就可以理解了。下面直接上该交换机的demo进行讲解:
生产者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import org.springframework.amqp.core.ExchangeTypes;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Headers交换机生产者测试
* Headers交换机简单概念:
* Headers-头部交换机,headers类型的交换机分发消息不依赖routingKey,是使用发送消息时basicProperties对象中的headers来匹配的。
* @author fei
*
*/
public class Headers_Producer_demo5 {
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="header.exchange.name";
public final static String ROUTING_KEY1="apple.rk1";
public static void main(String[] args) throws IOException {
producer();
}
private static void producer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//往多个路由键绑定消息
channel.exchangeDeclare(EXCHANGE_NAME, "headers");
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("key1", "v1");
headers.put("key2" , 66);
Builder builder = new Builder();
builder.headers(headers);
channel.basicPublish(EXCHANGE_NAME, "", builder.build(), "头部交换器客户端发出的信息".getBytes());
channel.close();
connection.close();
System.out.println("客户端信息发送完毕!");
}
}
消费者:
package rabbitmqDemo.demo2.direct;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Headers_Consumer_demo5 {
public static void main(String[] args) throws IOException {
consumer();
}
public final static String QUEUENAME="mq.test_demo1";//队列名称
public final static String HOST="localhost"; //主机名
public final static String USERNAME="guest";
public final static String PASSWORD="guest";
public final static String EXCHANGE_NAME="header.exchange.name";
private static void consumer() throws IOException {
ConnectionFactory factory=new ConnectionFactory();//创建一个连接工厂,用于生成与RabbitMQ进行连接
factory.setHost(HOST);//根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了)
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();//获取队列名称
channel.exchangeDeclare(EXCHANGE_NAME, "headers");
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "any");//改为all就无法匹配所有,就收不到生产者那边的消息了
headers.put("key1", "v1");
headers.put("key2" , 63);
//绑定队列名。头部-交换器
channel.queueBind(queueName, EXCHANGE_NAME,"", headers);//传入k-v对
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(headers.get("x-match")+"类型的消费端收到消息: " + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消费者消费信息为:
在此,四种常见的交换机例子编写完毕。这已经是完整的代码,需要复制到自己的IDE看一看。慢慢来就会越来越熟悉,后续看原理的时候,就会豁然开朗的!