目录
一、Java集成Rabbitmq:
1、所需环境:
Java集成rabbitmq是通过amqp-client来完成的。amqp-client版本是5.x的就需要jdk8,amqp-client版本是4.x就需要jdk6。
2、引入jar包:
在pom.xml文件中添加jar包:
<!-- rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version>
</dependency>
3、创建连接:
通过rabbitmq的IP、端口号、virtualHost、用户名、密码等参数创建公共的connect对象。其中用户一般为手动创建的不可登录web管理平台的角色为none的普通用户。
如果rabbitmq为单机那么IP就是安装rabbitmq服务器的IP地址,如果rabbitmq为集群那么IP就是集群对外映射的VIP地址。
//获取rabbitmq连接
public static Connection getConnection(){
Boolean isCluster = true;//是否是集群
//定义连接工厂
Connection connection =null;
try {
if(isCluster){
//rabbitmq集群连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("test_mq");//virtual_host,类似Mysql中数据库
connectionFactory.setUsername("java_test");//用户名
connectionFactory.setPassword("123456");//密码
connectionFactory.setAutomaticRecoveryEnabled(true);//设置自动重新获取连接为true
connectionFactory.setNetworkRecoveryInterval(10000);//设置自动重新获取网络时间为10000ms=10s
Address[] addresses = new Address[5];
addresses[0]=new Address("192.168.0.141",5672);
addresses[1]=new Address("192.168.0.142",5672);
addresses[2]=new Address("192.168.0.143",5672);
addresses[3]=new Address("192.168.0.144",5672);
addresses[4]=new Address("192.168.0.145",5672);
connection = connectionFactory.newConnection(addresses);//获取连接(连接集群内某一个节点,如果一个节点宕机,自动尝试下一个节点)
}else{
//rabbitmq单机连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.141");//IP地址(连接某一个节点)
connectionFactory.setPort(5672);//端口号(连接某一个节点)
connectionFactory.setVirtualHost("test_mq");//virtual_host,类似Mysql中数据库
connectionFactory.setUsername("java_test");//用户名
connectionFactory.setPassword("123456");//密码
connection = connectionFactory.newConnection();//获取连接(连接某一个节点)
}
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
二、Java操作Rabbitmq:
1、持久化检查确认:
根据自己项目业务需要,提前确认并检查手动创建的exchange、queue是否开启了持久化,如果不开启持久化,那么rabbitmq重启后将不存在。
根据自己项目业务需要,提前确认并检查代码中生产者发送消息是否设置了消息持久化,如果不开启持久化,那么rabbitmq重启后消息将不存在,如果是消息开启持久化了,那么exchange、queue也必须要开启持久化才可以,否则rabbitmq重启后消息也将不存在。
2、操作大致思路:
通过java操作rabbitmq的大致思路如下:
首先,根据业务从五种队列中选择一种;
然后,根据选择的队列在rabbitmq web控制台创建对应的exchange和queue并将对应的关系bind好,注意创建exchange和queue时在Durability选项选择Druable(持久化),否则rabbitmq重启后对应的exchange和queue将不存在;
然后,通过公共的方法获取connection,然后编写对应队列的生产者代码,并关闭对应的channel和connection。
然后,通过公共的方法获取connection,然后编写对应队列的消费者代码,在消费者代码中获取消费消息后调用自己业务处理代码。
3、简单队列操作:
(1)Web控制台:
A、创建queue:
(2)生产者:
工具类方法:
public static void producerMessage(String queueName,byte[] messageByte){
Connection connection = RabbitmqClient.getConnection();//创建连接
Channel channel =null;
try {
channel = connection.createChannel();//从连接中创建通道
AMQP.BasicProperties.Builder basicProperties = new AMQP.BasicProperties().builder();
basicProperties.deliveryMode(2);//设置消息持久化,先确认对应exchange、queue等也是持久化,否则rabbitmq重启,消息将会丢失。
//channel.queueDeclare(queueName,false,false,false,null);//创建队列,提前手动创建好
channel.basicPublish("",queueName,basicProperties.build(), messageByte);//发送消息,无exchangeName,指定routeKey为queueName
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
channel.close();//通道关闭
connection.close();//连接关闭
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试方法:
public static void main(String[] argus){
String queueName="lsyTest";
String message="this is first test queue message,2";
byte[] studentByte = BeanUtils.bean2Byte(message);
SimpleQueue.producerMessage(queueName,studentByte);
}
(3)消费者:
工具类方法:
public static void consumerMessage(String queueName,String consumeName){
final Connection connection = RabbitmqClient.getConnection();//创建连接
try {
final Channel channel = connection.createChannel();//从连接中创建通道
Boolean autoAck = false;
channel.basicConsume(queueName,autoAck,consumeName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println(routingKey);
System.out.println(contentType);
System.out.println(deliveryTag);
//StudentEntity studentEntity = (StudentEntity) BeanUtils.byte2Obj(body);
String message = (String)BeanUtils.byte2Obj(body);
System.out.println(message);
channel.basicAck(deliveryTag, false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
测试方法:
public static void main(String[] argus){
String queueName="lsyTest";
SimpleQueue.consumerMessage(queueName,"lsyTestConsume");
}
4、工作队列操作:
(1)Web控制台:
A、创建queue:
与简单队列操作中创建queue一样,只是可能queue的名称和virtual host不同而已。
(2)生产者:
工具类方法:
public static void producerMessage(String queueName,byte[] messageByte){
Connection connection = RabbitmqClient.getConnection();//创建连接
Channel channel =null;
try {
channel = connection.createChannel();//从连接中创建通道
AMQP.BasicProperties.Builder basicProperties = new AMQP.BasicProperties().builder();
basicProperties.deliveryMode(2);//设置消息持久化,先确认对应exchange、queue等也是持久化,否则rabbitmq重启,消息将会丢失。
//channel.queueDeclare(queueName,false,false,false,null);//创建队列,提前手动创建好
channel.basicPublish("",queueName,basicProperties.build(), messageByte);//发送消息,无exchangeName,指定routeKey为queueName
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();//通道关闭
connection.close();//连接关闭
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试方法:
public static void main(String[] argus){
String queueName="lsyTest";
for(int i = 1; i <= 20; i++){
String message ="this is message :"+i;
System.out.println(message);
byte[] studentByte = BeanUtils.bean2Byte(message);
WorkerQueue.producerMessage(queueName,studentByte);
}
}
(3)消费者:
工具类方法:
public static void consumerMessage(String queueName,String consumeName,Integer seconds){
final Connection connection = RabbitmqClient.getConnection();//创建连接
try {
final Channel channel = connection.createChannel();//从连接中创建通道
channel.basicQos(1);//公平分发,哪个消费者处理能力强给哪个就多,反之就少
Boolean autoAck = false;
channel.basicConsume(queueName,autoAck,consumeName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//StudentEntity studentEntity = (StudentEntity) BeanUtils.byte2Obj(body);
String message = (String) BeanUtils.byte2Obj(body);
System.out.println(message);
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag, false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
测试方法:
public static void main(String[] argus){
String queueName="lsyTest";
String consumeName="lsyTestConsume";
WorkerQueue.consumerMessage(queueName,consumeName,50);
}
5、订阅模式操作:
(1)Web控制台:
A、创建exchange:
根据exchange的四种类型,选择fanout来创建订阅模式的exchange是正确的。
B、创建queue:
与简单队列操作中创建queue一样,只是可能queue的名称和virtual host不同而已。
C、绑定exchange和queue:
绑定有两种方式,一种是在exchange中绑定queue,另一种是在queue中绑定exchange。两种绑定的效果是一致的。
在exchange中绑定queue:
选中一个exchange点击进入,然后如下图输入queue名称即可。
在queue中绑定exchange:
选中一个queue点击进入,然后如下图输入exchange名称即可。
(2)生产者:
工具类方法:
public static void producerMessage(String exchangeName,byte[] messageByte){
Connection connection = RabbitmqClient.getConnection();//创建连接
Channel channel =null;
try {
channel = connection.createChannel();//从连接中创建通道
AMQP.BasicProperties.Builder basicProperties = new AMQP.BasicProperties().builder();
basicProperties.deliveryMode(2);//设置消息持久化,先确认对应exchange、queue等也是持久化,否则rabbitmq重启,消息将会丢失。
//channel.exchangeDeclare(exchangeName,"fanout");//创建exchange,忽略route key,提前手动创建好
channel.basicPublish(exchangeName,"",basicProperties.build(),messageByte);//发送消息,指定exchangeName,无routeKey
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();//通道关闭
connection.close();//连接关闭
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestOne";
String message="this is first test queue message,two";
byte[] studentByte = BeanUtils.bean2Byte(message);
SubscribeQueue.producerMessage(exchangeName,studentByte);
}
(3)消费者:
工具类方法:
public static void consumerMessage(String exchangeName,String queueName,String consumeName,Integer seconds){
final Connection connection = RabbitmqClient.getConnection();//创建连接
try {
final Channel channel = connection.createChannel();//从连接中创建通道
channel.basicQos(1);//公平分发,哪个消费者处理能力强给哪个就多,反之就少
Boolean autoAck = false;
channel.basicConsume(queueName,autoAck,consumeName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//StudentEntity studentEntity = (StudentEntity) BeanUtils.byte2Obj(body);
String message = (String) BeanUtils.byte2Obj(body);
System.out.println(message);
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag, false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestOne";
String queueName="lsyExchangeTestOneQueue";
String consumeName="lsyExchangeTestOneQueueConsume";
SubscribeQueue.consumerMessage(exchangeName,queueName,consumeName,50);
}
6、路由模式:
(1)Web控制台:
A、创建exchange:
根据exchange的四种类型,选择direct来创建订阅模式的exchange是正确的。创建方式与订阅模式中创建exchange相同,只是可能exchange的名称和virtual host不同而已,type就要选择direct。
B、创建queue:
与简单队列操作中创建queue一样,只是可能queue的名称和virtual host不同而已。
C、绑定exchange和queue:
与订阅模式操作中绑定exchange和queue一样,只是在routing key中写入具体的值了。就是这个queue中想要获取exchange中哪些消息了。
(2)生产者:
工具类方法:
public static void producerMessage(String exchangeName,String routeKey,byte[] messageByte){
Connection connection = RabbitmqClient.getConnection();//创建连接
Channel channel =null;
try {
channel = connection.createChannel();//从连接中创建通道
AMQP.BasicProperties.Builder basicProperties = new AMQP.BasicProperties().builder();
basicProperties.deliveryMode(2);//设置消息持久化,先确认对应exchange、queue等也是持久化,否则rabbitmq重启,消息将会丢失。
//channel.exchangeDeclare(exchangeName,"direct");//创建exchange,完全匹配route key,提前手动创建好
channel.basicPublish(exchangeName,routeKey,basicProperties.build(),messageByte);//发送消息,指定exchangeName,无routeKey
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();//通道关闭
connection.close();//连接关闭
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestTwo";
String routeKey="two";
for(int i = 1; i <= 10; i++){
String message ="this is message :"+i;
byte[] studentByte = BeanUtils.bean2Byte(message);
RouteQueue.producerMessage(exchangeName,routeKey,studentByte);
}
}
(3)消费者:
工具类方法:
public static void consumerMessage(String exchangeName,String queueName,String consumeName,Integer seconds){
final Connection connection = RabbitmqClient.getConnection();//创建连接
try {
final Channel channel = connection.createChannel();//从连接中创建通道
channel.basicQos(1);//公平分发,哪个消费者处理能力强给哪个就多,反之就少
Boolean autoAck = false;
channel.basicConsume(queueName,autoAck,consumeName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//StudentEntity studentEntity = (StudentEntity) BeanUtils.byte2Obj(body);
String message = (String) BeanUtils.byte2Obj(body);
System.out.println(message);
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag, false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestTwo";
String queueName="lsyExchangeTestTwoQueueTwo";
String consumeName="sss";
RouteQueue.consumerMessage(exchangeName,queueName,consumeName,50);
}
7、主体(通配符)模式操作:
(1)Web控制台:
A、创建exchange:
根据exchange的四种类型,选择topic来创建订阅模式的exchange是正确的。创建方式与订阅模式中创建exchange相同,只是可能exchange的名称和virtual host不同而已,type就要选择topic。
B、创建queue:
与简单队列操作中创建queue一样,只是可能queue的名称和virtual host不同而已。
C、绑定exchange和queue:
与订阅模式操作中绑定exchange和queue一样,只是在routing key中写入带有*、#、^等通配符的key值了。就是这个queue中想要获取exchange中哪些消息的通配符表达式了。
(2)生产者:
工具类方法:
public static void producerMessage(String exchangeName,String routeKey,byte[] messageByte){
Connection connection = RabbitmqClient.getConnection();//创建连接
Channel channel =null;
try {
channel = connection.createChannel();//从连接中创建通道
AMQP.BasicProperties.Builder basicProperties = new AMQP.BasicProperties().builder();
basicProperties.deliveryMode(2);//设置消息持久化,先确认对应exchange、queue等也是持久化,否则rabbitmq重启,消息将会丢失。
//channel.exchangeDeclare(exchangeName,"topic");//创建exchange,模糊匹配route key,提前手动创建好
channel.basicPublish(exchangeName,routeKey,basicProperties.build(),messageByte);//发送消息,指定exchangeName,无routeKey
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();//通道关闭
connection.close();//连接关闭
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestThree";
String routeKey="two.delete";
for(int i = 1; i <= 10; i++){
String message ="this is message delete:"+i;
byte[] studentByte = BeanUtils.bean2Byte(message);
TopicQueue.producerMessage(exchangeName,routeKey,studentByte);
}
}
(3)消费者:
工具类方法:
public static void consumerMessage(String exchangeName,String queueName,String consumeName,Integer seconds){
final Connection connection = RabbitmqClient.getConnection();//创建连接
try {
final Channel channel = connection.createChannel();//从连接中创建通道
channel.basicQos(1);//公平分发,哪个消费者处理能力强给哪个就多,反之就少
Boolean autoAck = false;
channel.basicConsume(queueName,autoAck,consumeName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//StudentEntity studentEntity = (StudentEntity) BeanUtils.byte2Obj(body);
String message = (String) BeanUtils.byte2Obj(body);
System.out.println(message);
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag, false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
测试方法:
public static void main(String[] argus){
String exchangeName="lsyExchangeTestThree";
String queueName="lsyExchangeTestThreeQueOneAdd";
String consumeName="sss";
RouteQueue.consumerMessage(exchangeName,queueName,consumeName,50);
}