一,场景:从FTP取文件,把数据落到-接口表,读完之后会把该文件移到另外一个文件夹中,避免重复读取,再把数据从接口表落到-业务表。
二,原因:例如FTP文件数量量很多,加入5万条,全部落到接口表需要5分钟时间,而定时任务周期设的是3分钟,这样当
比如:
9:00,来了一个文件,
9:02,定时任务开始读文件落接口表,
9:05,定时任务又开始读取该文件,文件未移走,导致重复读
9:07,前一个定时任务已读完,会把FTP文件移动到另外一个文件夹中,后面定时任务无法读取
疑问1:当9:07把文件移走后,会不会影响9:05第二个定时任务读取,答案是:不会,虽然整个落接口表周期是5分钟,但中间流程,读取FTP文件内容的时间可能只需要几秒钟,也就是说内容已经取到,就算删除该文件也没有影响,耗时主要在于解析文件内容,落库。
疑问2:如果一次来了10个文件,全部落到接口表需要50分钟,影响是什么?
这样在此期间定时任务会执行10次,每一次具体代码执行逻辑是,先获取全部文件名,遍历执行,根据文件名下载文件,再读取文件内容,而且是读完一个文件,立刻移走。这样就有问题了,当第二个定时任务执行的时候,只能获取到9个
9:00,来了10个文件,
9:02,定时任务1,开始读文件落接口表,取10个文件名
9:05,定时任务2,又开始读取文件,还没有文件处理成功移走,取10个文件名
9:07,第一个定时任务已读完第一个FTP文件,会把FTP文件移动到另外一个文件夹中
9:08,定时任务3,只能获取9个文件名,落9个文件的数据
9:11,定时任务4,开始读取文件,还是取9个文件名
9:12,第一个定时任务已读完第二个FTP文件,会把FTP文件移动到另外一个文件夹中,剩8个
9:14,定时任务4,开始读取文件,只能取8个文件名
------------
所以文件多的情况下,每次执行一个定时任务,FTP上的文件都会被重复读取,只不过,随着前面定时任务读取文件并把文件移走后,后面的定时任务重复执行的文件减少。
三,实现逻辑
方式1,定时任务设置的足够长,确保在此周期内,所有的文件全部读取完,再执行下一个定时任务代码。
不足之处,实际业务中FTP上传较快,不能得到及时处理。
方式2,加锁,redis锁。假如一次定时读取的文件全部处理完,后面的定时任务才能执行,就算执行时间到了,也不执行。
即每次执行加锁,执行完释放锁。
四,落业务表存在类似问题
会不会存在取接口表数据落业务表时,导致同一条接口数据重复落业务表。
接口表数据成功落业务表后,状态会改为已处理,每次只会取未处理接口数据落。
假如:接口表中有1万条数据,全部落完要12分钟,定时任务周期是10分钟
9:00,定时任务1,此时接口表中有1万条数据
9:10,定时任务2,读取接口表数据
此时数据分两种:1,已落业务表,状态为已成功,未提交事物;2,未落业务表,状态是未处理
那么定时任务2,查询接口表时,只能取到未处理的数据,而已成功,虽然未提交事物,实际也查询不到。
那么10000条数据,8000条被定时1处理,2000条被定时任务2处理。这样数据是不会被重复读取,但是否会引发其他问题?
会:假如10000条数据,来自两个FTP文件,每个5000条,后5000条数据分别被两个定时任务处理,
而两个文件一起处理,与分开处理业务逻辑是不同的,必须一个定时任务处理才正确。
处理方式,类似,加锁。
五:代码
public Boolean insertOversea() throws Exception {
logger.info("开始执行oversea落中间表..........");
long startTimeOversea = System.currentTimeMillis();
if (!redisUtils.isLock("insertOversea")) {
logger.info("oversea落中间表-未被锁定...");
redisUtils.lock("insertOversea");
Date now = new Date();
FtpUtil ftpUtil = new FtpUtil(overseaInformation.getProperty("ftp.ip"), Integer.parseInt(overseaInformation.getProperty("ftp.port")), overseaInformation.getProperty("ftp.name"),
overseaInformation.getProperty("ftp.pwd"));
try {
ftpUtil.open();
List<String> fileNames = ftpUtil.getFileNameList("SAP_WMS/OVERSEADN/Unhandled");
logger.info("fileNames={}", fileNames);
if (fileNames == null || fileNames.isEmpty()) {
return true;
}
for (String fileName : fileNames) {
String name = fileName.substring(fileName.lastIndexOf("/"), fileName.length());
String nameOversea = name.substring(1, name.length());
String suffer = fileName.substring(fileName.indexOf(".") + 1, fileName.length());
if (suffer.equals("CTL")) {
continue;
}
String nameCTL = name.substring(0, name.indexOf(".")) + CTL_FILE_SUFFIX;
String fileNameCTL = fileName.substring(0, fileName.lastIndexOf(".")) + CTL_FILE_SUFFIX;
if (!fileNames.contains(fileNameCTL)) {
logger.info("不存在ctl文件!");
continue;
}
try {
ftpUtil.download(fileName, DOWN_NAME + name);
List<String> contents = ftpUtil.analysis(fileName);
long startTimeToEntity = System.currentTimeMillis();
List<OverseaEntity> overseas = this.conversionToObject(nameOversea, contents, now);
long endTimeToEntity = System.currentTimeMillis();
logger.info("解析文件:" + nameOversea + ",封装数据时间: " + (endTimeToEntity - startTimeToEntity) + "ms");
if (overseas == null || overseas.size() < 1) {
continue;
}
// 把批量的方式改成单次
// overseaMapper.insertList(overseas);
long startTimeLuoDi = System.currentTimeMillis();
for (OverseaEntity oversea : overseas) {
OverseaEntity entity = new OverseaEntity();
entity.setItem(oversea.getItem());
entity.setFileName(nameOversea);
OverseaEntity res = overseaMapper.selectOne(entity);
if (res == null) {
overseaMapper.insert(oversea);
}
}
long endTimeLuoDi = System.currentTimeMillis();
logger.info("文件:" + nameOversea + ",数据落地时间: " + (endTimeLuoDi - startTimeLuoDi) + "ms");
ftpUtil.move(fileName, "SAP_WMS/OVERSEADN/Handled" + name);
ftpUtil.move(fileNameCTL, "SAP_WMS/OVERSEADN/Handled" + nameCTL);
} catch (Exception e) {
e.printStackTrace();
if (e instanceof ServiceException) {
throw e;
} else {
ftpUtil.move(fileName, "SAP_WMS/OVERSEADN/Error" + name);
ftpUtil.move(fileNameCTL, "SAP_WMS/OVERSEADN/Handled" + nameCTL);
continue;
}
}
}
long endTimeOversea = System.currentTimeMillis();
logger.info("落接口表时间: " + (endTimeOversea - startTimeOversea) + "ms");
redisUtils.unLock("insertOversea");
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
redisUtils.unLock("insertOversea");
ftpUtil.close();
}
} else {
logger.info("oversea落中间表-已被锁定...");
}
return true;
}