首先在官网下载相关软件(Windows环境下):Erlang for Windows 和 rabbitmq-server-3.7.4.exe,先安装Erlang,再安装RabbitMQ即可
安装页面管理功能:
在cmd中进入RabbitMQ的sbin目录:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin,执行 rabbitmq-plugins.bat enable rabbitmq_management 启用管理插件,然后访问 http://127.0.0.1:15672/ 就可以看到管理页面,默认账号和密码都是 guest(guest 账号默认只允许从本机登录)。下文代码中使用的虚拟主机 /ywj 和用户 ywj 需要先在管理页面中创建并授权
效果图:
一、基本操作:
发送者:
package com.example.java.t0;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Minimal producer: publishes one text message to the {@code hello} queue
 * through the default (nameless) exchange.
 */
public class P {
    /** Queue the message is routed to; the consumer reads from the same name. */
    private static final String QUEUE_NAME = "hello";

    /**
     * Connects to the broker on localhost, declares the queue and publishes one message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/ywj");
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8;
            // the no-arg getBytes() would use the platform default charset instead.
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" 发送 【" + message + "】");
        }
    }
}
接收者:
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Minimal consumer: subscribes to the {@code hello} queue and prints every
 * message body it receives, decoded as UTF-8.
 */
public class C {
    /** Queue to consume from; the producer publishes to the same name. */
    private static final String QUEUE_NAME = "hello";

    /**
     * Connects to the broker on localhost and registers a consumer on the queue.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        cf.setVirtualHost("/ywj");
        cf.setUsername("guest");
        cf.setPassword("guest");

        Channel ch = cf.newConnection().createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
         * Note: the second argument of basicConsume is true, meaning the receiver
         * acknowledges each message to the server automatically and the server then
         * removes it. To acknowledge manually instead, enable the call described at
         * [Marker 1] below and pass false (not true) as the second argument here.
         */
        ch.basicConsume(QUEUE_NAME, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println(" 收到【" + new String(body, "UTF-8") + "】");
                // [Marker 1] manual ack would go here:
                // basicAck(envelope.getDeliveryTag(), false) on the channel
            }
        });
    }
}
二、work模式
发送者同上,只不过消息多发送一些:
发送者循环发送多个消息到队列中:
// Publish 100 numbered messages to the queue so that several receivers can share them.
for (int i = 0; i < 100; i++) {
    String message = "Hello World!---" + i;
    // Encode explicitly as UTF-8 to match the receivers, which decode the body as UTF-8.
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" 发送 【" + message + "】");
}
接收者(多个 ):
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Work-queue consumer: takes messages from the {@code hello} queue one at a time,
 * simulates one second of work per message and acknowledges each message manually
 * after the work is done. Several instances can run against the same queue.
 */
public class C {
    /** Queue shared by all workers. */
    private static final String QUEUE_NAME = "hello";

    /**
     * Connects to the broker on localhost and registers a manually-acknowledging consumer.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/ywj");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Accept only one unacknowledged message at a time.
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                work(message);
                // Acknowledge only after the work has finished; 'false' acks this delivery only.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false: acknowledgements are sent explicitly in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    /**
     * Prints the message and sleeps for one second to simulate processing.
     *
     * @param msg the message text to process
     */
    public static void work(String msg) {
        try {
            System.out.println("C接收:" + msg);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the calling thread can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
三、发布和订阅
(注意:发布和订阅先运行接收者再运行发送者,因为这个模式下有个交换机,交换机没有保存消息的功能,所以先运行接收者声明一个队列且绑定交换机,发送者发送消息时,交换机才能转发消息给队列)
发送者:
package com.example.java.t0;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Publish/subscribe producer: publishes one text message to a {@code fanout}
 * exchange with an empty routing key.
 */
public class P {
    /** Exchange name; the receivers bind their queues to the same exchange. */
    private static final String EXCHANGE_NAME = "exchange_0";

    /**
     * Connects to the broker on localhost, declares the exchange and publishes one message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/ywj");
        factory.setUsername("ywj");
        factory.setPassword("0");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange (this does not bind anything; receivers bind their own queues).
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "Hello World!";
            // Encode explicitly as UTF-8 because the receivers decode the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" 发送 【" + message + "】");
        }
    }
}
接收者1:
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C {
private final static String EXCHANGE_NAME = "exchange_0";// 交换机名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");// 绑定交换机
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
接收者2:
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C2 {
private final static String EXCHANGE_NAME = "exchange_0";// 交换机名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");// 绑定交换机
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C2接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
四、路由模式( direct : 匹配自定义关键字才发)
发送者:
package com.example.java.t0;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Routing-mode producer: publishes one text message to a {@code direct}
 * exchange using the routing key {@code red}.
 */
public class P {
    /** Exchange name; the receivers bind their queues to the same exchange. */
    private static final String EXCHANGE_NAME = "exchange_0";
    /** Routing key attached to the published message. */
    private static final String DIRECT_NAME = "red";

    /**
     * Connects to the broker on localhost, declares the exchange and publishes one message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/ywj");
        factory.setUsername("ywj");
        factory.setPassword("0");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange (receivers bind their own queues to it).
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message = "这里是信息";
            // The payload is non-ASCII and the receivers decode it as UTF-8, so encode as
            // UTF-8 explicitly; the no-arg getBytes() uses the platform default charset,
            // which garbles the text whenever that default is not UTF-8.
            channel.basicPublish(EXCHANGE_NAME, DIRECT_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" 发送 【" + message + "】");
        }
    }
}
接收者1:(只接收发送者DIRECT_NAME=red的消息)
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C {
private final static String EXCHANGE_NAME = "exchange_0";// 交换机名称
private final static String DIRECT_NAME = "red";// 路由规则,只接收red
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, DIRECT_NAME);// 绑定交换机并指定接收类型
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
接收者2:(只接收发送者DIRECT_NAME=blue的消息)
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C2 {
private final static String EXCHANGE_NAME = "exchange_0";// 交换机名称
private final static String DIRECT_NAME = "blue";// 路由规则, 只接收blue的
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, DIRECT_NAME);// 绑定交换机并指定接收类型
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C2接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
五、主题模式(类似正则表达式)
方式:# 号代表零个或多个词(是以 . 分隔的词,不是单个字母)
* 号代表恰好一个词(是词,不是单个字母)
发送者:
package com.example.java.t0;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Topic-mode producer: publishes one text message to a {@code topic} exchange
 * using the routing key held in {@code TOPIC_NAME}.
 */
public class P {
    /** Exchange name; the receivers bind their queues to the same exchange. */
    private static final String EXCHANGE_NAME = "exchange_topic";
    /**
     * Routing key of the published message. With {@code apple.iphone8} both receiver C
     * and receiver C2 get the message; with {@code apple.iphone8.fxx} only receiver C2 does.
     */
    private static final String TOPIC_NAME = "apple.iphone8";

    /**
     * Connects to the broker on localhost, declares the exchange and publishes one message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/ywj");
        factory.setUsername("ywj");
        factory.setPassword("0");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange (receivers bind their own queues to it).
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String message = "我是消息我是消息我是消息我是消息我是消息";
            // The payload is non-ASCII and the receivers decode it as UTF-8, so encode as
            // UTF-8 explicitly; the no-arg getBytes() uses the platform default charset,
            // which garbles the text whenever that default is not UTF-8.
            channel.basicPublish(EXCHANGE_NAME, TOPIC_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" 发送 【" + message + "】");
        }
    }
}
接收者C:
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C {
private final static String EXCHANGE_NAME = "exchange_topic";// 交换机名称
private final static String TOPIC_NAME = "apple.iphone8";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, TOPIC_NAME);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
接收者C2:
package com.example.java.t0;
import com.rabbitmq.client.*;
import java.io.IOException;
public class C2 {
private final static String EXCHANGE_NAME = "exchange_topic";// 交换机名称
private final static String TOPIC_NAME = "apple.#";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/ywj");
factory.setUsername("ywj");
factory.setPassword("0");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, TOPIC_NAME);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C2接收:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
}
}
持久化:
发送者中:
交换机持久化:channel.exchangeDeclare(xxx,xx,xxx....)方法中有个参数boolean durable,设置为true
队列持久化:channel.queueDeclare(xxx,xx,xxx....)方法中有个参数boolean durable,设置为true
消息持久化:channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());// 核心是MessageProperties.PERSISTENT_TEXT_PLAIN