灵感来了挡也挡不住,基于Redis解决业务场景中延迟队列

在一次产品需求设计中,有这样一种场景,对于一个工单,专员可以选择将工单挂起(一次挂起2小时;二次挂起12小时),或者转存(表单输入转存的天数),然后提交工单 到既定时间后,该工单需要重新被领单,然后再次审核。对于工单,有优先级概念,意味着优先级越大,该工单则需要优先被派单,对于这种场景,我们可以基于Redis来解决场景问题。

一、业务概述

我们假定设置两个队列,一个队列维护正式工单,另一个队列维护挂起工单。对于挂起操作,我们通过Redis设置key有效时间,当key失效时,客户端监听失效事件,获取工单,实现 挂起工单队列的移除,正式队列的入队即可。

业务流程图:

灵感来了挡也挡不住,基于Redis解决业务场景中延迟队列

二、代码实现

灵感来了挡也挡不住,基于Redis解决业务场景中延迟队列

整个业务实现,可以分为三个大模块。

  • 队列模块,通过WorkOrderQueueTransfer带外提供内部的调用。
  • 工单模块、通过OperateStrategyManager管理器实现工单相关场景的操作。
  • 事件监听、通过继承KeyExpirationEventMessageListener,实现对Key过期失效的监听(特别说明:Redis服务器redis.conf需要开启Key过期失效通知)。

2.1、工单队列实现

我们可以基于Redis ZSet数据存储,ZSet是个有序集合,可以实现基于score排序。

2.1.1、定义上下文类(WorkOrderContext)

 
  1. /**

  2. * @description: 工单上下文对象

  3. * @Date : 2020/7/13 下午4:28

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Data

  7. @NoArgsConstructor

  8. @AllArgsConstructor

  9. @Builder

  10. public class WorkOrderContext {

  11. /**

  12. * 是否测试人员专用

  13. */

  14. private boolean isTest;

  15. /**

  16. * 工单号

  17. */

  18. private WorkOrder worOrder;

  19. /**

  20. * 队列类型

  21. */

  22. private QueueType queueType;

  23. /**

  24. * 创建-正式队列(立即需要被派单

  25. * @return

  26. */

  27. public static WorkOrderContext buildImmediate() {

  28. return WorkOrderContext.builder().queueType(QueueType.immediate).build();

  29. } /**

  30. * 创建-挂起队列(挂起n小时执行)

  31. * @return

  32. */

  33. public static WorkOrderContext buildSuspended() {

  34. return WorkOrderContext.builder().queueType(QueueType.suspended).build();

  35. } /**

  36. * 转存队列(转存n天后执行)

  37. * @return

  38. */

  39. public static WorkOrderContext buildStored() {

  40. return WorkOrderContext.builder().queueType(QueueType.stored).build();

  41. } /**

  42. * 创建-正式队列(立即需要被派单)

  43. *

  44. * @param workCode

  45. * @param priority

  46. * @return

  47. */

  48. public static WorkOrderContext buildImmediate(String workCode, double priority) {

  49. WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(0).build();

  50. return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.immediate).build();

  51. } /**

  52. * 创建-挂起队列(挂起n小时执行)

  53. *

  54. * @param workCode

  55. * @param priority

  56. * @param delayedTime

  57. * @return

  58. */

  59. public static WorkOrderContext buildSuspended(String workCode, double priority, long delayedTime) {

  60. WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build(); return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.suspended).build();

  61. } /**

  62. * 转存队列(转存n天后执行)

  63. *

  64. * @param workCode

  65. * @param priority

  66. * @param delayedTime

  67. * @return

  68. */

  69. public static WorkOrderContext buildStored(String workCode, double priority, long delayedTime) {

  70. WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build(); return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.stored).build();

  71. } /**

  72. * 队列类型

  73. */

  74. public enum QueueType {

  75. /**

  76. * 正式队列(立即需要被派单)

  77. */

  78. immediate, /**

  79. * 挂起队列(挂起n小时执行)

  80. */

  81. suspended, /**

  82. * 转存队列(转存n天后执行)

  83. */

  84. stored } @Data

  85. @NoArgsConstructor

  86. @AllArgsConstructor

  87. @Builder

  88. public static class WorkOrder {

  89. /**

  90. * 工单号

  91. */

  92. private String workCode;

  93. /**

  94. * 优先级

  95. */

  96. private double priority;

  97. /**

  98. * 延迟时间

  99. */

  100. private long delayedTime;

  101. }}

2.1.2、定义抽象缓存类(AbstractCacheManager)

该抽象类定义了一个方法,抽象定义了一个具有重试机制的方法。其中注入的BeanRedisService是咱们子类基于Redis API的封装。

 
  1. /**

  2. * @description: 抽象缓存管理器

  3. * @Date : 2020/7/18 下午9:41

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Slf4j

  7. public abstract class AbstractCacheManager {

  8. final int MAX_RETRIES = 3;

  9. @Autowired

  10. RedisService redisService; /**

  11. * 重试操作

  12. * @param retries 重试次数

  13. * @param context 上下文

  14. * @param call 重试动作

  15. */

  16. public <T> void retry(int retries, T context, Function<Integer,Boolean> call){

  17. boolean done = false;

  18. int retry = 1;

  19. do {

  20. try {

  21. done = call.apply(retry); log.info("[retry] context={},retry={},done={}", JSONObject.toJSON(context),retry,done);

  22. retry ++; TimeUnit.MILLISECONDS.sleep(100);

  23. } catch (Exception e) {

  24. log.error("[retry] 异常 ctx={}", JSONObject.toJSON(context),e);

  25. retry ++; } }while (retry <= retries && !done);

  26. }}

2.1.3、定义基于Redis Cache的Manager类(WorkOrderCacheManager)

该类的主要作用,基于Redis String对象存储,实现具有Key失效机制的存储。

  • 内部静态类CacheValue,作为Redis String对象存储的Value值。
  • 内部枚举类CacheType,维护了缓存Key的业务前缀。
  • 特别说明的是,我们构成Redis String存储Key的命名规则,例如:carCarthage:stored_cache_单号。
 
  1. /**

  2. * @description: 工单缓存管理器

  3. * @Date : 2020/7/14 下午4:28

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Component

  7. @Slf4j

  8. public class WorkOrderCacheManager extends AbstractCacheManager{

  9. /**

  10. * 设置缓存并设置缓存失效日期

  11. * @param cache

  12. */

  13. public void setCacheInExpire(CacheValue cache){

  14. retry(MAX_RETRIES,cache,idx -> { String redisKey = redisService.getKey(getRedisKeySuffix(cache.getType(),cache.getWorkCode())); redisService.set(redisKey, JSONObject.toJSONString(cache),cache.getExpireSeconds()); log.info("[setCacheInExpire],redisKey={},CacheValue={}",redisKey,JSONObject.toJSONString(cache));

  15. return Boolean.TRUE;

  16. }); } /**

  17. * 查询某个工单号的缓存值

  18. * @param cacheType 缓存类型 {@link CacheType}

  19. * @param workCode 工单号

  20. * @return

  21. */

  22. public CacheValue get(CacheType cacheType,String workCode){

  23. String redisKey = redisService.getKey(getRedisKeySuffix(cacheType,workCode)); String value = redisService.get(redisKey,String.class);

  24. return JSONObject.parseObject(value,CacheValue.class);

  25. } /**

  26. * 从上下文队列类型获取队列redis key

  27. * @param cacheType 缓存类型 {@link CacheType}

  28. * @param workCode 工单号

  29. * @return

  30. */

  31. String getRedisKeySuffix(CacheType cacheType,String workCode){

  32. switch (cacheType){

  33. case stored_cache:

  34. return CacheType.stored_cache.getKey() + workCode;

  35. case suspended_cache:

  36. return CacheType.suspended_cache.getKey() + workCode;

  37. default:

  38. break;

  39. } return null;

  40. } /**

  41. * 缓存值

  42. */

  43. @Data

  44. @NoArgsConstructor

  45. @AllArgsConstructor

  46. @Builder

  47. public static class CacheValue{

  48. /**

  49. * 缓存类型

  50. */

  51. private CacheType type;

  52. /**

  53. * 工单号

  54. */

  55. private String workCode;

  56. /**

  57. * 优先级

  58. */

  59. private double priority;

  60. /**

  61. * 延迟截止时间(单位:时间戳)

  62. */

  63. private long delayedTime;

  64. /**

  65. * 缓存失效时间(单位:秒)

  66. */

  67. private long expireSeconds;

  68. /**

  69. * 创建-挂起队列(挂起n小时执行)

  70. *

  71. * @param workCode

  72. * @param priority

  73. * @param delayedTime

  74. * @param expireSeconds

  75. * @return

  76. */

  77. public static CacheValue buildSuspended(String workCode, double priority, long delayedTime,long expireSeconds) {

  78. return CacheValue.builder()

  79. .type(CacheType.suspended_cache) .workCode(workCode) .priority(priority) .delayedTime(delayedTime) .expireSeconds(expireSeconds) .build(); } /**

  80. * 转存队列(转存n天后执行)

  81. *

  82. * @param workCode

  83. * @param priority

  84. * @param delayedTime

  85. * @param expireSeconds

  86. * @return

  87. */

  88. public static CacheValue buildStored(String workCode, double priority, long delayedTime,long expireSeconds) {

  89. return CacheValue.builder()

  90. .type(CacheType.stored_cache) .workCode(workCode) .priority(priority) .delayedTime(delayedTime) .expireSeconds(expireSeconds) .build(); } } /**

  91. * 实现 挂起|转存 缓存key

  92. */

  93. @Getter

  94. public enum CacheType{

  95. stored_cache("stored_cache_"),

  96. suspended_cache("suspended_cache_"),

  97. ; CacheType(String key) { this.key = key;

  98. } private String key;

  99. }}

2.1.4、工单队列管理器(WorkOrderQueueManager)

该类基于Redis ZSet 对象结构的有序集合,可以实现按照优先级出队。

相关方法介绍:

  • String getRedisKey(WorkOrderContext context):从 WorkOrderContext中获取队列类型,返回集合要存储的Key。
  • Long queueSize(WorkOrderContext context):返回队列大小
  • Boolean leftPush(WorkOrderContext context):执行处理(入队操作)
  • Boolean leftPushIfAbsent(WorkOrderContext context):执行处理(入队操作),如果入队元素缺席则入队,返回true;否则返回false。
  • Long remove(WorkOrderContext context):从队列移除某个元素
  • WorkOrderContext.WorkOrder pop(WorkOrderContext context):从集合中获取评分最小的成员出队
  • Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context):按照升序查看队列中所有成员
  • Long removeRange(String key, long start, long end):按照范围移除队列元素
  • Long removeValues(String key, List<Object> values):移除指定元素
  • long getDelayedTime(WorkOrderContext.QueueType queueType,String workCode):获取对应工单的延迟时间(适用于 挂起 和 转存)
 
  1. /**

  2. * @description: 工单队列管理器

  3. * @Date : 2020/7/14 下午4:28

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Component

  7. @Slf4j

  8. public class WorkOrderQueueManager extends AbstractCacheManager{

  9. final String LOCK_KEY = "ZSET_ATOMIC_LOCK";

  10. @Autowired

  11. ZSetOperations<String, Object> zSetOperations; @Autowired

  12. WorkOrderCacheManager workOrderCacheManager; /**

  13. * 从上下文队列类型获取队列redis key

  14. * @param context

  15. * @return

  16. */

  17. String getRedisKey(WorkOrderContext context){ String keySuffix = null;

  18. switch (context.getQueueType()){ case immediate: keySuffix = CarthageConst.WorkOrderKey.IMMEDIATE_QUEUE_DEFAULT; break;

  19. case stored: keySuffix = CarthageConst.WorkOrderKey.STORED_QUEUE_DEFAULT; break;

  20. case suspended: keySuffix = CarthageConst.WorkOrderKey.SUSPENDED_QUEUE_DEFAULT; break;

  21. default:

  22. break;

  23. } if(null != keySuffix){

  24. if(context.isTest()){

  25. keySuffix += CarthageConst.TEST_SUFFIX; } return redisService.getKey(keySuffix);

  26. } return null;

  27. } /**

  28. * 返回队列大小

  29. * @param context

  30. * @return

  31. */

  32. public Long queueSize(WorkOrderContext context) {

  33. return zSetOperations.size(getRedisKey(context));

  34. } /**

  35. * 执行处理(入队操作)

  36. * @param context

  37. * @return

  38. */

  39. public Boolean leftPush(WorkOrderContext context) {

  40. String redisKey = getRedisKey(context); String workCode = context.getWorOrder().getWorkCode(); double priority = context.getWorOrder().getPriority(); Boolean action = zSetOperations.add(redisKey,workCode,priority);

  41. if(Objects.equals(Boolean.FALSE,action)){

  42. Long value = zSetOperations.rank(redisKey,workCode);

  43. log.info("[Queue.leftPush],hasLeftPushed,action={},value={}, context={}", action,value,JSONObject.toJSON(context));

  44. if(Objects.nonNull(value)){

  45. return Boolean.TRUE;

  46. } } log.info("[Queue.leftPush] context={}", JSONObject.toJSON(context));

  47. retry(MAX_RETRIES,context,idx -> action); return Optional.ofNullable(action).orElse(Boolean.FALSE);

  48. } /**

  49. * 执行处理(入队操作)

  50. * 如果入队元素缺席则入队,返回true;否则返回false。

  51. * @param context

  52. * @return

  53. */

  54. public Boolean leftPushIfAbsent(WorkOrderContext context) {

  55. String redisKey = getRedisKey(context); String workCode = context.getWorOrder().getWorkCode(); double priority = context.getWorOrder().getPriority(); Boolean action = zSetOperations.add(redisKey,workCode,priority);

  56. log.info("[WorkOrderQueue.leftPushIfAbsent,action={},context={}",action, JSONObject.toJSON(context));

  57. return Optional.ofNullable(action).orElse(Boolean.FALSE);

  58. } /**

  59. * 从队列移除某个元素

  60. * @param context

  61. * @return

  62. */

  63. public Long remove(WorkOrderContext context){

  64. String redisKey = getRedisKey(context); String workCode = context.getWorOrder().getWorkCode(); log.info("[WorkOrderQueue.remove] context={}", JSONObject.toJSON(context));

  65. Long rem = zSetOperations.remove(redisKey,workCode);

  66. Long action = Optional.ofNullable(rem).orElse(0L);

  67. retry(MAX_RETRIES,context,idx -> action.longValue() > 0);

  68. return action;

  69. } /**

  70. * 从集合中获取评分最小的成员出队

  71. * @param context

  72. * @return

  73. */

  74. public WorkOrderContext.WorkOrder pop(WorkOrderContext context) {

  75. WorkOrderContext.WorkOrder workOrder = null;

  76. try {

  77. String redisKey = getRedisKey(context); //通过分布式锁,实现 zset 的 zpopmin 命令操作

  78. boolean locked = redisService.lock(LOCK_KEY,5000);

  79. if(locked){

  80. //1、取出第一个最小评分元素

  81. Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(redisKey,0,0);

  82. if(set.isEmpty()){

  83. return null;

  84. }

  85. //2、移除该最小评分元素

  86. Long value = redisService.zSetOperations().removeRange(redisKey,0,0);

  87. retry(MAX_RETRIES,context,idx -> value.longValue() > 0);

  88. //3、返回出队成员

  89. workOrder = WorkOrderContext.WorkOrder.builder().build();

  90. for(ZSetOperations.TypedTuple<Object> each : set){

  91. workOrder.setWorkCode(each.getValue().toString());

  92. workOrder.setPriority(each.getScore());

  93. workOrder.setDelayedTime(0);

  94. break;

  95. }

  96. }

  97. } catch (Exception e) {

  98. log.error("[WorkOrderQueue.pop] exception ctx={}", JSONObject.toJSON(context));

  99. }finally {

  100. redisService.unlock(LOCK_KEY);

  101. }

  102. return workOrder;

  103. }

  104. /**

  105. * 按照升序查看队列中所有成员

  106. * @param context

  107. * @return

  108. */

  109. public Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context) {

  110. Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(getRedisKey(context),0,-1);

  111. Set<WorkOrderContext.WorkOrder> members = Sets.newLinkedHashSetWithExpectedSize(set.size());

  112. set.forEach(each -> {

  113. WorkOrderContext.WorkOrder every = WorkOrderContext.WorkOrder.builder()

  114. .workCode(each.getValue().toString())

  115. .priority(each.getScore())

  116. .delayedTime(getDelayedTime(context.getQueueType(),each.getValue().toString()))

  117. .build();

  118. members.add(every);

  119. });

  120. return members;

  121. }

  122. /**

  123. * 按照范围移除队列元素

  124. * @param key

  125. * @param start

  126. * @param end

  127. * @return

  128. */

  129. public Long removeRange(String key, long start, long end){

  130. String redisKey = redisService.getKey(key);

  131. Long count = zSetOperations.removeRange(redisKey,start,end);

  132. log.info("[WorkOrderQueue.removeRange] redisKey={},start={},end={},count={}", redisKey,start,end,count);

  133. return count;

  134. }

  135. /**

  136. * 移除指定元素

  137. * @param key

  138. * @param values

  139. * @return

  140. */

  141. public Long removeValues(String key, List<Object> values){

  142. String redisKey = redisService.getKey(key);

  143. LongAdder longAdder = new LongAdder();

  144. values.forEach(each -> {

  145. Long count = zSetOperations.remove(redisKey,each);

  146. longAdder.add(count);

  147. });

  148. Long count = longAdder.longValue();

  149. log.info("[WorkOrderQueue.removeValues] redisKey={},values={},count={}", redisKey,JSONObject.toJSONString(values),count);

  150. return count;

  151. }

  152. /**

  153. * 获取对应工单的延迟时间(适用于 挂起 和 转存)

  154. * @param queueType

  155. * @param workCode

  156. * @return

  157. */

  158. long getDelayedTime(WorkOrderContext.QueueType queueType,String workCode){

  159. long delayedTime = 0;

  160. WorkOrderCacheManager.CacheType cacheType = null;

  161. switch (queueType){

  162. case suspended:

  163. cacheType = WorkOrderCacheManager.CacheType.suspended_cache;

  164. break;

  165. case stored:

  166. cacheType = WorkOrderCacheManager.CacheType.stored_cache;

  167. break;

  168. default:

  169. break;

  170. }

  171. if(null != cacheType){

  172. WorkOrderCacheManager.CacheValue cacheValue = workOrderCacheManager.get(cacheType,workCode);

  173. if(null != cacheValue){

  174. delayedTime = cacheValue.getDelayedTime();

  175. }

  176. }

  177. return delayedTime;

  178. }

  179. }

2.2、工单队列转移管理器

2.2.1、工单队列转移管理器(WorkOrderQueueTransfer)

该类实现延迟队列出队、正式队列入队的原子操作(通过Redis分布式锁实现)。

 
  1. /**

  2. * @description: 工单队列转移管理器

  3. * @Date : 2020/7/23 下午6:15

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Component

  7. @Slf4j

  8. public class WorkOrderQueueTransfer extends AbstractCacheManager{

  9. final static String ATOMIC_KEY = "delayed_queue_key_expire_lock_{0}";

  10. final static long ATOMIC_KEY_EXPIRE = 5000;

  11. @Autowired

  12. RedisService redisService; @Autowired

  13. WorkOrderQueueManager workOrderQueueManager; @Autowired

  14. WorkOrderCacheManager workOrderCacheManager; /**

  15. * 从[挂起|暂存]队列转移到正式队列中

  16. * @param cacheType 挂起|暂存

  17. * @param delayedContext

  18. * @return

  19. */

  20. public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType,WorkOrderContext delayedContext){

  21. boolean tryLock = false;

  22. Boolean done = Boolean.FALSE; String lockKey = null;

  23. try {

  24. WorkOrderContext.WorkOrder workOrder = delayedContext.getWorOrder(); lockKey = redisService.getKey(MessageFormat.format(ATOMIC_KEY,workOrder.getWorkCode())); tryLock = redisService.lock(lockKey,ATOMIC_KEY_EXPIRE); if(tryLock){

  25. //1、构建正式队列

  26. WorkOrderContext immediateContext = WorkOrderContext.buildImmediate(workOrder.getWorkCode(),workOrder.getPriority());

  27. done = workOrderQueueManager.leftPushIfAbsent(immediateContext);

  28. //2、从当前延迟队列移除该元素

  29. Long count = workOrderQueueManager.remove(delayedContext);

  30. log.info("[挂起|转存队remove],count={},delayedContext={}", count,JSONObject.toJSONString(delayedContext));

  31. }

  32. } catch (Exception e) {

  33. log.error("[transferImmediateQueue]异常,delayedContext={},cacheType={}", JSONObject.toJSONString(delayedContext),cacheType);

  34. }finally {

  35. if(Objects.nonNull(lockKey) && tryLock){

  36. redisService.unlock(lockKey);

  37. }

  38. }

  39. return Optional.ofNullable(done).orElse(Boolean.FALSE);

  40. }

  41. }

2.3、Redis过期key监听

2.3.1、Redis过期回调监听(RedisKeyExpirationListener)

 
  1. /**

  2. * @description: Redis过期回调监听

  3. * @Date : 2020/7/18 上午10:43

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Component

  7. @Slf4j

  8. public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

  9. final static String STORED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.stored_cache.getKey();

  10. final static String SUSPENDED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.suspended_cache.getKey();

  11. @Autowired

  12. TraceLogService traceLogService; @Autowired

  13. RedisService redisService; @Autowired

  14. WorkOrderService workOrderService; @Autowired

  15. DelayedScheduledOperateBridge delayedScheduledOperateBridge; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {

  16. super(listenerContainer);

  17. } @Override

  18. public void onMessage(Message message, byte[] pattern) {

  19. Date startTime = TimeTools.createNowTime(); String expiredKey = message.toString(); String bizPrefix = redisService.getKeyPrefix().getName(); if(!expiredKey.startsWith(bizPrefix)){

  20. return;

  21. } String caseOfStored = redisService.getKey(STORED_CACHE_KEY_PREFIX); String caseOfSuspended = redisService.getKey(SUSPENDED_CACHE_KEY_PREFIX); WorkOrderCacheManager.CacheType cacheType; WorkOrderContext.QueueType queueType; if(expiredKey.startsWith(caseOfStored)){

  22. queueType = WorkOrderContext.QueueType.stored; cacheType = WorkOrderCacheManager.CacheType.stored_cache; }else if(expiredKey.startsWith(caseOfSuspended)){

  23. queueType = WorkOrderContext.QueueType.suspended; cacheType = WorkOrderCacheManager.CacheType.suspended_cache; }else{

  24. return;

  25. } String workCode = getWorkCode(expiredKey); log.info("监听到 redis key=[{}] 已过期",expiredKey);

  26. if(Objects.nonNull(workCode)){

  27. log.info("监听到 redis key=[{}],挂起|转存工单开始处理,workCode={}",expiredKey,workCode);

  28. WorkOrder workOrder = workOrderService.queryOne(workCode); if(Objects.isNull(workOrder)){

  29. log.info("监听到 redis key=[{}],挂起|转存工单开始处理,未找到工单,workCode={}",expiredKey,workCode);

  30. return;

  31. } WorkOrderContext delayedContext = WorkOrderContext.builder() .worOrder(WorkOrderContext.WorkOrder.builder().delayedTime(5).priority(workOrder.getCasePriority()).workCode(workOrder.getWorkCode()).build())

  32. .queueType(queueType). build(); Boolean done = delayedScheduledOperateBridge.transferImmediateQueue(cacheType,delayedContext); saveTraceLog(delayedContext,done,traceLog -> { JSONObject requestBody = new JSONObject();

  33. requestBody.put("expiredKey",expiredKey);

  34. requestBody.put("workCode",workCode);

  35. traceLog.setRequestBody(requestBody.toJSONString()); traceLog.setRequestTime(startTime); }); } } /**

  36. * traceLog入库

  37. * @param context

  38. * @param done

  39. * @param consumer

  40. */

  41. void saveTraceLog(WorkOrderContext context, Boolean done, Consumer<TraceLog> consumer){

  42. try {

  43. String hostAddress = InetAddress.getLocalHost().getHostAddress(); JSONObject responseBody = new JSONObject();

  44. responseBody.put("workOrderContext",context);

  45. responseBody.put("transferImmediateQueue",done);

  46. TraceLog traceLog = TraceLog.builder() .appCode(context.getWorOrder().getWorkCode()) .url("["+hostAddress+"]redisKeyExpirationListener.onMessage")

  47. .target(this.getClass().getPackage().getName() + "." + this.getClass().getSimpleName())

  48. .responseBody(responseBody.toJSONString()) .responseTime(TimeTools.createNowTime()) .traceType(TraceTypeEnum.REDIS_KEY_EXPIRE.getIndex()) .build(); consumer.accept(traceLog); traceLogService.insertRecord(traceLog); } catch (Exception e) {

  49. log.error("saveTraceLog exception,[context={}]",JSONObject.toJSONString(context),e);

  50. } } /**

  51. * 从字符串截取制定的工单号

  52. * @param value

  53. * @return

  54. */

  55. String getWorkCode(String value){

  56. return value.substring(value.lastIndexOf("_") + 1);

  57. }}

2.4、延迟订单既定处理桥接器

该类的主要作用,就是通过WorkOrderQueueTransfer实现队列元素的转移,同时通过OperateStrategyManager实现工单的数据库表操作。

 
  1. /**

  2. * @description: 延迟订单既定处理处理桥接

  3. * 场景描述:挂起|转存 工单到既定时间的处理

  4. * @Date : 2020/7/23 22:20

  5. * @Author : Seig Heil

  6. */

  7. @Slf4j

  8. @Component

  9. public class DelayedScheduledOperateBridge {

  10. static final String LOCK_KEY = CarthageConst.KEY_EXPIRE_LISTENER_LOCK;

  11. static final long EXPIRE_SECONDS = 120;

  12. @Autowired

  13. RedisService redisService; @Autowired

  14. WorkOrderQueueTransfer workOrderQueueTransfer; @Autowired

  15. OperateStrategyManager operateStrategyManager; /**

  16. * 实现从延迟队列到正式队列的转移业务处理,同时更新工单的状态

  17. * @param cacheType

  18. * @param delayedContext

  19. * @return

  20. */

  21. public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType, WorkOrderContext delayedContext){

  22. String workCode = delayedContext.getWorOrder().getWorkCode(); boolean tryLock = false;

  23. String redisKey = null;

  24. try {

  25. redisKey = redisService.getKey(MessageFormat.format(LOCK_KEY,workCode)); tryLock = redisService.lock(redisKey,EXPIRE_SECONDS); if(!tryLock){

  26. log.info("[DelayedScheduledOperateBridge.tryLock={}获取锁失败,redisKey={}]挂起|转存既定时间处理幂等,workCode={}",tryLock,redisKey,workCode);

  27. } if(tryLock){

  28. log.info("[DelayedScheduledOperateBridge.tryLock={}获取锁成功,redisKey={}]挂起|转存既定时间处理幂等,workCode={}",tryLock,redisKey,workCode);

  29. Boolean done = workOrderQueueTransfer.transferImmediateQueue(cacheType,delayedContext);

  30. if(!done.booleanValue()){

  31. return Boolean.FALSE;

  32. } OperateContext operateContext = OperateContext.builder() .operateStrategyEnum(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER) .operateParam( OperateContext.OperateParam.builder().workCode(workCode).build() ).build(); operateStrategyManager.execute(operateContext); log.info("[DelayedScheduledOperateBridge.transferImmediateQueue],delayedContext={},callResult={}",

  33. JSONObject.toJSONString(delayedContext),JSONObject.toJSONString(operateContext.getExecuteResult())); return operateContext.getExecuteResult().isSuccess();

  34. } } catch (Exception e) {

  35. log.error("[DelayedScheduledOperateBridge]挂起|转存既定时间处理异常,workCode={},delayedContext={}",workCode,JSONObject.toJSONString(delayedContext));

  36. }finally {

  37. if(tryLock){

  38. redisService.unlock(redisKey); } } return false;

  39. }}

2.5、工单操作管理器

该类的主要作用,就是对外暴露工单操作策略类的管理,外部无需关注策略类的存在,策略类实例的创建由该类负责。

  • 通过OPERATE_STRATEGY_MAP维护枚举和策略类Bean的映射。
  • 通过init()实现OPERATE_STRATEGY_MAP容器的初始化工作。
  • 通过Result<String> execute(OperateContext context)实现对外提供策略类的操作。
 
  1. /**

  2. * @description: GPS工单操作策略管理类

  3. * @Date : 2020/7/15 下午5:43

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Component

  7. @Slf4j

  8. public class OperateStrategyManager {

  9. static final Map<OperateContext.OperateStrategyEnum, AbstractOperateStrategy> OPERATE_STRATEGY_MAP = Maps.newHashMapWithExpectedSize(6);

  10. @Autowired

  11. CreateOperateStrategy createOperateStrategy; @Autowired

  12. AllotOrderOperateStrategy allotOrderOperateStrategy; @Autowired

  13. SubmitWithFinishOperateStrategy submitWithFinishOperateStrategy; @Autowired

  14. SubmitWithStoreOperateStrategy submitWithStoreOperateStrategy; @Autowired

  15. SubmitWithSuspendOperateStrategy submitWithSuspendOperateStrategy; @Autowired

  16. DelayedScheduledOperateStrategy delayedScheduledOperateStrategy; @Autowired

  17. AssignOrderOperateStrategy assignOrderOperateStrategy; @PostConstruct

  18. private void init() {

  19. OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.INIT_WORK_ORDER, createOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ALLOT_WORK_ORDER, allotOrderOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.STORE_WORK_ORDER, submitWithStoreOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.SUSPEND_WORK_ORDER, submitWithSuspendOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.FINISH_WORK_ORDER, submitWithFinishOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER, delayedScheduledOperateStrategy); OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ASSIGN_ORDER, assignOrderOperateStrategy); } /**

  20. * 对外提供对策略类的调用

  21. * @param context

  22. * @return

  23. */

  24. public Result<String> execute(OperateContext context) {

  25. StopWatch stopWatch = new StopWatch();

  26. stopWatch.start("OperateStrategyManager.execute");

  27. AbstractOperateStrategy operateStrategy = OPERATE_STRATEGY_MAP.get(context.getOperateStrategyEnum()); context.buildExecuteResultWithSuccess(); operateStrategy.execute(context); Result<Boolean> executeResult = context.getExecuteResult(); if(context.getExecuteResult().isSuccess()) {

  28. return Result.suc(executeResult.getMsg());

  29. } stopWatch.stop(); long spendMillSeconds = stopWatch.getLastTaskTimeMillis();

  30. long duration = (System.currentTimeMillis() - spendMillSeconds) / 1000;

  31. String executeResultMsg = executeResult.getMsg(); log.info("[execute] done,duration={},executeResultMsg={}",duration,executeResultMsg);

  32. return Result.fail(RemoteEnum.FAILURE, executeResultMsg);

  33. }}

2.6、工单策略类实现

由于工单涉及到创建、挂起、转存、处理完结等操作,因此对于这类场景我们可以通过策略类来实现。

2.6.1、挂起操作

 
  1. /**

  2. * @description: 提交调查结果(挂起操作)-策略类

  3. * @Date : 2020/7/15 下午5:32

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Slf4j

  7. @Component

  8. public class SubmitWithSuspendOperateStrategy extends AbstractSubmitOperateStrategy{

  9. static final Map<MoveToEnum,AttentionEventEnum> suspend_to_attention_event_map = new HashMap<>();

  10. static final Map<MoveToEnum,WorkOrderStatusEnum.SubStatusEnum> suspend_to_sub_status_map = new HashMap<>();

  11. static final Map<MoveToEnum,Integer> suspend_count_map = new HashMap<>();

  12. static {

  13. suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_ONCE,AttentionEventEnum.SUSPENDED_AT_ONCE); suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_TWICE,AttentionEventEnum.SUSPENDED_AT_TWICE); suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_ONCE,WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_ONCE); suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_TWICE,WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_TWICE); suspend_count_map.put(MoveToEnum.SUSPENDED_AT_ONCE,1);

  14. suspend_count_map.put(MoveToEnum.SUSPENDED_AT_TWICE,2);

  15. log.info("init... suspend_to_attention_event_map={}",suspend_to_attention_event_map.toString());

  16. log.info("init... suspend_to_sub_status_map={}",suspend_to_sub_status_map.toString());

  17. log.info("init... suspend_count_map={}",suspend_count_map.toString());

  18. } @Autowired

  19. DiamondConfigProxy diamondConfigProxy; @Override

  20. public void prepare(OperateContext context) {

  21. super.prepare(context);

  22. SurveyResult surveyResult = context.getSurveyResult(); MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo()); AttentionEvent attentionEvent = suspend_to_attention_event_map.getOrDefault(moveToEnum,null);

  23. ATTENTION_EVENT_CONTEXT.set(attentionEvent); context.setAttentionEvent(attentionEvent); } @Override

  24. WorkOrder buildWorkOrder(OperateContext context){

  25. SurveyResult surveyResult = context.getSurveyResult(); MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo()); WorkOrder workOrder = super.buildWorkOrder(context);

  26. workOrder.setSuspendedCount(suspend_count_map.getOrDefault(moveToEnum,0).intValue());

  27. workOrder.setMainStatus(WorkOrderStatusEnum.WAITING.getIndex()); workOrder.setSubStatus(suspend_to_sub_status_map.get(moveToEnum).getIndex()); workOrder.setIsFinished(Const.NON_INDEX); workOrder.setIsStore(Const.NON_INDEX); workOrder.setDelayedTime(context.getOperateParam().getDelayedTime()); return workOrder;

  28. } @Override

  29. void operationExtend(OperateContext context) {

  30. long delayedTime = context.getOperateParam().getDelayedTime().getTime();

  31. int delayedSeconds = context.getOperateParam().getDelayedSeconds();

  32. WorkOrder workOrder = context.getWorkOrder(); WorkOrderContext cxt = WorkOrderContext.buildSuspended(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime); workOrderQueueManager.leftPush(cxt); WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue. buildSuspended(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime,delayedSeconds); workOrderCacheManager.setCacheInExpire(cacheValue); super.operationExtend(context);

  33. } @Override

  34. public void setDelayedTime(OperateContext context) {

  35. SurveyResult surveyResult = context.getSurveyResult(); MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo()); DiamondConfig.SuspendOrderConfig suspendOrderConfig = diamondConfigProxy.suspendOrderConfig(); Date delayedTime = TimeTools.createNowTime(); int timeUnit = Calendar.HOUR_OF_DAY;

  36. int delayedSeconds = 0;

  37. int value = suspendOrderConfig.getConfig().getOrDefault(moveToEnum.name(),0);

  38. switch (suspendOrderConfig.getTimeUnit()){

  39. case "DAY":

  40. timeUnit = Calendar.DAY_OF_YEAR; delayedSeconds = value * 24 * 3600;

  41. break;

  42. case "HOUR":

  43. timeUnit = Calendar.HOUR_OF_DAY; delayedSeconds = value * 3600;

  44. break;

  45. case "MINUTE":

  46. timeUnit = Calendar.MINUTE; delayedSeconds = value * 60;

  47. break;

  48. case "SECOND":

  49. timeUnit = Calendar.SECOND; delayedSeconds = value; break;

  50. default:

  51. break;

  52. } TimeTools.addTimeField(delayedTime, timeUnit,value); context.getOperateParam().setDelayedTime(delayedTime); context.getOperateParam().setDelayedSeconds(delayedSeconds); }}

2.6.2、转存操作

 
  1. /**

  2. * @description: 提交调查结果(转存操作)-策略类

  3. * @Date : 2020/7/15 下午5:32

  4. * @Author : 石冬冬-Seig Heil

  5. */

  6. @Slf4j

  7. @Component

  8. public class SubmitWithStoreOperateStrategy extends AbstractSubmitOperateStrategy{

  9. /**

  10. * 转存天数 换算 秒数

  11. */

  12. static final int DAY_TO_SECONDS = 24 * 60 * 60;

  13. @Override

  14. public void prepare(OperateContext context) {

  15. ATTENTION_EVENT_CONTEXT.set(AttentionEventEnum.STORE_ORDER); context.setAttentionEvent(AttentionEventEnum.STORE_ORDER); super.prepare(context);

  16. } @Override

  17. public boolean paramCheck(OperateContext context) {

  18. if(Objects.isNull(context.getSurveyResult().getDelayedDays())){

  19. context.buildExecuteResultWithFailure("[surveyResult.delayedDays]为空!");

  20. } if(context.getSurveyResult().getDelayedDays() == 0){

  21. context.buildExecuteResultWithFailure("等待天数[delayedDays]必须大于0!");

  22. } return super.paramCheck(context);

  23. } @Override

  24. WorkOrder buildWorkOrder(OperateContext context){

  25. WorkOrder workOrder = super.buildWorkOrder(context);

  26. workOrder.setMainStatus(WorkOrderStatusEnum.PENDING.getIndex()); workOrder.setSubStatus(WorkOrderStatusEnum.SubStatusEnum.STORED.getIndex()); workOrder.setIsFinished(Const.NON_INDEX); workOrder.setIsStore(Const.YES_INDEX); //setSuspendedCount 这里需要重置为0,转存后派单流程状态依赖该字段

  27. workOrder.setSuspendedCount(0);

  28. workOrder.setDelayedTime(context.getOperateParam().getDelayedTime());

  29. return workOrder;

  30. }

  31. @Override

  32. void operationExtend(OperateContext context) {

  33. long delayedTime = context.getOperateParam().getDelayedTime().getTime();

  34. int delayedSeconds = context.getOperateParam().getDelayedSeconds();

  35. WorkOrder workOrder = context.getWorkOrder();

  36. WorkOrderContext cxt = WorkOrderContext.buildStored(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime);

  37. workOrderQueueManager.leftPush(cxt);

  38. WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue.

  39. buildStored(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime,delayedSeconds);

  40. workOrderCacheManager.setCacheInExpire(cacheValue);

  41. super.operationExtend(context);

  42. }

  43. @Override

  44. public void setDelayedTime(OperateContext context) {

  45. int delayedDays = context.getSurveyResult().getDelayedDays();

  46. Date delayedTime = TimeTools.createNowTime();

  47. TimeTools.addTimeField(delayedTime, Calendar.DAY_OF_YEAR,delayedDays);

  48. context.getOperateParam().setDelayedTime(delayedTime);

  49. context.getOperateParam().setDelayedSeconds(delayedDays * DAY_TO_SECONDS);

  50. }

  51. }

三、交互UI介绍

  • 系统链路日志

灵感来了挡也挡不住,基于Redis解决业务场景中延迟队列

我们可以实现对于key失效事件监听,把工单落库,便于后期排查问题,无需再查看服务器日志。

  • 队列监控

灵感来了挡也挡不住,基于Redis解决业务场景中延迟队列

输出队列元素,直观看到当前挂起、转存的工单。我们可以直观看到key的剩余存活时间。

  • 参数配置

挂起配置,尽管业务实现按照天维度、但是我代码依然实现了可以支持天、时、分、秒维度,提供了可扩展性。具体可以看 SubmitWithStoreOperateStrategy类 方法void setDelayedTime(OperateContext context) 的实现。

四、总结

在接到该需求后,尽管这个只是其中需求的一小部分,整个产品需求33页,内容还是丰富的,对于延迟队列操作,我在调研技术方案时,也考虑过其他方法,譬如java自带的带有延迟特性的 队列DelayedQueue(对于分布式多实例场景它就不适合了),以及可以通过RabbitMQ实现(感觉实现比较复杂),最终选择了Redis(可以利用相关数据特性比如ZSet,String,Expire),技术实现比较简单上手。

猜你喜欢

转载自blog.csdn.net/python8989/article/details/113152983