工作纪实_30-MQ消费redis延时处理合并消息

背景

公司的上游数据,通过MQ给我们做数据同步,但是消息存在各个业务节点,且短时间内可能发送多次,并且我们收到消息反查上游服务时,他们的业务数据存在一定延时,这就导致收到的消息,并不一定是他们最新的

解决方案

消费方,收到消息以后,做消息的暂存处理,并对重复数据进行去重,过了3s后再拿出来消费

技术选型

1.java提供的线程安全队列
2.使用guava的延时发布、订阅
3.使用mq做延时消费
4.使用redis的zset【我选择的】

我们是分布式的服务,选择内存级别的处理,或多或少存在服务崩溃导致数据丢失的问题,但是使用redis则不会,数据依旧存在,风险是最小的。延时MQ方案则存在消息套娃,不考虑!

实现方式

 	public void inQueue(String key, String tag, Long carId, Integer delay) {
    
    
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        zSetOperations.add(key, String.format("%s%s%s", tag, SYMBOL, carId), delay * 1000 + new Date().getTime());
    }

	/**
     * 延迟消费
     *
     * @param key
     */
    public void delayConsume(String key, Consumer<Long> consumer) throws InterruptedException {
    
    
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        // 获取score的范围0-当前时间戳的数据,一次性取100条
        Set<String> messages = zSetOperations.rangeByScore(key, 0, System.currentTimeMillis(), 0, 100);
        if (ObjectUtils.isEmpty(messages)) {
    
    
            cmdSyncLogger.info("数据同步记录,redis延迟消费,3秒内无数据");
            TimeUnit.SECONDS.sleep(3);
            return;
        }
        HashSet<Long> carIds = new HashSet<>();
        Map<Long, String> carMsgMap = new HashMap<>();
        for (String msg : messages) {
    
    
            String[] split = msg.split(SYMBOL);
            Long carId = Long.valueOf(split[1]);
            carIds.add(carId);
            carMsgMap.put(carId, msg);
        }
        for (Long carId : carIds) {
    
    
            consumer.accept(carId);
            // 删除zset数据
            zSetOperations.remove(key, carMsgMap.get(carId));
        }
        Set set = zSetOperations.rangeByScore(key, 0, System.currentTimeMillis(), 0, 100);
        cmdSyncLogger.info("数据同步记录,redis延迟消费,carId = {}-合并消费成功,{}", carIds, set.size());
    }

	@PostConstruct
    public void loop() {
    
    
        String key = RedisConstant.SALE_PREFIX + "sale_car_index_sync";
        new Thread(() -> {
    
    
            while (true) {
    
    
                try {
    
    
                    boolean lockResult = redisLock.tryLock(key, 500);
                    if (lockResult) {
    
    
                        messageMergeManage.delayConsume("car_sync_queue", carId -> {
    
    
                            boolean flag = saleCarDataTransferService.saleCmdDataTransfer(carId);
                            cmdSyncLogger.info("数据同步记录:车源数据变更事件,carId = {}-消费{}", carId, flag ? "成功" : "失败");
                        });
                    }
                } catch (Exception e) {
    
    
                    // doNothing
                    cmdSyncLogger.error("数据同步记录:车源数据变更事件,消费异常", e);
                } finally {
    
    
                    redisLock.unLock(key);
                }
            }
        }).start();
    }
    

猜你喜欢

转载自blog.csdn.net/u013553309/article/details/129954654