一、背景
最近的业务场景,要进行数据同步。简单来说源数据要存到Oracle库,Oracle库中历史数据太多,查询耗时太长,用户无法接受,所以在mysql库进行了分表,要把数据同步到mysql库中。原来的实现是每天跑定时任务,对前一天的数据进行同步,问题来了,数据同步不及时,查询的话要分两个接口,查历史数据走Mysql,查当天数据走Oracle。但是两个接口的逻辑完全相同,因为业务功能是一样的,这就很**【难以描述】。所以重点来了,使用消息的方式,当数据保存到Oracle时,发消息,消息体即保存的数据,消费消息,将数据保存到Mysql,基本保证了实时性。如果不使用消息呢?保存完Oracle,再保存到Mysql,那这个保存接口是不是时间太长了?兄dei【不是
二、实现
1. 消息生产方配置
/**
* 消息生产方配置
*
* @yx8102 2019/1/25
*/
@Component
public class MessageProducerConfig {
// 消息服务器地址
@Value("${message.producer.serverAddress}")
private String serverAddress;
// 发送分组名称
@Value("${message.producer.groupName}")
private String groupName;
@Bean
public RocketMqClient initRocketMqClient(){
validateMessage();
RocketMqServer server = RocketMqServer.build(this.serverAddress);
return RocketMqClient.build(server, this.groupName);
}
private void validateMessage(){
Assert.notNull(this.serverAddress, "serverAddress is blank");
Assert.notNull(this.groupName, "groupName is blank");
}
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
}
2. 发送消息服务
/**
* 发送消息服务
*
* @xyang010 2019/1/30
*/
@Service
@Slf4j
public class PictureSyncMessageService {
@Autowired
private RocketMqClient rocketMqClient;
/**
* 发送同步消息
* @param syncFailedDO
*/
public void sendPictureSyncMessage(PictureSyncFailedDO syncFailedDO) {
//1. 拼装消息
RocketMqMessage mqMessage = new RocketMqMessage();
mqMessage.setTopic(MessageConstant.ROCKET_MQ_TOPIC);
mqMessage.setTag(MessageConstant.PICTURE_SYNC_TAG);
mqMessage.setKey(buildMessageKey(syncFailedDO.getZpFileId()));
mqMessage.setData(syncFailedDO);
//2. 发送消息
ITryCatchResult<SendResult> invokeResult = rocketMqClient.send(mqMessage);
//3. 发送成功
if(invokeResult.isSuccess() && invokeResult.getResult() != null) {
SendResult sendRes = invokeResult.getResult();
SendStatus sendStatus = sendRes.getSendStatus();
if (sendStatus.equals(SendStatus.SEND_OK)) {
}
}
}
/**
* 创建messageKey
* @param id
* @return
*/
private static String buildMessageKey(Long id){
return "zpFileId:"+ id +"_" + UUIDUtils.getUUID("N");
}
}
3. 消费消息方配置
/**
* 消息消费方配置
*
* @yx8102 2019/2/2
*/
@Component
@Slf4j
public class MessageConsumerConfig {
@Value("${message.consumer.serverAddress}")
private String serverAddress;
@Value("${message.consumer.groupName}")
private String groupName;
@Value("${message.consumer.topic}")
private String topic;
@Autowired
private PictureSyncMessageListener messageListener;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() {
validateMessage();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(serverAddress);
consumer.setMessageListener(messageListener);
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setConsumeThreadMax(10);
consumer.setConsumeThreadMin(5);
try{
consumer.subscribe(topic, MessageConstant.PICTURE_SYNC_TAG);
consumer.start();
log.info("message consumer is start successfully");
} catch (MQClientException e){
log.error("message consumer start error {}", e);
}
return consumer;
}
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public void setTopic(String topic) {
this.topic = topic;
}
private void validateMessage(){
Assert.notNull(this.serverAddress, "serverAddress is blank");
Assert.notNull(this.groupName, "groupName is blank");
Assert.notNull(this.topic, "topic is blank");
}
}
4. 消费消息服务
/**
* 消息监听服务
*
* @yx8102 2019/2/2
*/
@Component
@Slf4j
public class PictureSyncMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
String message = new String(messageExt.getBody());
log.info("receive picture sync message {}", message);
PictureSyncFailedDO syncFailedDO = JSON.parseObject(message, PictureSyncFailedDO.class);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
5. 说明
- 增加消费方的配置:clusterName,brokerName,groupName
- 增加topic:使用驼峰命名
- 生产方和消费方每个工程使用一个producerGroupName和consumerGroupName就好
- 同一个topic可根据tag不同进行过滤
- producer只能start一次
- consumer要记得start
- 本地单测的时候,要sleep的时间足够长,保证消息发送完成,以及监听得到
- consumerListener 配到对应的类即可,不同的topic要不同的配置类