1.项目结构及环境配置
项目结构
src
main
java
com
harvey
common --------------- 公共类
dao ------------------ DAO
datasource ----------- 数据源
model ---------------- 实体类
mq
consumer --------- rocketmq 消费者
producer --------- rocketmq 生产者
service -------------- 业务处理
util ----------------- 工具类
resources
mapper ----------------------- mybatis 配置文件
application.yml -------------- 全局配置文件
banner.txt ------------------- logo
logback-spring.xml ----------- 日志配置
MySQL 创建表 SQL 脚本
-- Recreates the `order_test` table from scratch: an existing table (and its rows) is dropped first.
drop table if exists `order_test`;
-- Every column except `id` corresponds to a field of com.harvey.model.Order and is
-- filled by the `saveOrder` insert statement in OrderDao.xml.
create table `order_test` (
-- Surrogate primary key, generated by MySQL.
`id` int(11) not null auto_increment,
-- Order number.
`order_no` varchar(20) default null,
-- User id.
`user_id` varchar(20) default null,
-- Order amount.
-- NOTE(review): a floating-point column for a monetary value; consider DECIMAL.
`order_money` double default null,
-- Payment type: 0 = cash on delivery, 1 = online payment.
`pay_type` int(11) default null,
-- Order creation time, stored as text.
`create_time` varchar(20) default null,
primary key (`id`)
) engine=innodb default charset=utf8;
RocketMQ 创建 Topic
$ sh mqadmin updateTopic -n IP:Port -c TestCluster -t OrderQueue
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
create topic to 10.20.0.13:10911 success.
TopicConfig [topicName=OrderQueue, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
application.yml
# port
server:
port: 8080
# log
# Root log level; logback-spring.xml reads it through the `log.root.level` springProperty.
log:
root:
level: INFO
# mysql
# Settings under `datasource.order` are bound by OrderDataSourceConfig
# (@ConfigurationProperties(prefix = "datasource.order")).
datasource:
order:
url: jdbc:mysql://192.168.191.65:3306/orderdb?useUnicode=true&characterEncoding=utf8
username: root
# NOTE(review): plaintext credential kept in a configuration file; consider externalizing it.
password: P@ssw0rd
driver-class-name: com.mysql.jdbc.Driver
max-wait: 10000
min-idle: 2
type: com.alibaba.druid.pool.DruidDataSource
maxActive: 10
initial-size: 5
validation-query: SELECT 1
test-on-borrow: false
test-while-idle: true
time-between-eviction-runs-millis: 18800
filters: stat
# rocketmq
# Settings under `rmq.order-queue` are bound by OrderConf
# (@ConfigurationProperties(prefix = "rmq.order-queue")).
rmq:
order-queue:
nameAddress: test-rq01-a.mq.01zhuanche.com:9876
groupName: order_queue_group
instanceName: order_queue_instance
batchMaxSize: 16
topicName: OrderQueue
# Left empty: BaseConsumer then subscribes to the topic with the "*" tag expression.
tag:
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: one console appender attached to the root logger. -->
<configuration scan="false" scanPeriod="1 minutes">
<!-- Makes the `log.root.level` property (set in application.yml) available in this file. -->
<springProperty scope="context" name="log.root.level" source="log.root.level"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender" >
<encoder>
<!-- Line layout: timestamp, thread, level, short logger name, MDC value `traceId`, message. -->
<pattern>%d{yyyy-MM-dd.HH:mm:ss.SSS} %t %p %c{0} %X{traceId} : %m%n</pattern>
</encoder>
</appender>
<!-- Loggers under com.harvey are fixed at INFO, independent of the root level. -->
<logger name="com.harvey" level="INFO" />
<!-- The root level comes from the property declared above. -->
<root level="${log.root.level}" >
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
banner.txt
_____ _ ____ _ _____ _ _ __ __ ____
/ ____| (_) | _ \ | | | __ \ | | | | | \/ |/ __ \
| (___ _ __ _ __ _ _ __ __ _| |_) | ___ ___ | |_ ______| |__) |___ ___| | _____| |_| \ / | | | |
\___ \| '_ \| '__| | '_ \ / _` | _ < / _ \ / _ \| __|______| _ // _ \ / __| |/ / _ \ __| |\/| | | | |
____) | |_) | | | | | | | (_| | |_) | (_) | (_) | |_ | | \ \ (_) | (__| < __/ |_| | | | |__| |
|_____/| .__/|_| |_|_| |_|\__, |____/ \___/ \___/ \__| |_| \_\___/ \___|_|\_\___|\__|_| |_|\___\_\
| | __/ |
|_| |___/
2.代码
1.数据源
- BaseDataSourceConfig.java
package com.harvey.datasource;
import com.github.pagehelper.PageHelper;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
import java.util.Properties;
/**
 * Base contract for data source configuration classes.
 *
 * <p>An implementation supplies the location pattern of its MyBatis mapper XML files and
 * inherits a helper that builds a {@link SqlSessionFactory} with the PageHelper plugin
 * registered.
 *
 * @author harvey
 */
public interface BaseDataSourceConfig {

    /**
     * Returns the location pattern of the mapper XML files.
     *
     * @return the pattern passed to
     *         {@link PathMatchingResourcePatternResolver#getResources(String)}
     */
    String getMapperLocations();

    /**
     * Builds a {@link SqlSessionFactory} that has the PageHelper interceptor installed.
     *
     * @param dataSource the data source the factory works on
     * @param dialect    value stored under the PageHelper {@code dialect} property
     * @return the factory produced by the configured {@link SqlSessionFactoryBean}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    default SqlSessionFactory createPageSessionFactory(DataSource dataSource, String dialect) throws Exception {
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        // Resolve every mapper file matching the pattern supplied by the implementation.
        PathMatchingResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        sessionFactoryBean.setMapperLocations(resourceResolver.getResources(getMapperLocations()));

        // Configure the paging plugin with the requested dialect and register it.
        Properties pagingProperties = new Properties();
        pagingProperties.setProperty("dialect", dialect);
        PageHelper pagingPlugin = new PageHelper();
        pagingPlugin.setProperties(pagingProperties);
        Interceptor[] plugins = {pagingPlugin};
        sessionFactoryBean.setPlugins(plugins);

        return sessionFactoryBean.getObject();
    }
}
- OrderDataSourceConfig.java
package com.harvey.datasource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * Data source configuration for the order database. Additional data sources can be
 * configured by following this class as a template.
 *
 * <p>Mappers in {@code com.harvey.dao.order} are bound to the
 * {@code orderSqlSessionTemplate} bean declared here.
 *
 * @author harvey
 */
@Configuration
@MapperScan(basePackages = {"com.harvey.dao.order"}, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderDataSourceConfig implements BaseDataSourceConfig {

    /** Location pattern of the mapper XML files that belong to this data source. */
    static final String MAPPER_LOCATION = "classpath*:mapper/order/*.xml";

    /** {@inheritDoc} */
    @Override
    public String getMapperLocations() {
        return MAPPER_LOCATION;
    }

    /**
     * Holds the connection settings bound from the {@code datasource.order} prefix.
     *
     * @return a fresh properties object for Spring to populate
     */
    @Primary
    @Bean(name = "orderProperties")
    @ConfigurationProperties(prefix = "datasource.order")
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Creates the order data source from the bound properties.
     *
     * @param dataSourceProperties the {@code orderProperties} bean
     * @return the data source built by the properties' data source builder
     */
    @Primary
    @Bean(name = "orderDS")
    // NOTE(review): the same prefix is applied again here, presumably so that pool-specific
    // keys (max-wait, min-idle, ...) bind onto the created pool. Confirm.
    @ConfigurationProperties(prefix = "datasource.order")
    public DataSource dataSource(@Qualifier("orderProperties") DataSourceProperties dataSourceProperties) {
        return dataSourceProperties.initializeDataSourceBuilder().build();
    }

    /**
     * Creates the MyBatis session factory for the order data source, using the
     * {@code mysql} PageHelper dialect.
     *
     * @param dataSource the {@code orderDS} bean
     * @return the session factory with paging support
     * @throws Exception if the factory cannot be built
     */
    @Primary
    @Bean(name = "orderSqlSessionFactory")
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("orderDS") DataSource dataSource) throws Exception {
        final String pageDialect = "mysql";
        return createPageSessionFactory(dataSource, pageDialect);
    }

    /**
     * Creates the transaction manager for the order data source.
     *
     * @param dataSource the {@code orderDS} bean
     * @return a transaction manager bound to that data source
     */
    @Primary
    @Bean(name = "orderTransactionManager")
    public DataSourceTransactionManager orderTransactionManager(@Qualifier("orderDS") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Creates the session template referenced by the {@code @MapperScan} on this class.
     *
     * @param factory the {@code orderSqlSessionFactory} bean
     * @return a template wrapping that factory
     */
    @Primary
    @Bean(name = "orderSqlSessionTemplate")
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory factory) {
        return new SqlSessionTemplate(factory);
    }
}
2.RocketMQ 配置
com.harvey.mq.consumer.common 包下
- BaseConsumer.java
package com.harvey.mq.consumer.common;
import com.harvey.common.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
* @description RocketMQ 消费基类 BaseConsumer
* @auther harvey
*/
@Slf4j
@Configuration
public class BaseConsumer implements DisposableBean {
private String nameAddress;
private String groupName;
private int batchMaxSize;
private String topicName;
private String instanceName;
private String tag;
private MQMessageListener mqMessageListener;
private DefaultMQPushConsumer consumer = null;
public BaseConsumer() { }
public BaseConsumer(String nameAddress, String groupName, int batchMaxSize, String topicName, String instanceName, String tag) {
this.nameAddress = nameAddress;
this.groupName = groupName;
this.batchMaxSize = batchMaxSize;
this.topicName = topicName;
this.instanceName = instanceName;
this.tag = tag;
}
public void registerListener() {
try {
log.info("rocketmq init【topicName = " + this.topicName + ",groupName = " + this.groupName + "】");
if (consumer == null) {
consumer = new DefaultMQPushConsumer(this.groupName);
}
consumer.setNamesrvAddr(this.nameAddress);
// consumer 消费策略:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp(Constants.CONSUMER_FROM_TIME);
consumer.setConsumeMessageBatchMaxSize(this.batchMaxSize);
if (StringUtils.isNotEmpty(this.tag)) {
consumer.subscribe(this.topicName, this.tag);
} else {
consumer.subscribe(this.topicName, "*");
}
// 注册监听
consumer.registerMessageListener(new MessageListenerOrderly() {
String errmsg = "";
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (int i = 0; i < msgs.size(); i++) {
MessageExt msgExt = msgs.get(i);
String msgId = msgExt.getMsgId();
String msgBody = new String(msgExt.getBody());
try {
mqMessageListener.handleMessage(msgBody, msgId);
} catch (Exception e) {
String errmsg = String.format("consumer error: topic = %s, groupName = %s, data:%s, exception:%s", topicName, groupName, msgBody, e.getMessage());
log.error(errmsg, e);
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
String errmsg = "rocketmq registerMessageListener fail!【topicName = " + this.topicName + ",groupName = " + this.groupName + "】";
log.error(errmsg, e);
}
log.info("rocketmq registerMessageListener success!【topicName = " + this.topicName + ",groupName = " + this.groupName + "】");
}
@Override
public void destroy() throws Exception {
consumer.shutdown();
log.info("consumer shutdown...");
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getNameAddress() {
return nameAddress;
}
public void setNameAddress(String nameAddress) {
this.nameAddress = nameAddress;
}
public int getBatchMaxSize() {
return batchMaxSize;
}
public void setBatchMaxSize(int batchMaxSize) {
this.batchMaxSize = batchMaxSize;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public MQMessageListener getMqMessageListener() {
return mqMessageListener;
}
public void setMqMessageListener(MQMessageListener mqMessageListener) {
this.mqMessageListener = mqMessageListener;
}
}
- MQMessageListener.java
package com.harvey.mq.consumer.common;
/**
 * Listener for messages received from the data-collection MQ.
 *
 * @author huhanwei
 */
@FunctionalInterface
public interface MQMessageListener {

    /**
     * Handles a single message.
     *
     * @param message the message body as text
     * @param msgId   the id of the message
     * @throws Exception if the message cannot be handled
     */
    void handleMessage(String message, String msgId) throws Exception;
}
- MQConfig.java
package com.harvey.mq.consumer.common;
import com.harvey.mq.consumer.OrderConsumer;
import com.harvey.mq.consumer.conf.OrderConf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Start-up configuration for the data-collection MQ consumers.
 *
 * @author harvey
 */
@Configuration // Marks this class as a configuration class managed by Spring.
public class MQConfig {

    /** Settings of the order queue, bound from the application configuration. */
    @Autowired
    private OrderConf orderConf;

    /**
     * Creates the consumer for the order queue (OrderQueue), attaches the order message
     * handler and starts listener registration on a separate thread.
     *
     * @param orderConsumer the {@code orderConsumer} bean that handles each message
     * @return the consumer, exposed as a Spring bean
     */
    @Bean
    public BaseConsumer startOrderConsumer(@Qualifier("orderConsumer") OrderConsumer orderConsumer) {
        final String nameAddress = orderConf.getNameAddress();
        final String groupName = orderConf.getGroupName();
        final int batchMaxSize = orderConf.getBatchMaxSize();
        final String topicName = orderConf.getTopicName();
        final String instanceName = orderConf.getInstanceName();
        final String tag = orderConf.getTag();

        BaseConsumer orderQueueConsumer =
                new BaseConsumer(nameAddress, groupName, batchMaxSize, topicName, instanceName, tag);
        orderQueueConsumer.setMqMessageListener(orderConsumer);

        // Registration runs on its own thread rather than on the thread creating the bean.
        Thread registrationThread = new Thread(orderQueueConsumer::registerListener);
        registrationThread.start();

        return orderQueueConsumer;
    }
}
com.harvey.mq.consumer.conf包下
- OrderConf.java
package com.harvey.mq.consumer.conf;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Initialization settings of the OrderQueue consumer, bound from the
 * {@code rmq.order-queue} configuration prefix.
 *
 * <p>Lombok's {@code @Getter} and {@code @Setter} generate the accessors, so none are
 * written by hand; the IDE needs the Lombok plugin installed.
 *
 * @author harvey
 */
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "rmq.order-queue")
public class OrderConf {

    /** Name server address ({@code rmq.order-queue.nameAddress}). */
    private String nameAddress;

    /** Topic name ({@code rmq.order-queue.topicName}). */
    private String topicName;

    /** Tag expression ({@code rmq.order-queue.tag}). */
    private String tag;

    /** Group name ({@code rmq.order-queue.groupName}). */
    private String groupName;

    /** Instance name ({@code rmq.order-queue.instanceName}). */
    private String instanceName;

    /** Maximum batch size ({@code rmq.order-queue.batchMaxSize}). */
    private int batchMaxSize;
}
3.实体类
package com.harvey.model;
import lombok.Getter;
import lombok.Setter;
/**
 * Order entity. OrderProducer serializes it to JSON, OrderConsumer parses it back from
 * the message body, and OrderDao.saveOrder inserts it into the order_test table.
 * Accessors are generated by Lombok.
 */
@Getter
@Setter
public class Order {
private String orderNo; // order number
private String userId; // user id
// NOTE(review): floating-point type for a monetary amount; consider BigDecimal.
private Double orderMoney; // order amount
private int payType; // 0: cash on delivery, 1: online payment
private String createTime; // order creation time
}
4.DAO
- OrderDao.java
package com.harvey.dao.order;
import com.harvey.model.Order;
/**
 * DAO for orders. The SQL for each method is defined in the mapper XML whose namespace is
 * this interface's fully qualified name.
 */
public interface OrderDao {

    /**
     * Inserts the given order.
     *
     * @param order the order whose properties are written to the table
     */
    void saveOrder(Order order);
}
- OrderDao.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- MyBatis mapper bound to com.harvey.dao.order.OrderDao through its namespace. -->
<mapper namespace="com.harvey.dao.order.OrderDao" >
<!-- Inserts one row into order_test; each #{...} placeholder is read from the property
     of the same name on the com.harvey.model.Order parameter. -->
<insert id="saveOrder" parameterType="com.harvey.model.Order">
INSERT INTO
order_test(order_no, user_id, order_money, pay_type, create_time)
VALUES
(#{orderNo}, #{userId}, #{orderMoney}, #{payType}, #{createTime})
</insert>
</mapper>
5.Service
- IOrderService.java
package com.harvey.service.order;
import com.harvey.model.Order;
/**
 * Business operations on orders.
 */
public interface IOrderService {

    /**
     * Saves the given order.
     *
     * @param order the order to save
     */
    void saveOrder(Order order);
}
- OrderServiceImpl.java
package com.harvey.service.order.impl;
import com.harvey.dao.order.OrderDao;
import com.harvey.model.Order;
import com.harvey.service.order.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * {@link IOrderService} implementation that delegates persistence to {@link OrderDao}.
 */
@Service
public class OrderServiceImpl implements IOrderService {

    /** DAO used to write orders. */
    @Autowired
    private OrderDao orderDao;

    /**
     * Saves the order by passing it to {@link OrderDao#saveOrder(Order)}.
     *
     * @param order the order to save
     */
    @Override
    public void saveOrder(Order order) {
        this.orderDao.saveOrder(order);
    }
}
6.消费者
package com.harvey.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.common.Constants;
import com.harvey.model.Order;
import com.harvey.mq.consumer.common.MQMessageListener;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.service.order.IOrderService;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Order consumer: parses each message body as an {@link Order} and saves it through the
 * order service.
 *
 * @author harvey
 */
@Slf4j
@Component("orderConsumer")
public class OrderConsumer implements MQMessageListener {

    /** Shared JSON mapper used to parse message bodies. */
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private OrderConf orderConf;

    @Autowired
    private IOrderService orderService;

    /**
     * Handles one message. An empty message is only logged; any other message is parsed
     * from JSON and saved. Every exception raised while handling is logged and swallowed,
     * so nothing propagates to the caller.
     *
     * @param message the message body (JSON text)
     * @param msgId   the message id (not used here)
     * @throws Exception declared by the interface; not thrown in practice because failures are caught
     */
    @Override
    public void handleMessage(String message, String msgId) throws Exception {
        try {
            if (StringUtils.isEmpty(message)) {
                log.info("消息为空不做处理!");
                return;
            }
            log.info("数据收集-消费者【topicName = " + orderConf.getTopicName() + ",data = " + message + "】");
            // Business processing: JSON to Order, then persist.
            Order parsedOrder = objectMapper.readValue(message, Order.class);
            orderService.saveOrder(parsedOrder);
        } catch (Exception e) {
            log.error("RocketMQ 消费数据->入库 异常!【topicName = " + orderConf.getTopicName() + "】", e);
        }
    }
}
7.生产者
package com.harvey.mq.producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.model.Order;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * Order producer: a stand-alone program that sends sample order messages to the
 * OrderQueue topic, one per second.
 *
 * @author harvey
 */
@Slf4j
public class OrderProducer {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String NAME_ADDRESS = "test-rq01-a.mq.01zhuanche.com:9876";
    private static final String GROUP_NAME = "order_queue_group";
    private static final String TOPIC_NAME = "OrderQueue";
    private static final String INSTANCE_NAME = "order_queue_instance";
    /** Number of sample messages sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 100;
    /** Pause between two sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000;

    /**
     * Builds the JSON body of the {@code i}-th sample order.
     *
     * @param i zero-based index of the message; every field value is derived from it
     * @return the order serialized as JSON
     * @throws Exception if serialization fails
     */
    private static String getMessageContent(int i) throws Exception {
        Order order = new Order();
        order.setOrderNo(String.valueOf(i + 123456789));
        order.setUserId(String.valueOf(i + 1001));
        order.setOrderMoney(100.0 + i);
        order.setPayType(i % 2 == 0 ? 0 : 1);
        order.setCreateTime(DateUtil.getCurrentTime());
        return objectMapper.writeValueAsString(order);
    }

    /**
     * Starts a producer, sends the sample messages and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot be started, a send fails, or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
        producer.setNamesrvAddr(NAME_ADDRESS);
        producer.setInstanceName(INSTANCE_NAME);
        producer.start();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Encode with an explicit charset instead of the platform default.
                byte[] body = getMessageContent(i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                Message msg = new Message(TOPIC_NAME, "tag" + i, "key" + i, body);
                SendResult sendResult = producer.send(msg);
                log.info(sendResult.toString());
                TimeUnit.MILLISECONDS.sleep(SEND_INTERVAL_MILLIS);
            }
        } finally {
            // Release the client even when a send fails or the sleep is interrupted.
            producer.shutdown();
        }
    }
}
3.测试
- 启动程序
程序启动完成后,控制台会打印如下两行日志。如果看到
rocketmq registerMessageListener success!,说明消费者启动成功:RocketMQ 会监听该 topic,一旦有消息到达,消费者就会拉取并消费。
rocketmq init【topicName = OrderQueue,groupName = order_queue_group】
rocketmq registerMessageListener success!【topicName = OrderQueue,groupName = order_queue_group】
- 启动生产者
查看控制台(部分日志)
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B986B005A, offsetMsgId=0A14000D00002A9F0000000F7F74DABA, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B9C5C005B, offsetMsgId=0A14000D00002A9F0000000F7F752C21, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA050005C, offsetMsgId=0A14000D00002A9F0000000F7F753FC5, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=2], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA443005D, offsetMsgId=0A14000D00002A9F0000000F7F7559B8, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=3], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA838005E, offsetMsgId=0A14000D00002A9F0000000F7F756FAC, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=4], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BAC2D005F, offsetMsgId=0A14000D00002A9F0000000F7F757E79, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=5], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB0220060, offsetMsgId=0A14000D00002A9F0000000F7F759DF6, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=6], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB4230061, offsetMsgId=0A14000D00002A9F0000000F7F75A29B, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=7], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB81C0062, offsetMsgId=0A14000D00002A9F0000000F7F75BA28, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BBC210063, offsetMsgId=0A14000D00002A9F0000000F7F75C482, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=12]
- 观察消费者控制台
部分日志
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456878","userId":"1090","orderMoney":189.0,"payType":1,"createTime":"2018-08-17 16:59:59"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456879","userId":"1091","orderMoney":190.0,"payType":0,"createTime":"2018-08-17 17:00:00"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456880","userId":"1092","orderMoney":191.0,"payType":1,"createTime":"2018-08-17 17:00:01"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456881","userId":"1093","orderMoney":192.0,"payType":0,"createTime":"2018-08-17 17:00:02"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456882","userId":"1094","orderMoney":193.0,"payType":1,"createTime":"2018-08-17 17:00:03"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456883","userId":"1095","orderMoney":194.0,"payType":0,"createTime":"2018-08-17 17:00:04"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456884","userId":"1096","orderMoney":195.0,"payType":1,"createTime":"2018-08-17 17:00:05"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456885","userId":"1097","orderMoney":196.0,"payType":0,"createTime":"2018-08-17 17:00:06"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456886","userId":"1098","orderMoney":197.0,"payType":1,"createTime":"2018-08-17 17:00:07"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456887","userId":"1099","orderMoney":198.0,"payType":0,"createTime":"2018-08-17 17:00:08"}】
INFO OrderConsumer : 数据收集-消费者【topicName = OrderQueue,data = {"orderNo":"123456888","userId":"1100","orderMoney":199.0,"payType":1,"createTime":"2018-08-17 17:00:09"}】
- 查看数据库是否写入数据