1.环境
框架采用springBoot + serviceMesh,使用istio治理服务,k8s治理集群,采用的主数据库是mongoDB。
异步业务采用rabbitMQ处理。
2.业务
接口:新增审批
业务流程:
微服务1,先将一条DRAFT状态的数据修改为EDIT_LOCK状态,再调用微服务2的审批接口进行审批;
微服务2,审批接口审批通过,然后审批接口又调用微服务3的回调接口进行具体业务处理;
微服务3,回调接口先将消息保存到rabbitMQ,rabbitMQ队列异步处理业务,将数据修改为ENABLE。
微服务1代码:
/**
* 表单数据,新增审批
*
* @param formId 表单id
* @param grantId 操作员id
* @param formDataId 表单中数据id
* @param tenantId 企业id
* @return 分装结果对象
* @author leimin
*/
@Override
public BaseVO formCreateApproval(Long formId, Long grantId, Long formDataId, Long tenantId) {
long lockTimeOut = 0L;
try {
lockTimeOut = distributeLockUtil.getLock(FormConstants.CREATE_APPROVAL_LOCK + tenantId + formDataId);
} catch (Exception e) {
return new BaseVO(BaseVO.CONFLICT_CODE, "对不起,目前系统繁忙,请稍后重试");
}
if (lockTimeOut == 0) {
return new BaseVO(BaseVO.CONFLICT_CODE, "对不起,目前系统繁忙,请稍后重试");
}
FormData formData = null;
FormDataApproval formDataApproval = null;
log.info("rabbitMQ消息:新增审批,formDataId是==》" + formDataId.toString());
log.info("新增审批接口1。 grantId" + grantId);
try {
formData = formDao.getByFormDataId(formDataId, tenantId);
if (!auth.get(FormDataState.APPROVAL_CREATE.toString(), grantId.equals(formData.getCreator()), formData.getStatus())
|| !formData.getEditor().equals(grantId)) {
return new BaseVO(BaseVO.UNAUTHORIZED_CODE, "无权限");
}
formApprovalUtil.setFormDataStatusAndIndex(formData, FormDataState.APPROVAL_CREATE);
formDao.updateFormDataStatus(formData);
formDataApproval = createFormDataApproval(formData, FormDataApprovalState.APPROVING);
formApprovalDao.upsertFormDataApproval(formDataApproval);
loggerUtil.saveLog(tenantId, grantId, formData, FormConstants.CREATE);
log.info("新增审批接口2。 主表editor" + formData.getEditor());
log.info("新增审批接口3。 副表editor" + formDataApproval.getEditor());
return audit.audit(formData, formId, grantId, formDataId, tenantId, FormConstants.CREATE, formDataApproval.getId());
} catch (Exception e) {
log.info(String.valueOf(e));
formApprovalUtil.approvalRollback(tenantId, formDataId, formDataApproval, formData, FormDataState.DRAFT);
distributeLockUtil.releaseLock(FormConstants.CREATE_APPROVAL_LOCK + tenantId + formDataId, lockTimeOut);
return new BaseVO(BaseVO.OTHER_CODE, "操作失败!");
}
}
微服务3代码:
/**
* 审批完成后,回调接口,修改源数据
*
* @param tenantId 企业id
* @param formId 表单id
* @param dataId 数据id
* @param result 审核结果
* @param type 审核类型
* @return 返回的结果
* @author niulibing
*/
@Override
public BaseVO callback(Long tenantId, Long formId, Long dataId, String result, String type) {
log.info("1.开始调用回调接口");
//根据dataId查询formDataApproval表的数据,查询状态为审核中的状态的数据
FormDataApproval formDataApproval = formApprovalDao.findOneByDataId(tenantId, dataId);
if (formDataApproval == null) {
return new BaseVO(BaseVO.OTHER_CODE, "不存在该条记录");
}
FormData formData = formDao.findOneByDataIdAndFormId(tenantId, dataId, formId);
if (formData == null) {
return new BaseVO(BaseVO.OTHER_CODE, "不存在该条记录");
}
log.info("回调接口 formDataApproval.getEditor ==>" + formDataApproval.getEditor());
switch (type) {
//审核类型:create
case FormConstants.CREATE:
log.info("2.进入新增审批回调");
try {
// 修改数据状态为ENABLE
return createApprovalCallBack(tenantId, formId, dataId, result, formDataApproval, formData);
} catch (Exception e) {
log.info("新增审批回调异常:" + e.getMessage());
log.info("rabbitMQ消息:回调接口异常,formDataId是==》" + dataId.toString());
log.info("新增审批回调异常回滚==》开始");
formApprovalUtil.approvalRollback(tenantId, dataId, formDataApproval, formData, FormDataState.DRAFT);
log.info("新增审批回调异常回滚==》结束");
distributeLockUtil.releaseLock(FormConstants.CREATE_APPROVAL_LOCK + tenantId + dataId);
return new BaseVO(BaseVO.OTHER_CODE, "提交失败");
}
//审核类型:edit
case FormConstants.EDIT:
log.info("2.进入编辑审批回调");
try {
return editApprovalCallBack(tenantId, formId, dataId, result, formDataApproval, formData);
} catch (Exception e) {
log.info("编辑审批回调异常:" + e.getMessage());
log.info("rabbitMQ消息:回调接口异常,formDataId是==》" + dataId.toString());
log.info("编辑审批回调异常回滚==》开始");
updateStatus(formData, FormDataState.EDITING, formDataApproval, FormDataApprovalState.DRAFT);
log.info("编辑审批回调异常回滚==》结束");
distributeLockUtil.releaseLock(FormConstants.EDIT_APPROVAL_LOCK + tenantId + dataId);
return new BaseVO(BaseVO.OTHER_CODE, "提交失败");
}
//审核类型:delete
case FormConstants.DELETE:
log.info("2.进入删除审批回调");
try {
return deleteApprovalCallBack(tenantId, formId, dataId, result, formDataApproval, formData);
} catch (Exception e) {
log.info("删除审批回调异常:" + e.getMessage());
log.info("rabbitMQ消息:回调接口异常,formDataId是==》" + dataId.toString());
FormDataState formDataState = (FormDataState) redisTemplate.opsForValue().get("deleteApproval:formDataId:" + dataId);
formApprovalUtil.approvalRollback(tenantId, dataId, formDataApproval, formData, formDataState);
redisTemplate.opsForValue().getOperations().delete("deleteApproval:formDataId:" + dataId);
return new BaseVO(BaseVO.OTHER_CODE, "提交失败");
}
default:
return new BaseVO(BaseVO.OTHER_CODE, "审核方式不存在,仅支持create、edit、delete,请确认后提交。");
}
}
3.症状
前端页面调用该审批接口的时候,偶尔会出现一条数据被锁死,状态始终是EDIT_LOCK的数据,后面审批的数据都通过了,但他一直是保持该状态,前端没有报错,日志也没有报异常。
4.可能原因
4.1 rabbitMQ生产者丢数据
可能性很小,因为,生产者采用了异常捕捉机制,出现异常(业务处理异常、处理超时异常等)数据回滚。
4.2 rabbitMQ队列丢数据
可能性也很小,rabbitMQ配置有持久化属性,确保队列不会数据丢失。
4.3 rabbitMQ消费者丢数据
可能性很大,我的rabbitMQ配置中配置了5次重试机制,rabbitMQ队列会向消费者重复发送5次消息。
如果消费者在第一次接收到数据后,消费者业务异常,数据处理失败;或消费者网络异常、传输超时没有接受到数据,队列将认为消费者处理数据成功,不在向消费者发送。
导致该条数据一直处于EDIT_LOCK状态;
5.解决办法
5.1 消费者,添加异常回滚
发现异常将数据回滚到draft 状态;
//审核类型:create
case FormConstants.CREATE:
log.info("2.进入新增审批回调");
try {
return createApprovalCallBack(tenantId, formId, dataId, result, formDataApproval, formData);
} catch (Exception e) {
log.info("新增审批回调异常:" + e.getMessage());
log.info("rabbitMQ消息:回调接口异常,formDataId是==》" + dataId.toString());
log.info("新增审批回调异常回滚==》开始");
formApprovalUtil.approvalRollback(tenantId, dataId, formDataApproval, formData, FormDataState.DRAFT);
log.info("新增审批回调异常回滚==》结束");
distributeLockUtil.releaseLock(FormConstants.CREATE_APPROVAL_LOCK + tenantId + dataId);
return new BaseVO(BaseVO.OTHER_CODE, "提交失败");
}
5.2 延长网络处理时间
将rabbitMQ配置的5次重试改为10次,延长网络处理时间,增加网络请求的获取概率;
spring:
rabbitmq:
host: ${rabbit_url:test.server.cncommdata.cn}
port: ${rabbit_port:5672}
username: ${rabbit_username:guest}
password: ${rabbit_password:guest}
virtual-host: /
# 设置消息的确认模式为auto
listener:
simple:
retry:
## 开启消费者重试
enabled: true
##尝试投递消息的最大数量(默认3次)
max-attempts: 10
##第一次与第二次投递尝试的时间间隔
initial-interval: 3000ms
5.3 自动改手动
对于rabbitMQ消费者丢数据,可以,将之前的自动确认模式,配置改为手动却模式;
但是,不建议如此,手动机制可能会导致各种新的数据维护问题。
5.4 换消息队列
不建议如此,系统变动太大,成本较高。
参考:https://www.cnblogs.com/pypua/articles/7519213.html