这几天遇到了一个非常好玩的东西,在系统中写了一个消息队列。
当消息队列启动时会不定期的有数据进入。由消息队列执行一些同步,验证等操作。
后来发现一个问题。如果分布式部署代码,那么就会出现问题。
分布了两台或者多台机器的时候队列也跟着变多了。但是实际上这是一个队列。
由此,衍生了下面的分布式锁的代码:
CuratorProperties curatorProperties = CuratorPropertiesBuilder.getInstance().build(new LocalZookeeperPropertiesLoader().load()); try (CuratorFramework curatorFramework = CRMSCuratorFrameworkFactory.createCuratorFramework(curatorProperties)) { //互斥分布式锁 String lockName = MEMBER_ON_ZOOKEEPER_PREFIX + importHistory.getImportBatch(); InterProcessMutex mutexLock = new InterProcessMutex(curatorFramework, lockName); if (mutexLock.acquire(-1, null)) { try { importHistory.setGeneratingCount(ordinaryMembersRepository.getSynNumByImportHistoryId(ordinaryMembersUse.getImportHistoryId().getId())); importHistory.setGeneratingNotCount(ordinaryMembersRepository.getNotSynNumByImportHistoryId(ordinaryMembersUse.getImportHistoryId().getId())); if (importHistory.getDataCount().equals((importHistory.getGeneratingCount() != null ? importHistory.getGeneratingCount() : 0) + (importHistory.getGeneratingNotCount() != null ? importHistory.getGeneratingNotCount() : 0) + (importHistory.getVerifyNotThroughCount() != null ? importHistory.getVerifyNotThroughCount() : 0) )) { importHistory.setBatchStatus(BatchStatus.COMPLATE); } importHistoryRepository.saveAndFlush(importHistory); }catch (Exception ex) { LOGGER.error("同步普通会员:设置批次个数失败,会员ID:" + importHistory.getImportBatch() + "错误:", ex); } finally { mutexLock.release(); } } } catch (Exception ex) { LOGGER.error("同步普通会员:设置批次个数失败,会员ID:" + importHistory.getImportBatch() + "错误:", ex); }