项目地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Exchange_RabbitMQ
项目结构:
流程图
补充知识:
Connection是RabbitMQ的sockert链接,封装了socket协议相关部分逻辑
ConnectionFactory是制造Connection的工厂
Connection是建立一个TCP连接,需要经过三次握手,如果每次生产和消费消息都要建立TCP连接十分耗费资源,因此我们用
Chanel建立虚拟连接,这个虚拟连接是建立在上面TCP连接的基础上,数据传输都通过Channel,不需要频繁的建立关闭tcp连接,而且tcp的连接数是有限制的
交换机一共有4种规则,第四种并不常用,因此我们只叙述前三种
fanout:类似与订阅发布,交换机把生产者的所有信息都发送到与它绑定的队列中
direct:交换机只发送信息到符合条件 rountingkey = bindingkey的队列中
topic:交换机发送信息到rountingkey符合匹配规则bindingkey的队列中,bindingkey类似于正则表达式
生产者代码
package main.java.com.ggp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName Producer
* @Description TODO
* @Author Mr.G
* @Date 2018/11/21 9:16
* @Version 1.0
*/
public class Producer {
public Producer(String exchangeName, String exchangeType)throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType);
/**
* 分发消息,根据不同的交换机规则分发
*
*/
if("fanout".equals(exchangeType)) {
for (int i = 0; i < 5; i++) {
String message = "Hello Rabbit " + i;
channel.basicPublish(exchangeName, "", null, message.getBytes());
System.out.println("Producer Send Message: " + message);
}
channel.close();
connection.close();
return;
}
if("direct".equals(exchangeType)){
String[] routingKeys = new String[]{"info", "warning", "error"};
for(String routingKey:routingKeys){
String message = "RoutingSendDirect Send the message level:"+routingKey;
channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
System.out.println("RoutingSendDirect Send " + routingKey + ": "+message);
}
channel.close();;
connection.close();
return;
}
if("topic".equals(exchangeType)){
String[] routingKeys = new String[]{"a.b.c", "a.a.b.d","ad.dd.b.c"};
for(String routingKey:routingKeys){
String message = "Topic Send the message which the routingkey is "+routingKey;
channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
System.out.println("send message: " + message + " successfully!");
}
channel.close();
connection.close();
return;
}
channel.close();
connection.close();
throw new RuntimeException("Unsupported the exchangeType!");
}
}
消费者代码
package main.java.com.ggp;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName Customer
* @Description TODO
* @Author Mr.G
* @Date 2018/11/21 10:12
* @Version 1.0
*/
public class Customer {
public Customer(String customerName,String exchangeName, String exchangeType)throws IOException,TimeoutException {
final String customer_name = customerName;
final String exchange_type = exchangeType;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName,exchangeType);
/**
* 创建随机队列,根据不同的交换机规则绑定队列
*/
String queueName = channel.queueDeclare().getQueue();
if("fanout".equals(exchangeType)){
channel.queueBind(queueName,exchangeName,"");
System.out.println(customerName+" is waiting message that the type of it is fanout");
}else if("direct".equals(exchangeType)){
String[] bindingKeys = new String[]{"info","warning"};
for(String bindingKey:bindingKeys){
channel.queueBind(queueName,exchangeName,bindingKey);
}
System.out.println(customerName+" is waiting message that the type of it is direct");
}else if("topic".equals(exchangeType)){
String bindingKey = "#.b.*";
channel.queueBind(queueName,exchangeName,bindingKey);
System.out.println(customerName+" is waiting message that the type of it is topic");
}else {
throw new RuntimeException("Unsupported the exchangeType!");
}
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException{
String message = new String(body,"UTF-8");
System.out.println(customer_name+" has dealt the "+exchange_type+" message: "+ message);
}
};
channel.basicConsume(queueName,true,consumer);
}
}
测试代码
package test.java.com.ggp.test;
import main.java.com.ggp.Customer;
import main.java.com.ggp.Producer;
/**
* @ClassName FanoutTest
* @Description TODO
* @Author Mr.G
* @Date 2018/11/21 10:47
* @Version 1.0
*/
public class FanoutTest {
public static void main(String[] args) throws Exception{
new Customer("ggp","fanout","fanout");
new Customer("rqb","fanout","fanout");
new Producer("fanout","fanout");
}
}
测试结果
参考资料:https://www.cnblogs.com/LipeiNet