发布订阅模式可能大家都熟悉,消息队列、redis等很多中间件都有发布订阅模式,但你知道我们平时用的spring也有发布订阅模式吗?
在我们系统中,可能会遇到处理完一个流程以后,接下来要同时处理多个流程,比如用户支付成功以后,接下来会同时减少库存、发送下单成功短信等,这种情况就可以用到发布订阅。减少库存和发送下单成功短信服务监听支付成功事件,当用户支付成功以后,发布这个消息,另外两个服务则会监听到,接着执行对应的逻辑。
可能在真实情况下,我们应用都是分布式的,处理这种情况可能都是使用消息队列等中间件来处理,但是有时候在单体应用中,使用spring的发布订阅也未尝不是个好的选择。
定义事件
public class PayEvent extends ApplicationEvent {
public PayEvent(Object source) {
super(source);
}
}
先定义一个支付成功的事件对象,这个对象要继承spring的ApplicationEvent。
定义事件监听者
当支付成功以后,会减少库存和发送短信,这里定义两个监听者
/**
* 短信监听者
* @author zhanpengguo
* @date 2020-04-09 8:29
*/
@Slf4j
@Component
public class SmsListener {
@EventListener(PayEvent.class)
public void sendSms(PayEvent payEvent) {
log.info("短信服务监听到支付成功事件:{}", payEvent.getSource());
try {
log.info("开始发送短信");
Thread.sleep(3000);
log.info("短信发送成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 库存监听者
* @author zhanpengguo
* @date 2020-04-09 8:30
*/
@Slf4j
@Component
public class StockListener {
@EventListener(PayEvent.class)
public void reduceStock(PayEvent payEvent) {
log.info("库存服务监听到支付成功事件:{}", payEvent.getSource());
try {
log.info("开始减少库存");
Thread.sleep(2000);
log.info("库存减少完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
事件监听者需要使用@EventListener注解,里面传入事件对象,也可以继承ApplicationListener来实现。上面定义了发送短信和减少库存两个事件,发送短信模拟耗时3秒钟,减少库存模拟耗时2秒钟。
发布事件
事件对象和监听者都定义好了,这个时候我们需要借助ApplicationContext来发布事件。而ApplicationContext需要通过实现ApplicationContextAware来获取。
@Component
public class SpringContextHolder implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextHolder.applicationContext = applicationContext;
}
/**
* 发布事件
* @param applicationEvent
*/
public static void publishEvent(ApplicationEvent applicationEvent){
if (applicationEvent != null) {
applicationContext.publishEvent(applicationEvent);
}
}
}
ApplicationContext继承自ApplicationEventPublisher,通过调用ApplicationEventPublisher的publishEvent()方法,来实现发布事件的目的。
测试
@GetMapping("/pay")
public void pay() {
log.info("开始支付");
log.info("支付成功,开始发布事件");
SpringContextHolder.publishEvent(new PayEvent("支付成功"));
log.info("保存支付记录");
}
在Controller中定义一个支付成功的伪代码,里面调用上面定义的发布事件方法。
输出:
09:06:09.675 INFO 22340 --- [nio-8080-exec-3] ... : 开始支付
09:06:09.675 INFO 22340 --- [nio-8080-exec-3] ... : 支付成功,开始发布事件
09:06:09.676 INFO 22340 --- [nio-8080-exec-3] ... : 短信服务监听到支付成功事件:支付成功
09:06:09.676 INFO 22340 --- [nio-8080-exec-3] ... : 开始发送短信
09:06:12.677 INFO 22340 --- [nio-8080-exec-3] ... : 短信发送成功
09:06:12.677 INFO 22340 --- [nio-8080-exec-3] ... : 库存服务监听到支付成功事件:支付成功
09:06:12.677 INFO 22340 --- [nio-8080-exec-3] ... : 开始减少库存
09:06:14.677 INFO 22340 --- [nio-8080-exec-3] ... : 库存减少完毕
09:06:14.677 INFO 22340 --- [nio-8080-exec-3] ... : 保存支付记录
从输出结果可以看到,事件发布和订阅是单线程的,支付成功发布事件以后被阻塞住了,这种肯定不行,需要改进一下。
可以通过@Async来实现异步监听,注意:要在启动类上加@EnableAsync
改进:
@Async
@EventListener(PayEvent.class)
public void sendSms(PayEvent payEvent) {
log.info("短信服务监听到支付成功事件:{}", payEvent.getSource());
try {
log.info("开始发送短信");
Thread.sleep(3000);
log.info("短信发送成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Async
@EventListener(PayEvent.class)
public void reduceStock(PayEvent payEvent) {
log.info("库存服务监听到支付成功事件:{}", payEvent.getSource());
try {
log.info("开始减少库存");
Thread.sleep(2000);
log.info("库存减少完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出:
09:22:57.505 INFO 16036 --- [nio-8080-exec-1] : 开始支付
09:22:57.505 INFO 16036 --- [nio-8080-exec-1] : 支付成功,开始发布事件
09:22:57.507 INFO 16036 --- [nio-8080-exec-1] : 保存支付记录
09:22:57.509 INFO 16036 --- [cTaskExecutor-1] : 短信服务监听到支付成功事件:支付成功
09:22:57.509 INFO 16036 --- [cTaskExecutor-2] : 库存服务监听到支付成功事件:支付成功
09:22:57.509 INFO 16036 --- [cTaskExecutor-1] : 开始发送短信
09:22:57.509 INFO 16036 --- [cTaskExecutor-2] : 开始减少库存
09:22:59.509 INFO 16036 --- [cTaskExecutor-2] : 库存减少完毕
09:23:00.509 INFO 16036 --- [cTaskExecutor-1] : 短信发送成功
这样事件监听就变成异步执行了,主流程不会阻塞。