在介绍了rabbitmq的Work queues模式之后,这一节我们将阐述它的Publish/subscrige的用法。
1、前提约束
- 已经完成rabbitmq的第一个简单的测试程序
https://www.jianshu.com/p/77bfc4fe5a1a - 2、操作步骤
我们马上要测试的rabbitmq的工作模式如下:
- 在src/main/java文件夹下创建包net.wanho.rabbitmq.pubsub
- 在net.wanho.rabbitmq.pubsub创建Sender.java
package net.wanho.rabbitmq.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
String exchange_name = "logs";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange_name, "fanout");
String message = "hello3";
channel.basicPublish(exchange_name, "", null, message.getBytes("UTF-8"));
System.out.println("send:" + message);
}
}
- 在net.wanho.rabbitmq.pubsub创建Receiver.java
package net.wanho.rabbitmq.pubsub;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchange_name = "logs";
channel.exchangeDeclare(exchange_name, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange_name, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body, "utf-8");
System.out.println("receive message:" + msg);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
- 测试
启动Receiver两次,启动Sender一次,两个Receiver都能收到消息。
以上就是我们完成的rabbitmq的publish/subscribe模式的测试。