在我们分布式架构中,分布式任务,与drm是辅助业务系统的中间件必要存在的一部分
首先介绍一下分布式任务的背景与出现解决的问题。
1:背景,分布式架构中,你的业务系统部署了多台,但是,大多业务系统中,都存在定时任务,如果在业务系统用定时任务来实现的话,那么一个任务会再多台服务器上执行,这不是我们想要的结果,当然你可以选择使用redis的方式获取任务锁来执行任务,这样是可以达到效果的,但是这样的方式非常好性能,当你有很多个定时任务的时候,你需要开启很多线程来维护定时任务,这样的对业务系统的影响非常大。
2:以上介绍了分布式定时任务的出现背景,这里介绍一下,博主的实现,大家应该了解过spring cloud stream这个组件,,这个组件是在mq的基础上做了api的封装,能比较好的使用,先介绍一下该组件,在博主的分布式任务中扮演的角色,spring cloud stream扮演了一个消息机制传递的效果,利用了他技术中分组,发布订阅,等模式,他具体的功能和实现后续会介绍。
3:好了,博主实现的配置与代码部分
3.1spring boot中propertis配置加入
#rabbitmq配置
spring.cloud.stream.binders.rabbitOne.type=rabbit
spring.cloud.stream.binders.rabbitOne.environment.spring.rabbitmq.username= admin
spring.cloud.stream.binders.rabbitOne.environment.spring.rabbitmq.password= admin
spring.cloud.stream.binders.rabbitOne.environment.spring.rabbitmq.port= 5672
spring.cloud.stream.binders.rabbitOne.environment.spring.rabbitmq.addresses= 192.168.140.131
#动态属性配置
spring.cloud.stream.bindings.DynamicAttributesInPuto.content-type= application/json
spring.cloud.stream.bindings.DynamicAttributesInPuto.destination= SwallowBirds_update_businessSystem
spring.cloud.stream.bindings.DynamicAttributesInPuto.binder=rabbitOne
#分布式定时任务配置
spring.cloud.stream.bindings.TimingTaskInPut.content-type= application/json
spring.cloud.stream.bindings.TimingTaskInPut.destination= SwallowBirds_task_businessSystem
spring.cloud.stream.bindings.TimingTaskInPut.binder=rabbitOne
spring.cloud.stream.bindings.TimingTaskInPut.group=TimingTask
#处理信息回调
spring.cloud.stream.bindings.newCallBack.content-type= application/json
spring.cloud.stream.bindings.newCallBack.destination= SwallowBirds_news_back
spring.cloud.stream.bindings.newCallBack.binder=rabbitOne
关于以上的回调机制,是关于该次任务执行或者动态属性改变结果的反馈,任务系统需要知道该次执行的结果。
3.2 代码部分,该代码部分,是使用反射的方式来执行具体代码
该为配置类,主要用于实现与propertis中的对应
public interface MqMessageSource {
//动态属性 接受消息
String TEST_OUT_PUT = "DynamicAttributesInPuto";
//定时任务接受消息
String TIMING_TASK_INPUT = "TimingTaskInPut";
//回调
String SWALLOW_BIRDS_NEWS_BACK = "newCallBack";
//动态属性 接受信息管道
@Input(TEST_OUT_PUT)
SubscribableChannel dynamicAttributesInput();
//接收信息管道
@Input(TIMING_TASK_INPUT)
SubscribableChannel timingTaskCallback();
}
//定时任务实体类
@Data
public class DynamicAttributesEntity {
//ioc容器的存的名字
private String className;
private List<Attribute> parameter;
//定时任务的id
private String id;
}
//定时任务执行类
@EnableBinding(MqMessageSource.class)
@Component
public class TimingTasksRealization {
@Autowired
private ApplicationContext applicationContext;
@StreamListener(MqMessageSource.TIMING_TASK_INPUT)
@SendTo(MqMessageSource.SWALLOW_BIRDS_NEWS_BACK)
public CallbackArgumentsEntity TimingTasksRealizationImpl(DynamicAttributesEntity dynamicAttributesEntity){
TimingTaskService timingTaskService = (TimingTaskService)applicationContext.getBean(dynamicAttributesEntity.getClassName());
CallbackArgumentsEntity callbackArgumentsEntity = timingTaskService.timingTask();
callbackArgumentsEntity.setId(dynamicAttributesEntity.getId());
callbackArgumentsEntity.setType(2);
return callbackArgumentsEntity;
}
}
业务系统中执行定时任务必须实现 TimingTaskService 接口。
该部分介绍分布式任务,动态属性张介绍,后续该框架也会进行优化源码放在码云,github.希望对大家有用。