Flowable 快速入门教程:任务驳回与回退
前言
- 不管这篇文章是不是入门难度的,反正标题要整齐,估计看懂的不多
- 本文的代码中没有对流程做任何特殊处理,用的都是流程本身的数据,因此可以通用
- 本文暂时不考虑子流程情况,但是核心代码是一样的,可以自己稍微改下(后面考虑进去后更新)
- 本文不保证功能完美,因为这玩意是真的难,我只能尽我所能
- 有存在 Bug 可以在评论中反馈,但是不一定能改
思考(Q&A)
驳回
1: 功能是什么 ?
我传入一个现在任务ID,然后找到上个任务,通过节点跳转API进行跳转
2: 跳转功能情况 ?
1对多,多对1,不考虑多对多情况(官方没提供API,返回错误信息即可)
3: 我怎么找到上个任务 ?
阶段一: 延线找到上个节点(未考虑中间一些需要掠过的任务)
阶段二: 延线找到上个上个用户任务节点,略过像发送邮件之类的节点,网关(未考虑分支线路问题)
阶段三: 阶段二基础上找到最后一次的路线在获取用户任务节点
4: 如何获取最后一次路线 ?
阶段一: 去掉删除的部分获取历史数据(未考虑删除导致脏数据问题,不止删除的数据才是脏数据)
阶段二: 必须找到历史数据的规律,对数据进行清洗(最难,中间还有一堆问题)
阶段三: 在阶段二基础上考虑循环导致的数据问题,直接获取节点数据还不是最后一次走过的数据(未考虑会签带来的逻辑问题)
阶段四: 在阶段三基础上考虑会签,带来的数据问题
5: 如果当前是并行网关之类的情况,如何获取当前其他任务 ?
阶段一: 获取所有正在进行的任务(未考虑其他网关情况)
阶段二: 从当前任务向前找网关,再从网关向后找任务(根本不现实,因为根本不知道要找到哪个网关才够)
阶段三: 从上个任务向后找网关
6: 跳转接口有两类,怎么选 ?
第一种 Activity To Activity 类型,跳转用的活动ID
第二种 Excution To Activity 类型,跳转用的是执行任务ID
我选择第一种,因为这样会签跳转就不需要去获取到具体的哪行执行任务,第二钟太细,不好操作
退回
在驳回的基础上
1: 功能是什么样的 ?或者说退回与驳回有什么区别?
驳回只回到上个任务节点,退回则是根据传入的具体 任务Key 退回到指定的之前的节点
2: 有没有情况是无法退回的?什么情况?
有,只有回退的相对于当前任务是串型才能退回,如果回退的任务属于并行,不能退回
3: 如何判断跳转的节点是否可以符合要求
通过当前任务节点通过连线向前寻找,必须所有线路都能到达目标节点,有一个不行都不符合要求
4: 如果当前是并行网关之类的情况,如何获取当前其他任务 ?
与驳回相同
脏数据
脏数据是我们在获取历史数据时候必须要处理的问题
脏数据样例
串行样例
并行样例
会签样例
驳回解析
基础数据获取
获取当前任务的信息,以及整个流程的信息
// 校验流程是否挂起
if (taskService.createTaskQuery().taskId(taskId).singleResult().isSuspended()) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("任务处于挂起状态"));
}
// 当前任务 task
Task task = taskService.createTaskQuery().taskId(taskId).singleResult();
// 获取流程定义信息
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionId(task.getProcessDefinitionId()).singleResult();
// 获取所有节点信息,暂不考虑子流程情况
Process process = repositoryService.getBpmnModel(processDefinition.getId()).getProcesses().get(0);
Collection<FlowElement> flowElements = process.getFlowElements();
// 获取当前任务节点元素
UserTask source = null;
if (flowElements != null) {
for (FlowElement flowElement : flowElements) {
// 类型为用户节点
if (flowElement.getId().equals(task.getTaskDefinitionKey())) {
source = (UserTask) flowElement;
}
}
}
获取所有目标Key
获取所有的父级任务
延入口处的连线开始向前迭代,发现是用户任务就添加到返回列表中
虽然拿到了父级用户任务,但是不能直接用,因为向分支情况不知道具体过来的路线是什么
List<UserTask> parentUserTaskList = FlowableUtils.iteratorFindParentUserTasks(source, null);
// 初始任务节点,不做操作
if (parentUserTaskList == null || parentUserTaskList.size() == 0) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("当前节点为初始任务节点"));
}
迭代采用的深度优先算法,说简单点沿一条边走到头,再换其他路线
/**
* 迭代获取父级任务节点列表,向前找
* @param source 起始节点
* @param userTaskList 已找到的用户任务节点
* @return
*/
public static List<UserTask> iteratorFindParentUserTasks(FlowElement source, List<UserTask> userTaskList) {
userTaskList = userTaskList == null ? new ArrayList<>() : userTaskList;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 类型为用户节点,则新增父级节点
if (sequenceFlow.getSourceFlowElement() instanceof UserTask) {
userTaskList.add((UserTask) sequenceFlow.getSourceFlowElement());
continue;
}
// 否则继续迭代
userTaskList = iteratorFindParentUserTasks(sequenceFlow.getSourceFlowElement(), userTaskList);
}
}
return userTaskList;
}
数据清洗
PS:虽然我觉得能看懂的不多,数据清洗很多也是在遇到问题后再去添加处理的,因此代码稍微有点乱,糅杂在一起
历史数据获取
通过开头获取的流程的基础信息与历史数据进行数据清洗
下面开始 historicTaskInstanceClean 部分
参数定义与脏数据范围寻找
我把数据放入栈里,因为我这里的数据需要从后向前处理,正好利用栈的后进先出的特性
// 循环放入栈,栈 LIFO:后进先出
Stack<HistoricTaskInstance> stack = new Stack<>();
historicTaskInstanceList.forEach(item -> stack.push(item));
定义的三个比较重要的参数
- userTaskKey:用来保存上条记录的 key
- deleteKeyList:这个就是脏数据的起点数据
- dirtyDataLineList:则是脏数据的路线集合,也就是线路上脏数据点,用 Set 是因为不需要重复的
如图,可以利用他这个头尾相同的特点来寻找脏数据的范围
PS:为什么 deleteKey 用 list 来存储,看上面并行的样例,会发现,在多条情况下如果生成顺序是 3 4,那么驳回数据的生成顺序会倒过来变成 4 3 ,因此我们需要多个点都匹配
筛选脏线路的
脏数据在回退的过程中,夹杂在中间的不确定点的过滤出来,这里用到脏数据路线的原因就是为了过滤掉其他并行线路的数据,毕竟其他线路的点和这个没关系
如图,红框中的 3 4 5 就是中间夹杂的不确定点
结束点标识获取
结束的点,也就是上图中最后的脏数据的点,选择直接从删除原因中获取,因为在并行情况下存在多过个点,此时的多个点我们难以获取,之后获取脏线路路线
脏线路获取
passRoads 是为了保存已经经过的路线,因为存在可能线拉回前面导致循环的情况,因此经过的路线就 pass 即可
/**
* 从后向前寻路,获取所有脏线路上的点
* @param source 起始节点
* @param passRoads 已经经过的点集合
* @param targets 目标脏线路终点
* @param dirtyRoads 确定为脏数据的点,因为不需要重复,因此使用 set 存储
* @return
*/
public static Set<String> iteratorFindDirtyRoads(FlowElement source, List<String> passRoads, List<String> targets, Set<String> dirtyRoads) {
passRoads = passRoads == null ? new ArrayList<>() : passRoads;
dirtyRoads = dirtyRoads == null ? new HashSet<>() : dirtyRoads;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
passRoads.add(sequenceFlow.getSourceFlowElement().getId());
// 如果此点为目标点,确定经过的路线为脏线路,添加点到脏线路中,然后找下个连线
if (targets.contains(sequenceFlow.getSourceFlowElement().getId())) {
dirtyRoads.addAll(passRoads);
continue;
}
// 否则就继续迭代
dirtyRoads = iteratorFindDirtyRoads(sequenceFlow.getSourceFlowElement(), passRoads, targets, dirtyRoads);
}
}
return dirtyRoads;
}
脏线路合并
首先之前获取的脏数据线路是不完整的,只代表了之前节点的分支线路
在并行情况下点是多个的,因此在进行到下个并行点时候,我们需要获取这个分支节点的路线并且合并到之前的数据中,而是否是并行的标志,就是我们之前从删除原因中获取的脏数据的结束点中存在
PS:这里就不需要考虑脏数据点中间嵌套脏数据点了,因为中间的脏数据点都在脏线路中存在,会被当作过程中的不确定点处理
会签脏数据结束点处理
普通的节点,在找到一个点后,用 replace 函数把 deleteKey 的值替换掉,都替换光后说明匹配结束,把list里的值移除即可,为了兼容并行情况
会签就不太一样了
如上图,在驳回到会签节点时候,脏数据结束点生成的数据是多条的,与并行不同的是,这几条数据对应的节点 Key 是相同的,因此之前的逻辑不足以处理。
但是由于会签的数据是连续的,因此采用延迟处理的方式,如果脏数据是会签点(代码开始获取的会签节点列表就是这时候用的),就记录下当前的会签 Key 和 索引下标,当循环到的值不一样的时候,说明上个点就是结束点已经处理结束,再把值删掉。
PS:注意会签的删除要在循环结束后删,否在会导致溢出,因此放了个 multiOpera 作为处理的标识
结束后的处理
脏数据清洗效果图
流程图
对应数据
清洗效果截图,由于没有循环,可以看到除了会签对应 3 条实例数据,其他节点清洗后都只有一个
sid-4FE193FF-E1E2-4F87-8424-2F00BCA9AFC5 是网关,没给它命名
父级任务筛选
最后如何判断最后的路线,毕竟如果拉了循环的线路情况,可能导致每个分支都由记录的情况
逻辑:不管路线循环了几次,每两次之间的各点最多经过 1 次,前提是在之前的数据清理后,当然要处理下会签情况下一个点多记录情况
因此我们只需获取最后两次经过数据之前,属于父级节点的任务即可
同样利用会签数据连续且节点 Key 相同的特点,排除会签问题
获取所有需要跳转的当前任务
如何获取:以其中一个父级任务出发向后扫描,能匹配到的任务节点都是需要跳转的
同样的,只要向子节点扫描的,都需要考虑循环的问题
/**
* 迭代获取子级任务节点列表,向后找
* @param source 起始节点
* @param runTaskKeyList 正在运行的任务 Key,用于校验任务节点是否是正在运行的节点
* @param hasSequenceFlow 已经经过的连线的 ID,用于判断线路是否重复
* @param userTaskList 需要撤回的用户任务列表
* @return
*/
public static List<UserTask> iteratorFindChildUserTasks(FlowElement source, List<String> runTaskKeyList, Set<String> hasSequenceFlow, List<UserTask> userTaskList) {
hasSequenceFlow = hasSequenceFlow == null ? new HashSet<>() : hasSequenceFlow;
userTaskList = userTaskList == null ? new ArrayList<>() : userTaskList;
// 根据类型,获取出口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getOutgoingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getOutgoingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 如果发现连线重复,说明循环了,跳过这个循环
if (hasSequenceFlow.contains(sequenceFlow.getId())) {
continue;
}
// 添加已经走过的连线
hasSequenceFlow.add(sequenceFlow.getId());
// 如果为用户任务类型,且任务节点的 Key 正在运行的任务中存在,添加
if (sequenceFlow.getTargetFlowElement() instanceof UserTask && runTaskKeyList.contains((sequenceFlow.getTargetFlowElement()).getId())) {
userTaskList.add((UserTask) sequenceFlow.getTargetFlowElement());
continue;
}
// 否则继续迭代
userTaskList = iteratorFindChildUserTasks(sequenceFlow.getTargetFlowElement(), runTaskKeyList, hasSequenceFlow, userTaskList);
}
}
return userTaskList;
}
任务扭转
最后根据当前任务的数量以及目标任务的数量,选择不同的方法即可
回退解析
基础数据获取
和驳回不同的,这里多获取了一个目标任务节点的信息
校验能否跳转
逻辑思路:从当前的任务向前扫描路线,所有的路线都必须经过目标节点,才能跳转,可以画图看看。
/**
* 迭代从后向前扫描,判断目标节点相对于当前节点是否是串行
* @param source 起始节点
* @param isSequential 是否串行
* @param targetKsy 目标节点
* @return
*/
public static Boolean iteratorCheckSequentialReferTarget(FlowElement source, String targetKsy, Boolean isSequential) {
isSequential = isSequential == null ? true : isSequential;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 如果目标节点已被判断为并行,后面都不需要执行,直接返回
if (isSequential == false) {
break;
}
// 这条线路存在目标节点,这条线路完成,进入下个线路
if (targetKsy.equals(sequenceFlow.getSourceFlowElement().getId())) {
continue;
}
if (sequenceFlow.getSourceFlowElement() instanceof StartEvent) {
isSequential = false;
break;
}
// 否则就继续迭代
isSequential = iteratorCheckSequentialReferTarget(sequenceFlow.getSourceFlowElement(), targetKsy, isSequential);
}
}
return isSequential;
}
获取所有需要跳转的当前任务
这个和驳回的逻辑是一样的就不多说了
任务扭转
完整源码
RuntimeApiController
/**
* 流程定义与实例相关接口封装
* @author: linjinp
* @create: 2019-11-05 14:55
**/
@RestController
@RequestMapping("/flowable/runtime/api")
public class RuntimeApiController {
public static final Logger logger = LogManager.getLogger(RuntimeApiController.class);
@Resource
private RepositoryService repositoryService;
@Resource
private RuntimeService runtimeService;
@Resource
private TaskService taskService;
@Resource
private HistoryService historyService;
/**
* 流程收回/驳回
* @param taskId 当前任务ID
* @return
*/
@GetMapping(value = "/flowTackback/{taskId}")
public String flowTackback(@PathVariable(value = "taskId") String taskId) {
if (taskService.createTaskQuery().taskId(taskId).singleResult().isSuspended()) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("任务处于挂起状态"));
}
// 当前任务 task
Task task = taskService.createTaskQuery().taskId(taskId).singleResult();
// 获取流程定义信息
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionId(task.getProcessDefinitionId()).singleResult();
// 获取所有节点信息,暂不考虑子流程情况
Process process = repositoryService.getBpmnModel(processDefinition.getId()).getProcesses().get(0);
Collection<FlowElement> flowElements = process.getFlowElements();
// 获取当前任务节点元素
UserTask source = null;
if (flowElements != null) {
for (FlowElement flowElement : flowElements) {
// 类型为用户节点
if (flowElement.getId().equals(task.getTaskDefinitionKey())) {
source = (UserTask) flowElement;
}
}
}
// 目的获取所有跳转到的节点 targetIds
// 获取当前节点的所有父级用户任务节点
// 深度优先算法思想:延边迭代深入
List<UserTask> parentUserTaskList = FlowableUtils.iteratorFindParentUserTasks(source, null);
// 初始任务节点,不做操作
if (parentUserTaskList == null || parentUserTaskList.size() == 0) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("当前节点为初始任务节点"));
}
// 获取活动 ID 即节点 Key
List<String> parentUserTaskKeyList = new ArrayList<>();
parentUserTaskList.forEach(item -> parentUserTaskKeyList.add(item.getId()));
// 获取全部历史节点活动实例,即已经走过的节点历史
List<HistoricTaskInstance> historicTaskInstanceList = historyService.createHistoricTaskInstanceQuery().processInstanceId(task.getProcessInstanceId()).orderByHistoricTaskInstanceStartTime().asc().list();
// 数据清洗,将回滚导致的脏数据清洗掉
List<String> lastHistoricTaskInstanceList = FlowableUtils.historicTaskInstanceClean(flowElements, historicTaskInstanceList);
// 此时历史任务实例为倒序,获取最后走的节点
List<String> targetIds = new ArrayList<>();
// 循环结束标识,遇到当前目标节点的次数
int number = 0;
StringBuilder parentHistoricTaskKey = new StringBuilder();
for (String historicTaskInstanceKey : lastHistoricTaskInstanceList) {
// 当会签时候会出现特殊的,连续都是同一个节点历史数据的情况,这种时候跳过
if (parentHistoricTaskKey.toString().equals(historicTaskInstanceKey)) {
continue;
}
parentHistoricTaskKey = new StringBuilder(historicTaskInstanceKey);
if (historicTaskInstanceKey.equals(task.getTaskDefinitionKey())) {
number ++;
}
// 在数据清洗后,历史节点就是唯一一条从起始到当前节点的历史记录,理论上每个点只会出现一次
// 在流程中如果出现循环,那么每次循环中间的点也只会出现一次,再出现就是下次循环
// number == 1,第一次遇到当前节点
// number == 2,第二次遇到,代表最后一次的循环范围
if (number == 2) {
break;
}
// 如果当前历史节点,属于父级的节点,说明最后一次经过了这个点,需要退回这个点
if (parentUserTaskKeyList.contains(historicTaskInstanceKey)) {
targetIds.add(historicTaskInstanceKey);
}
}
// 目的获取所有需要被跳转的节点 currentIds
// 取其中一个父级任务,因为后续要么存在公共网关,要么就是串行公共线路
UserTask oneUserTask = parentUserTaskList.get(0);
// 获取所有正常进行的任务节点 Key,这些任务不能直接使用,需要找出其中需要撤回的任务
List<Task> runTaskList = taskService.createTaskQuery().processInstanceId(task.getProcessInstanceId()).list();
List<String> runTaskKeyList = new ArrayList<>();
runTaskList.forEach(item -> runTaskKeyList.add(item.getTaskDefinitionKey()));
// 需驳回任务列表
List<String> currentIds = new ArrayList<>();
// 通过父级网关的出口连线,结合 runTaskList 比对,获取需要撤回的任务
List<UserTask> currentUserTaskList = FlowableUtils.iteratorFindChildUserTasks(oneUserTask, runTaskKeyList, null, null);
currentUserTaskList.forEach(item -> currentIds.add(item.getId()));
// 规定:并行网关之前节点必须需存在唯一用户任务节点,如果出现多个任务节点,则并行网关节点默认为结束节点,原因为不考虑多对多情况
if (targetIds.size() > 1 && currentIds.size() > 1) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("任务出现多对多情况,无法撤回"));
}
try {
// 如果父级任务多于 1 个,说明当前节点不是并行节点,原因为不考虑多对多情况
if (targetIds.size() > 1) {
// 1 对 多任务跳转,currentIds 当前节点(1),targetIds 跳转到的节点(多)
runtimeService.createChangeActivityStateBuilder().processInstanceId(task.getProcessInstanceId()).moveSingleActivityIdToActivityIds(currentIds.get(0), targetIds).changeState();
}
// 如果父级任务只有一个,因此当前任务可能为网关中的任务
if (targetIds.size() == 1) {
// 1 对 1 或 多 对 1 情况,currentIds 当前要跳转的节点列表(1或多),targetIds.get(0) 跳转到的节点(1)
runtimeService.createChangeActivityStateBuilder().processInstanceId(task.getProcessInstanceId()).moveActivityIdsToSingleActivityId(currentIds, targetIds.get(0)).changeState();
}
} catch (FlowableObjectNotFoundException e) {
e.printStackTrace();
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("未找到流程实例,流程可能已发生变化"));
} catch (FlowableException e) {
e.printStackTrace();
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("无法取消或开始活动"));
}
return JsonUtil.toJSON(ErrorMsg.SUCCESS);
}
/**
* 流程回退
* @param taskId 当前任务ID
* @param targetKey 要回退的任务 Key
* @return
*/
@GetMapping(value = "/flowReturn/{taskId}/{targetKey}")
public String flowReturn(@PathVariable(value = "taskId") String taskId, @PathVariable(value = "targetKey") String targetKey) {
if (taskService.createTaskQuery().taskId(taskId).singleResult().isSuspended()) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("任务处于挂起状态"));
}
// 当前任务 task
Task task = taskService.createTaskQuery().taskId(taskId).singleResult();
// 获取流程定义信息
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionId(task.getProcessDefinitionId()).singleResult();
// 获取所有节点信息,暂不考虑子流程情况
Process process = repositoryService.getBpmnModel(processDefinition.getId()).getProcesses().get(0);
Collection<FlowElement> flowElements = process.getFlowElements();
// 获取当前任务节点元素
UserTask source = null;
// 获取跳转的节点元素
UserTask target = null;
if (flowElements != null) {
for (FlowElement flowElement : flowElements) {
// 当前任务节点元素
if (flowElement.getId().equals(task.getTaskDefinitionKey())) {
source = (UserTask) flowElement;
}
// 跳转的节点元素
if (flowElement.getId().equals(targetKey)) {
target = (UserTask) flowElement;
}
}
}
// 从当前节点向前扫描
// 如果存在路线上不存在目标节点,说明目标节点是在网关上或非同一路线上,不可跳转
// 否则目标节点相对于当前节点,属于串行
Boolean isSequential = FlowableUtils.iteratorCheckSequentialReferTarget(source, targetKey, null);
if (!isSequential) {
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("当前节点相对于目标节点,不属于串行关系,无法回退"));
}
// 获取所有正常进行的任务节点 Key,这些任务不能直接使用,需要找出其中需要撤回的任务
List<Task> runTaskList = taskService.createTaskQuery().processInstanceId(task.getProcessInstanceId()).list();
List<String> runTaskKeyList = new ArrayList<>();
runTaskList.forEach(item -> runTaskKeyList.add(item.getTaskDefinitionKey()));
// 需退回任务列表
List<String> currentIds = new ArrayList<>();
// 通过父级网关的出口连线,结合 runTaskList 比对,获取需要撤回的任务
List<UserTask> currentUserTaskList = FlowableUtils.iteratorFindChildUserTasks(target, runTaskKeyList, null, null);
currentUserTaskList.forEach(item -> currentIds.add(item.getId()));
try {
// 1 对 1 或 多 对 1 情况,currentIds 当前要跳转的节点列表(1或多),targetKey 跳转到的节点(1)
runtimeService.createChangeActivityStateBuilder().processInstanceId(task.getProcessInstanceId()).moveActivityIdsToSingleActivityId(currentIds, targetKey).changeState();
} catch (FlowableObjectNotFoundException e) {
e.printStackTrace();
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("未找到流程实例,流程可能已发生变化"));
} catch (FlowableException e) {
e.printStackTrace();
return JsonUtil.toJSON(ErrorMsg.ERROR.setNewErrorMsg("无法取消或开始活动"));
}
return JsonUtil.toJSON(ErrorMsg.SUCCESS);
}
}
FlowableUtils
/**
* 流程引擎工具类封装
* @author: linjinp
* @create: 2019-12-24 13:51
**/
public class FlowableUtils {
public static final Logger logger = LogManager.getLogger(FlowableUtils.class);
/**
* 迭代获取父级任务节点列表,向前找
* @param source 起始节点
* @param userTaskList 已找到的用户任务节点
* @return
*/
public static List<UserTask> iteratorFindParentUserTasks(FlowElement source, List<UserTask> userTaskList) {
userTaskList = userTaskList == null ? new ArrayList<>() : userTaskList;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 类型为用户节点,则新增父级节点
if (sequenceFlow.getSourceFlowElement() instanceof UserTask) {
userTaskList.add((UserTask) sequenceFlow.getSourceFlowElement());
continue;
}
// 否则继续迭代
userTaskList = iteratorFindParentUserTasks(sequenceFlow.getSourceFlowElement(), userTaskList);
}
}
return userTaskList;
}
/**
* 迭代获取子级任务节点列表,向后找
* @param source 起始节点
* @param runTaskKeyList 正在运行的任务 Key,用于校验任务节点是否是正在运行的节点
* @param hasSequenceFlow 已经经过的连线的 ID,用于判断线路是否重复
* @param userTaskList 需要撤回的用户任务列表
* @return
*/
public static List<UserTask> iteratorFindChildUserTasks(FlowElement source, List<String> runTaskKeyList, Set<String> hasSequenceFlow, List<UserTask> userTaskList) {
hasSequenceFlow = hasSequenceFlow == null ? new HashSet<>() : hasSequenceFlow;
userTaskList = userTaskList == null ? new ArrayList<>() : userTaskList;
// 根据类型,获取出口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getOutgoingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getOutgoingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 如果发现连线重复,说明循环了,跳过这个循环
if (hasSequenceFlow.contains(sequenceFlow.getId())) {
continue;
}
// 添加已经走过的连线
hasSequenceFlow.add(sequenceFlow.getId());
// 如果为用户任务类型,且任务节点的 Key 正在运行的任务中存在,添加
if (sequenceFlow.getTargetFlowElement() instanceof UserTask && runTaskKeyList.contains((sequenceFlow.getTargetFlowElement()).getId())) {
userTaskList.add((UserTask) sequenceFlow.getTargetFlowElement());
continue;
}
// 否则继续迭代
userTaskList = iteratorFindChildUserTasks(sequenceFlow.getTargetFlowElement(), runTaskKeyList, hasSequenceFlow, userTaskList);
}
}
return userTaskList;
}
/**
* 从后向前寻路,获取所有脏线路上的点
* @param source 起始节点
* @param passRoads 已经经过的点集合
* @param targets 目标脏线路终点
* @param dirtyRoads 确定为脏数据的点,因为不需要重复,因此使用 set 存储
* @return
*/
public static Set<String> iteratorFindDirtyRoads(FlowElement source, List<String> passRoads, List<String> targets, Set<String> dirtyRoads) {
passRoads = passRoads == null ? new ArrayList<>() : passRoads;
dirtyRoads = dirtyRoads == null ? new HashSet<>() : dirtyRoads;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
passRoads.add(sequenceFlow.getSourceFlowElement().getId());
// 如果此点为目标点,确定经过的路线为脏线路,添加点到脏线路中,然后找下个连线
if (targets.contains(sequenceFlow.getSourceFlowElement().getId())) {
dirtyRoads.addAll(passRoads);
continue;
}
// 否则就继续迭代
dirtyRoads = iteratorFindDirtyRoads(sequenceFlow.getSourceFlowElement(), passRoads, targets, dirtyRoads);
}
}
return dirtyRoads;
}
/**
* 迭代从后向前扫描,判断目标节点相对于当前节点是否是串行
* @param source 起始节点
* @param isSequential 是否串行
* @param targetKsy 目标节点
* @return
*/
public static Boolean iteratorCheckSequentialReferTarget(FlowElement source, String targetKsy, Boolean isSequential) {
isSequential = isSequential == null ? true : isSequential;
// 根据类型,获取入口连线
List<SequenceFlow> sequenceFlows = null;
if (source instanceof Task) {
sequenceFlows = ((Task) source).getIncomingFlows();
} else if (source instanceof Gateway) {
sequenceFlows = ((Gateway) source).getIncomingFlows();
}
if (sequenceFlows != null) {
// 循环找到目标元素
for (SequenceFlow sequenceFlow: sequenceFlows) {
// 如果目标节点已被判断为并行,后面都不需要执行,直接返回
if (isSequential == false) {
break;
}
// 这条线路存在目标节点,这条线路完成,进入下个线路
if (targetKsy.equals(sequenceFlow.getSourceFlowElement().getId())) {
continue;
}
if (sequenceFlow.getSourceFlowElement() instanceof StartEvent) {
isSequential = false;
break;
}
// 否则就继续迭代
isSequential = iteratorCheckSequentialReferTarget(sequenceFlow.getSourceFlowElement(), targetKsy, isSequential);
}
}
return isSequential;
}
/**
* 历史节点数据清洗,清洗掉又回滚导致的脏数据
* @param flowElements 全部节点信息
* @param historicTaskInstanceList 历史任务实例信息,数据采用开始时间升序
* @return
*/
public static List<String> historicTaskInstanceClean(Collection<FlowElement> flowElements, List<HistoricTaskInstance> historicTaskInstanceList) {
// 会签节点收集
List<String> multiTask = new ArrayList<>();
for (FlowElement flowElement : flowElements) {
if (flowElement instanceof UserTask) {
// 如果该节点的行为为会签行为,说明该节点为会签节点
if (((UserTask) flowElement).getBehavior() instanceof ParallelMultiInstanceBehavior || ((UserTask) flowElement).getBehavior() instanceof SequentialMultiInstanceBehavior) {
multiTask.add(flowElement.getId());
}
}
}
// 循环放入栈,栈 LIFO:后进先出
Stack<HistoricTaskInstance> stack = new Stack<>();
historicTaskInstanceList.forEach(item -> stack.push(item));
// 清洗后的历史任务实例
List<String> lastHistoricTaskInstanceList = new ArrayList<>();
// 网关存在可能只走了部分分支情况,且还存在跳转废弃数据以及其他分支数据的干扰,因此需要对历史节点数据进行清洗
// 临时用户任务 key
StringBuilder userTaskKey = null;
// 临时被删掉的任务 key,存在并行情况
List<String> deleteKeyList = new ArrayList<>();
// 临时脏数据线路
List<Set<String>> dirtyDataLineList = new ArrayList<>();
// 由某个点跳到会签点,此时出现多个会签实例对应 1 个跳转情况,需要把这些连续脏数据都找到
// 会签特殊处理下标
int multiIndex = -1;
// 会签特殊处理 key
StringBuilder multiKey = null;
// 会签特殊处理操作标识
boolean multiOpera = false;
while (!stack.empty()) {
// 从这里开始 userTaskKey 都还是上个栈的 key
// 是否是脏数据线路上的点
boolean isDirtyData = false;
for (Set<String> oldDirtyDataLine : dirtyDataLineList) {
if (oldDirtyDataLine.contains(stack.peek().getTaskDefinitionKey())) {
isDirtyData = true;
}
}
// 删除原因不为空,说明从这条数据开始回跳或者回退的
// MI_END:会签完成后,其他未签到节点的删除原因,不在处理范围内
if (stack.peek().getDeleteReason() != null && !stack.peek().getDeleteReason().equals("MI_END")) {
// 可以理解为脏线路起点
String dirtyPoint = "";
if (stack.peek().getDeleteReason().indexOf("Change activity to ") >= 0) {
dirtyPoint = stack.peek().getDeleteReason().replace("Change activity to ", "");
}
// 会签回退删除原因有点不同
if (stack.peek().getDeleteReason().indexOf("Change parent activity to ") >= 0) {
dirtyPoint = stack.peek().getDeleteReason().replace("Change parent activity to ", "");
}
FlowElement dirtyTask = null;
// 获取变更节点的对应的入口处连线
// 如果是网关并行回退情况,会变成两天脏数据路线,效果一样
for (FlowElement flowElement : flowElements) {
if (flowElement.getId().equals(stack.peek().getTaskDefinitionKey())) {
dirtyTask = flowElement;
}
}
// 获取脏数据线路
Set<String> dirtyDataLine = FlowableUtils.iteratorFindDirtyRoads(dirtyTask, null, Arrays.asList(dirtyPoint.split(",")), null);
// 自己本身也是脏线路上的点,加进去
dirtyDataLine.add(stack.peek().getTaskDefinitionKey());
logger.info(stack.peek().getTaskDefinitionKey() + "点脏路线集合:" + dirtyDataLine);
// 是全新的需要添加的脏线路
boolean isNewDirtyData = true;
for (int i = 0; i < dirtyDataLineList.size(); i++) {
// 如果发现他的上个节点在脏线路内,说明这个点可能是并行的节点,或者连续驳回
// 这时,都以之前的脏线路节点为标准,只需合并脏线路即可,也就是路线补全
if (dirtyDataLineList.get(i).contains(userTaskKey.toString())) {
isNewDirtyData = false;
dirtyDataLineList.get(i).addAll(dirtyDataLine);
}
}
// 已确定时全新的脏线路
if (isNewDirtyData) {
// deleteKey 单一路线驳回到并行,这种同时生成多个新实例记录情况,这时 deleteKey 其实是由多个值组成
// 按照逻辑,回退后立刻生成的实例记录就是回退的记录
// 至于驳回所生成的 Key,直接从删除原因中获取,因为存在驳回到并行的情况
deleteKeyList.add(dirtyPoint + ",");
dirtyDataLineList.add(dirtyDataLine);
}
// 添加后,现在这个点变成脏线路上的点了
isDirtyData = true;
}
// 如果不是脏线路上的点,说明是有效数据,添加历史实例 Key
if (!isDirtyData) {
lastHistoricTaskInstanceList.add(stack.peek().getTaskDefinitionKey());
}
// 校验脏线路是否结束
for (int i = 0; i < deleteKeyList.size(); i ++) {
// 如果发现脏数据属于会签,记录下下标与对应 Key,以备后续比对,会签脏数据范畴开始
if (multiKey == null && multiTask.contains(stack.peek().getTaskDefinitionKey())
&& deleteKeyList.get(i).contains(stack.peek().getTaskDefinitionKey())) {
multiIndex = i;
multiKey = new StringBuilder(stack.peek().getTaskDefinitionKey());
}
// 会签脏数据处理,节点退回会签清空
// 如果在会签脏数据范畴中发现 Key改变,说明会签脏数据在上个节点就结束了,可以把会签脏数据删掉
if (multiKey != null && !multiKey.toString().equals(stack.peek().getTaskDefinitionKey())) {
deleteKeyList.set(multiIndex , deleteKeyList.get(multiIndex).replace(stack.peek().getTaskDefinitionKey() + ",", ""));
multiKey = null;
// 结束进行下校验删除
multiOpera = true;
}
// 其他脏数据处理
// 发现该路线最后一条脏数据,说明这条脏数据线路处理完了,删除脏数据信息
// 脏数据产生的新实例中是否包含这条数据
if (multiKey == null && deleteKeyList.get(i).contains(stack.peek().getTaskDefinitionKey())) {
// 删除匹配到的部分
deleteKeyList.set(i , deleteKeyList.get(i).replace(stack.peek().getTaskDefinitionKey() + ",", ""));
}
// 如果每组中的元素都以匹配过,说明脏数据结束
if ("".equals(deleteKeyList.get(i))) {
// 同时删除脏数据
deleteKeyList.remove(i);
dirtyDataLineList.remove(i);
break;
}
}
// 会签数据处理需要在循环外处理,否则可能导致溢出
// 会签的数据肯定是之前放进去的所以理论上不会溢出,但还是校验下
if (multiOpera && deleteKeyList.size() > multiIndex && "".equals(deleteKeyList.get(multiIndex))) {
// 同时删除脏数据
deleteKeyList.remove(multiIndex);
dirtyDataLineList.remove(multiIndex);
multiIndex = -1;
multiOpera = false;
}
// pop() 方法与 peek() 方法不同,在返回值的同时,会把值从栈中移除
// 保存新的 userTaskKey 在下个循环中使用
userTaskKey = new StringBuilder(stack.pop().getTaskDefinitionKey());
}
logger.info("清洗后的历史节点数据:" + lastHistoricTaskInstanceList);
return lastHistoricTaskInstanceList;
}
}