背景
当我们使用Quartz做任务调度的时候,随着访问量越来越大,我们需要在多台机器上同时实现Quartz,这里会出现一个问题,分布式环境下,同一个任务调度可能被执行多次,这就与我们期望的结果不一致了,那么如何解决呢?下面提供两种基于数据库锁的解决方法。
两种思路
1.使用数据库自带的行锁机制,使得只有一个线程能获得该锁,并在线程执行完毕后释放该锁
2.利用unique key的唯一性,保证只有一个线程操作是成功的。
具体实现
1.行锁机制
首先需要创建一张用于锁表的锁t_lock,执行以下语句可以锁住表并获得该表的锁:
select * from t_lock for update
释放锁:
connection.commit();
在执行Job的execute方法的业务逻辑之前先锁住t_lock,保证只有一个线程能继续执行业务逻辑,其他机器上的调度线程阻塞,在业务逻辑中判断是否执行过任务调度,如果是继续执行,并在业务逻辑处理完并返回true后释放锁,其他调度线程激活,当判断该任务已经执行过直接退出,下面只贴出了具体的Job逻辑,其他spring配置,mybatis操作这里就不再贴出来。
/**
* 定时任务:每日定时生成文件
* @author hww
*/
@Component
public class GenerateFileJob implements Job {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
Date date = new Date();
Connection connection = null;
@Autowired
QuartzLogMapper logMapper;
@Autowired
MybatisSqlSessionFactoryBean bean;
public void before() {
System.out.println("======================================开始生成文件!======================================");
try {
if(connection == null){
SqlSessionFactory sqlSessionFactory = bean.getObject();
SqlSession openSession = sqlSessionFactory.openSession(false);
connection = openSession.getConnection();
connection.setAutoCommit(false);
}
PreparedStatement pps = connection.prepareStatement("select * from t_lock for update");
//通过for update锁表并获得锁
pps.executeQuery();
} catch (Exception e) {
e.printStackTrace();
}
}
public void execute(JobExecutionContext context)
throws JobExecutionException {
before();
//获取job id
String name = context.getJobDetail().getKey().getName();
//check whether exist QuartzLog
if(!existLog(name)){
//业务逻辑
QuartzLog quartzLog = new QuartzLog();
quartzLog.setQuartz_name(name);
quartzLog.setDatetime(sdf.format(new Date()));
quartzLog.setStatus(1);
quartzLog.setRemark("["+Thread.currentThread().getName()+"],执行Job"+name+",当前时间:"+new Date());
logMapper.insertLog(quartzLog);
System.out.println("["+Thread.currentThread().getName()+"],执行Job"+name+",当前时间:"+new Date());
}
after();
}
private void after() {
try {
//在业务执行完成后,释放掉锁
connection.commit();
//关闭连接
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("======================================生成文件结束!======================================");
}
private boolean existLog(String name){
List<QuartzLog> log = logMapper.queryLogByName(name);
if(null != log && log.size() > 0){
return true;
}
return false;
}
}
几点问题:
1.因为这种方式依赖于数据库,如果数据库是单点的,一旦数据库宕机,系统业务就不可用了。(可以搞多个数据库,实现双向同步,其中一个数据库挂掉,转到其他数据库上)
2.不可重入性,当获取锁后,因为业务逻辑抛出异常导致无法释放锁,此时无法再次获取锁。
2.字段唯一性
在上面创建的t_lock表中增加一个唯一字段mark:
ALTER TABLE t_lock ADD UNIQUE (mark);
在执行业务逻辑之前先向t_lock表中插入一条记录,mark字段的值为job20190411001,它表示JobName+time+执行次数,因为mark是唯一的,所以可以保证只有一个调度线程可以执行完成,而其他线程都会抛出异常(这里可以捕获异常正常退出),这里的代码逻辑可以参照上面只需要把for update那里的语句改成插入语句,既可实现。
几点问题:
1.因为这种方式依赖于数据库,如果数据库是单点的,一旦数据库宕机,系统业务就不可用了。(可以搞多个数据库,实现双向同步,其中一个数据库挂掉,转到其他数据库上)
2.没有失效时间,一旦业务逻辑处理失败,需要重新处理,此时t_lock已经存在锁,此时无法重新处理业务。