版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/phone13144830339/article/details/79041203
在发布/订阅模式下,每个订阅者都能收到交换机转发的同一条任务消息。
每次连接到RabbitMQ时,我们都需要一个新的空队列;当消费者与RabbitMQ断开连接时,该消费者所使用的队列应该被自动删除。
pom.xml
<!-- Maven build descriptor for the publish/subscribe demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this project. -->
<groupId>cn.et</groupId>
<artifactId>PublishSubscribeMode</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- Spring Boot parent POM; the starters below declare no version of their own,
     presumably relying on the versions managed by this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
</parent>
<dependencies>
<!-- Web starter: used by the @RestController endpoints. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Mail starter: provides the JavaMailSender used to send e-mails. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Plain RabbitMQ Java client (com.rabbitmq.client.*), pinned to an explicit version. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile for Java 1.7 and read the sources as UTF-8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.properties
# HTTP port of the embedded web server.
# NOTE(review): binding to port 80 usually requires elevated privileges on Unix-like systems.
server.port=80
# Outgoing SMTP server.
spring.mail.host=smtp.qq.com
# SMTP password / authorization code. Supplied through the MAIL_PASSWORD environment
# variable so the secret is never committed with the sources; any credential that was
# previously published in this file should be revoked.
spring.mail.password=${MAIL_PASSWORD}
spring.mail.protocol=smtp
# SMTP login account (full e-mail address), supplied through the MAIL_USERNAME
# environment variable. The key must be spring.mail.username for Spring Boot to bind it.
spring.mail.username=${MAIL_USERNAME}
# Authenticate against the SMTP server and require a STARTTLS-encrypted session.
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
package cn.et;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the application: boots the Spring context using this class
 * as the configuration source.
 */
@SpringBootApplication
public class SpringBootMain {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootMain.class);
        application.run(args);
    }
}
package cn.et.control;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.util.SerializationUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Task consumer: REST endpoints that start and stop RabbitMQ consumers which
 * turn every received task message into a verification e-mail.
 * @author lucheng
 */
@RestController
public class ConsumeControl {
    /** Address of the RabbitMQ broker. */
    private static final String BROKER_HOST = "192.168.27.128";
    /** AMQP port of the RabbitMQ broker. */
    private static final int BROKER_PORT = 5672;
    /** Fanout exchange the tasks are published to. */
    private static final String EXCHANGE_NAME = "PUBLISH_SUBSCRIBE_EXCHANGE";
    /**
     * Queue the consumers read from.
     * NOTE(review): the queue name is fixed, so several consumers share this one queue
     * and each message is handed to only one of them; confirm whether every subscriber
     * was meant to get its own temporary queue instead.
     */
    private static final String QUEUE_NAME = "PUBLISH_SUBSCRIBE_QUEUE";

    @Autowired
    private JavaMailSender jms;

    /**
     * Channels of the consumers started so far, each mapped to the connection that owns it.
     * A concurrent map is used because the endpoints below may run on several request
     * threads at the same time.
     */
    static Map<Channel,Connection> map = new java.util.concurrent.ConcurrentHashMap<Channel,Connection>();

    /**
     * Starts message processing: opens a connection and channel, declares the exchange
     * and the queue, binds them and registers a consumer that e-mails every task.
     * The channel/connection pair is remembered so that {@link #downConsumeTask()} can
     * close it later.
     *
     * @throws Exception if the broker cannot be reached or any declaration fails; in that
     *                   case the freshly opened connection is aborted instead of leaked
     */
    @RequestMapping("/upConsumeTask")
    public void upConsumeTask() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(BROKER_HOST);
        factory.setPort(BROKER_PORT);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            if (channel == null) {
                // createChannel() returns null when the connection has no channel left.
                throw new IllegalStateException("No RabbitMQ channel available on the connection");
            }
            // Exchange types are direct, topic, headers and fanout.
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            // Messages reach the queue through the exchange it is bound to.
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
                    // NOTE(review): Java-native deserialization is unsafe if untrusted
                    // publishers can reach the exchange; consider a JSON payload.
                    Object decoded = SerializationUtils.deserialize(body);
                    if (!(decoded instanceof Map)) {
                        System.err.println("Ignoring task message with unexpected payload type");
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    Map<String,String> task = (Map<String,String>) decoded;
                    String message = task.get("message");
                    String mail = task.get("mail");
                    if (mail == null) {
                        System.err.println("Ignoring task message without a recipient address");
                        return;
                    }
                    SimpleMailMessage smm = new SimpleMailMessage();
                    // Sender address.
                    // NOTE(review): this literal looks like an e-mail redaction placeholder
                    // rather than a real address; confirm the intended sender.
                    smm.setFrom("[email protected]");
                    // Recipient address.
                    smm.setTo(mail);
                    // Subject line.
                    smm.setSubject(message+"请求验证");
                    // Body text.
                    // NOTE(review): the recipient address is sent as the verification code;
                    // confirm whether a generated code was intended here.
                    smm.setText("验证码:"+mail);
                    jms.send(smm);
                }
            };
            // Auto-ack is enabled: a message counts as handled as soon as it is delivered.
            channel.basicConsume(QUEUE_NAME,true, consumer);
            // Register only once the consumer is really running.
            map.put(channel,connection);
        } catch (Exception e) {
            // Best-effort cleanup that cannot mask the original failure.
            connection.abort();
            throw e;
        }
        System.out.println("已经开启任务处理");
    }

    /**
     * Stops task processing: closes every registered channel and its connection and
     * forgets them, so calling this endpoint repeatedly is harmless.
     *
     * @return a fixed status text
     * @throws Exception the first failure met while closing; all remaining
     *                   channels/connections are still closed before it is rethrown
     */
    @RequestMapping("/downConsumeTask")
    public String downConsumeTask() throws Exception {
        Exception firstFailure = null;
        Set<Channel> channels = map.keySet();
        for(Channel ch : channels) {
            // remove() hands the entry to exactly one caller, so it is never closed twice.
            Connection conn = map.remove(ch);
            if (conn == null) {
                continue;
            }
            Exception failure = release(ch, conn);
            if (firstFailure == null) {
                firstFailure = failure;
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        return "任务处理已经从开启变更为关闭状态";
    }

    /**
     * Closes a channel and then its connection; the connection is closed even when
     * closing the channel fails.
     *
     * @return the first exception raised while closing, or {@code null} if both closed cleanly
     */
    private static Exception release(Channel channel, Connection connection) {
        Exception failure = null;
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            }
        }
        return failure;
    }
}
package cn.et.control;
import java.util.Map;
import java.util.TreeMap;
import org.springframework.util.SerializationUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Task producer: REST endpoint that publishes a "send verification code" task
 * to the fanout exchange.
 * @author lucheng
 */
@RestController
public class ProduceControl {
    /** Address of the RabbitMQ broker. */
    private static final String BROKER_HOST = "192.168.27.128";
    /** AMQP port of the RabbitMQ broker. */
    private static final int BROKER_PORT = 5672;
    /** Fanout exchange the tasks are published to. */
    private static final String EXCHANGE_NAME = "PUBLISH_SUBSCRIBE_EXCHANGE";

    /**
     * Publishes one task message (a serialized map with the keys {@code message} and
     * {@code mail}) to the exchange and releases the connection afterwards.
     *
     * @return a fixed status text once the message has been handed to the broker
     * @throws Exception if connecting, declaring the exchange or publishing fails; the
     *                   connection is aborted in that case so it is not leaked
     */
    @RequestMapping("/produceTask")
    public String produceTask() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(BROKER_HOST);
        factory.setPort(BROKER_PORT);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Exchange types are direct, topic, headers and fanout.
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            String message = "Hello RabbitMQ";
            String mail = "[email protected]";
            Map<String,String> map = new TreeMap<String,String>();
            map.put("message",message);
            map.put("mail",mail);
            // The map is serialized with org.springframework.util.SerializationUtils.
            // The routing key is empty because the message is addressed to the exchange only.
            channel.basicPublish(EXCHANGE_NAME,"",null,SerializationUtils.serialize(map));
            channel.close();
        } catch (Exception e) {
            // Best-effort cleanup that cannot mask the original failure.
            connection.abort();
            throw e;
        }
        connection.close();
        return "任务(请求验证码)发送成功";
    }
}