背景
公司的上游数据,通过MQ给我们做数据同步,但是消息存在各个业务节点,且短时间内可能发送多次,并且我们收到消息反查上游服务时,他们的业务数据存在一定延时,这就导致收到的消息,并不一定是他们最新的
解决方案
消费方,收到消息以后,做消息的暂存处理,并对重复数据进行去重,过了3s后再拿出来消费
技术选型
1.java提供的线程安全队列
2.使用guava的延时发布、订阅
3.使用mq做延时消费
4.使用redis的zset【我选择的】
我们是分布式的服务,选择内存级别的处理,或多或少存在服务崩溃导致数据丢失的问题,但是使用redis则不会,数据依旧存在,风险是最小的。延时MQ方案则存在消息套娃,不考虑!
实现方式
public void inQueue(String key, String tag, Long carId, Integer delay) {
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
zSetOperations.add(key, String.format("%s%s%s", tag, SYMBOL, carId), delay * 1000 + new Date().getTime());
}
/**
* 延迟消费
*
* @param key
*/
public void delayConsume(String key, Consumer<Long> consumer) throws InterruptedException {
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
// 获取score的范围0-当前时间戳的数据,一次性取100条
Set<String> messages = zSetOperations.rangeByScore(key, 0, System.currentTimeMillis(), 0, 100);
if (ObjectUtils.isEmpty(messages)) {
cmdSyncLogger.info("数据同步记录,redis延迟消费,3秒内无数据");
TimeUnit.SECONDS.sleep(3);
return;
}
HashSet<Long> carIds = new HashSet<>();
Map<Long, String> carMsgMap = new HashMap<>();
for (String msg : messages) {
String[] split = msg.split(SYMBOL);
Long carId = Long.valueOf(split[1]);
carIds.add(carId);
carMsgMap.put(carId, msg);
}
for (Long carId : carIds) {
consumer.accept(carId);
// 删除zset数据
zSetOperations.remove(key, carMsgMap.get(carId));
}
Set set = zSetOperations.rangeByScore(key, 0, System.currentTimeMillis(), 0, 100);
cmdSyncLogger.info("数据同步记录,redis延迟消费,carId = {}-合并消费成功,{}", carIds, set.size());
}
@PostConstruct
public void loop() {
String key = RedisConstant.SALE_PREFIX + "sale_car_index_sync";
new Thread(() -> {
while (true) {
try {
boolean lockResult = redisLock.tryLock(key, 500);
if (lockResult) {
messageMergeManage.delayConsume("car_sync_queue", carId -> {
boolean flag = saleCarDataTransferService.saleCmdDataTransfer(carId);
cmdSyncLogger.info("数据同步记录:车源数据变更事件,carId = {}-消费{}", carId, flag ? "成功" : "失败");
});
}
} catch (Exception e) {
// doNothing
cmdSyncLogger.error("数据同步记录:车源数据变更事件,消费异常", e);
} finally {
redisLock.unLock(key);
}
}
}).start();
}