延时队列的定义:延时队列在进入队列是会设定一个延迟时间,在达到指定的延迟时间后执行相关操作。
应用场景:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
实现方案:
1、使用java延迟队列
2、使用redis的过期监听
3、使用redis的zset数据结构
实现方式一:
引入组件
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.12</version>
</dependency>
实现代码:
实体类:
@Data
public class TestOrder implements Delayed {
private String orderNo;
private String phone;
private String createTime;
final TimeUnit unit = TimeUnit.SECONDS;
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
public TestOrder(String orderNo, String phone, String createTime, long time) {
this.orderNo = orderNo;
this.phone = phone;
this.createTime = createTime;
this.time =System.currentTimeMillis()+(time > 0 ? unit.toMillis(time) : 0);
}
public TestOrder(String orderNo, String phone, long time) {
this.orderNo = orderNo;
this.phone = phone;
this.time =System.currentTimeMillis()+(time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
final long currentTime = System.currentTimeMillis();
return this.time - currentTime;
}
@Override
public int compareTo(Delayed o) {
TestOrder order = (TestOrder) o;
long diff = this.time - order.getTime();
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
实现类:
public class DelayQueueDemo {
public static void main(String[] args) {
final Snowflake snowflake = IdUtil.getSnowflake();
final String snowId = snowflake.nextIdStr();
TestOrder Order1 = new TestOrder(snowflake.nextIdStr(), '1'+RandomUtil.randomNumbers(10), DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 1);
TestOrder Order2 = new TestOrder(snowflake.nextIdStr(), '1'+RandomUtil.randomNumbers(10),DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 20);
TestOrder Order3 = new TestOrder(snowflake.nextIdStr(), '1'+RandomUtil.randomNumbers(10),DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 60);
DelayQueue<TestOrder> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
TestOrder task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消,订单创建时间:{%s}, 取消时间:{%s}\n", task.getOrderNo(),task.getCreateTime(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
实现方式二
引入redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置redis序列化
@Component
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
//设置Key的序列化采用StringRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
//设置值的序列化采用FastJsonRedisSerializer
final FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
redisTemplate.setValueSerializer(fastJsonRedisSerializer);
redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
配置redis监听器
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
redis键值失效监听类
@Component
@Log4j2
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
if(StrUtil.isNotBlank(expiredKey)&&expiredKey.startsWith(OrderConstants.ORDER_HASH_KEY)){
log.info("获取到订单编号:{},处理时间{},\n后续处理逻辑在此处编写",expiredKey, DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));
}
}
}
测试web类
@RestController
public class DelayController {
@Autowired
private RedisTemplate<Object,Object> redisTemplate;
@PostMapping("/delay/redis/expire")
public void testRedis(){
final Snowflake snowflake = IdUtil.getSnowflake();
TestOrder Order1 = new TestOrder(snowflake.nextIdStr(), '1'+ RandomUtil.randomNumbers(10), DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 1);
TestOrder Order2 = new TestOrder(snowflake.nextIdStr(), '1'+RandomUtil.randomNumbers(10),DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 20);
TestOrder Order3 = new TestOrder(snowflake.nextIdStr(), '1'+RandomUtil.randomNumbers(10),DateUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"), 60);
final List<TestOrder> testOrders = CollUtil.newArrayList(Order1, Order2, Order3);
testOrders.forEach(s->{
redisTemplate.opsForValue().set(OrderConstants.ORDER_HASH_KEY +s.getOrderNo(),s,RandomUtil.randomInt(20,60),TimeUnit.SECONDS);
});
}
}
实现方式三
1、 往redis的zset数据类型中添加订单信息
@GetMapping("/delay/redis/zset")
public void testRedisZset() {
final Snowflake snowflake = IdUtil.getSnowflake();
TestOrder Order1 = new TestOrder(snowflake.nextIdStr(), '1' + RandomUtil.randomNumbers(10), DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), 1);
TestOrder Order2 = new TestOrder(snowflake.nextIdStr(), '1' + RandomUtil.randomNumbers(10), DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), 20);
TestOrder Order3 = new TestOrder(snowflake.nextIdStr(), '1' + RandomUtil.randomNumbers(10), DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), 60);
final List<TestOrder> testOrders = CollUtil.newArrayList(Order1, Order2, Order3);
testOrders.forEach(s -> {
final Date createTime = DateUtil.parse(s.getCreateTime(), "yyyy-MM-dd HH:mm:ss");
final long times = createTime.getTime()/1000 + exipreTime+RandomUtil.randomInt(0,5);
//往redis的zset数据类型中添加订单信息
redisTemplate.opsForZSet().add(OrderConstants.ORDER_HASH_KEY, s.getOrderNo(), times);
});
}
二、在项目启动时用一个无限循环执行定时处理订单超时信息
@Component
@Slf4j
public class DelayRedisZset implements CommandLineRunner {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Override
public void run(String... args) throws Exception {
//无限循环
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("初始化订单超时线程任务");
final Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeWithScores(OrderConstants.ORDER_HASH_KEY, 0, 0);
if (null == typedTuples || typedTuples.isEmpty()) {
continue;
}
final ZSetOperations.TypedTuple tuple = (ZSetOperations.TypedTuple) typedTuples.toArray()[0];
final Double score = tuple.getScore();
final long currentTime = System.currentTimeMillis()/1000;
if(currentTime>=score.longValue()){
final String value = tuple.getValue().toString();
final Long remove = redisTemplate.opsForZSet().remove(OrderConstants.ORDER_HASH_KEY, value);
if(remove>0){
log.info("执行了{}订单超时相关操作,时间{}",value, DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
}
}
}
}
}