1、pom.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yf</groupId>
<artifactId>java-spring-boot-dubbo</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>java-spring-boot-dubbo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- alibaba的druid数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.32</version>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.5</version>
</dependency>
<!--rabbitMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.3.18.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!--添加mybatis generator maven插件-->
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version>
<configuration>
<!--generatorConfig.xml位置-->
<configurationFile>src/main/resources/generatorConfig.xml</configurationFile>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
<executions>
<execution>
<id>Generate MyBatis Artifacts</id>
<goals>
<goal>generate</goal>
</goals>
<phase>generate-sources</phase>
</execution>
</executions>
<!--此处必须添加mysql驱动包-->
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.5</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
2、application.proerties中rabbitMq的配置
# RabbitMQ基础配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.connection-timeout=
spring.rabbitmq.virtual-host=/
# RabbitMQ监听配置
## 初始并发量
spring.rabbitmq.listener.simple.concurrency=5
## 最大并发量
spring.rabbitmq.listener.simple.max-concurrency=15
## 签收方式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 最多一次消费多少条数据 -限流
spring.rabbitmq.listener.simple.prefetch=1
3、消息队列配置类:MyConfig
package com.yf.springboot.entity.rabbitMq;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息队列配置
* @title MqConfig
* @author yf
* @date 2018年2月2日
* @since v1.0.0
*/
@Component
public class MqConfig {
/**
* 服务器域名或IP
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* 端口,默认5672
*/
@Value("${spring.rabbitmq.port}")
private int port;
/**
* 用户名,默认guest
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* 用户密码,默认guest
*/
@Value("${spring.rabbitmq.password}")
private String password;
/**
* 虚拟目录,默认/
*/
private String virtualHost = "/";
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}
}
4、自定义消息对象:MqMessage
package com.yf.springboot.entity.rabbitMq;
/**
* 消息队列消息对象
* @title MqMessage
* @author yf
* @date 2018年2月2日
* @since v1.0.0
*/
public class MqMessage {
/**
* 主机名的环境变量名
*/
private static final String HOSTNAME_ENV_NAME = "HOSTNAME";
/**
* 主机名
*/
private String hostName;
/**
* 转发器名称
*/
private String exchangeName;
/**
* 消息
*/
private String message;
private MqMessage() {
// 禁止实例化,避免生成没有主机名的消息
}
public static MqMessage newMessage(String exchangeName, String message) {
MqMessage msg = new MqMessage();
msg.setExchangeName(exchangeName);
msg.setMessage(message);
// 在消息上附加主机名,便于问题追踪
String hostName = "unknown";
try {
hostName = System.getenv(HOSTNAME_ENV_NAME);
} catch (Exception e) {
// nothing to do
}
msg.setHostName(hostName);
return msg;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
5、消息发布者:MessagePublisher
package com.yf.springboot.utils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.yf.springboot.entity.rabbitMq.MqConfig;
import com.yf.springboot.entity.rabbitMq.MqMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* 消息发布者
* @title MessagePublisher
* @author yf
* @date 2018年2月2日
* @since v1.0.0
*/
@Component
public class MessagePublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
/**
* 服务端配置
*/
@Resource
private MqConfig config;
private ConnectionFactory factory;
private Connection connection;
/**
* 初始化方法
*
* @throws IOException
* @throws TimeoutException
*/
@PostConstruct
public void init() throws IOException, TimeoutException {
// 1.设置MQ相关的信息
factory = new ConnectionFactory();
factory.setHost(config.getHost());
factory.setPort(config.getPort());
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
factory.setVirtualHost(config.getVirtualHost());
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(60000L);
// 2.创建一个新的连接
connection = factory.newConnection();
}
/**
* 向消息队列中发布一条消息
*
* @param exchangeName
* 转发器名称
* @param message
* 消息
* @return
*/
public boolean publish(String exchangeName, String message) { //exchangeName是交换器,message是对应的消息
try {
//创建一个通道
Channel channel = connection.createChannel();
MqMessage msgObj = MqMessage.newMessage(exchangeName, message);
String msg = JSON.toJSONString(msgObj);
channel.basicPublish(msgObj.getExchangeName(), "", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
channel.close();
LOGGER.info("MessagePublisher消息推送成功: message={}", msg);
return true;
} catch (Exception e) {
LOGGER.error("MessagePublisher消息推送异常, message={}", message, e);
}
return false;
}
/**
* 设置MQ连接相关的信息
*
* @param config
*/
public void setConfig(MqConfig config) {
this.config = config;
}
}
6、消息处理器接口:MessageHandler
package com.yf.springboot.common;
/**
* 消息处理器接口
* 通过实现该接口来定制自己的消息处理逻辑
* @title 消息处理器接口
* @author yf
* @date 2018年2月2日
* @since v1.0.0
*/
public interface MessageHandler {
/**
* 处理消息
* @param message 消息
* @return true-如果消息最终被消费掉,该消息会从队列中移除 false-如果消息没有被消费,该消息会保持在队列中
*/
public boolean handleMessage(String message);
}
7、消息订阅者:MessageSubscriber
package com.yf.springboot.utils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.yf.springboot.common.MessageHandler;
import com.yf.springboot.entity.rabbitMq.MqConfig;
import com.yf.springboot.entity.rabbitMq.MqMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* 消息订阅者
*
* @title 消息订阅者
* @author yf
* @date 2018年2月2日
* @since v1.0.0
*/
@Component
public class MessageSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageSubscriber.class);
/**
* 服务端配置
*/
@Resource
private MqConfig config;
private ConnectionFactory factory;
private Connection connection;
@PostConstruct
public void init() throws IOException, TimeoutException {
// 1.设置MQ相关的信息
factory = new ConnectionFactory();
factory.setHost(config.getHost());
factory.setPort(config.getPort());
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
factory.setVirtualHost(config.getVirtualHost());
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(60000L);
// 2.创建一个新的连接
connection = factory.newConnection();
}
/**
* 订阅消息
* @param
* @param queueName 队列名称
* @param handler 消息处理器
* @throws IOException
*/
public void subscribe(String queueName, final MessageHandler handler) throws IOException {
// 暂时先使用一个connection多个channel的方式,后续根据量进行优化
Channel channel = connection.createChannel();
//创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
String msg = new String(body, "UTF-8");
LOGGER.info("MessageSubscriber接收消息: message={}", msg);
MqMessage msgObj = JSON.parseObject(msg, MqMessage.class);
boolean handleResult = handler.handleMessage(msgObj.getMessage());
if (handleResult) {
getChannel().basicAck(envelope.getDeliveryTag(), false);
LOGGER.info("MessageSubscriber消费消息成功: message={}", msg);
} else {
LOGGER.info("MessageSubscriber消费消息失败: message={}", msg);
}
} catch (Exception e) {
LOGGER.error("MessageSubscriber处理发生异常.", e);
}
}
};
// 消费消息,不使用消息自动确认机制
channel.basicConsume(queueName, false, consumer);
}
/**
* 设置MQ连接相关的信息
*
* @param config
*/
public void setConfig(MqConfig config) {
this.config = config;
}
}