上两篇文章主要分析activiti数据库相关初始化、sessionFactory初始化及创建实体管理类的原理。实体管理类封装了activiti底层增删查改操作。上层命令类程序不需要直接去调用mybatis的sqlSession,通过实体管理类间接调用。本文通过例子分析实体管理类是如何调用mybaits的。
我们以DeploymentEntityManager为例:
public void insertDeployment(DeploymentEntity deployment) {
getDbSqlSession().insert(deployment);
for (ResourceEntity resource : deployment.getResources().values()) {
resource.setDeploymentId(deployment.getId());
getResourceManager().insertResource(resource);
}
}
上面是当部署流程文档时,调用deploymentBuilder.deploy()部署,最终调用DeploymentEntityManager的insertDeployment方法插入流程文档到ACT_RE_DEPLOYMENT表。第2行getDbSqlSession()获取DbSqlSession的实例,然后调用insert方法把DeploymentEntity插入到ACT_RE_DEPLOYMENT表。4-7行则DeploymentEntity对应的资源插入到ACT_GE_BYTEARRAY中。
接下来看看DbSqlSession.java的insert方法:
public void insert(PersistentObject persistentObject) {
if (persistentObject.getId()==null) {
String id = dbSqlSessionFactory.getIdGenerator().getNextId();
persistentObject.setId(id);
}
Class<? extends PersistentObject> clazz = persistentObject.getClass();
if (!insertedObjects.containsKey(clazz)) {
insertedObjects.put(clazz, new ArrayList<PersistentObject>());
}
insertedObjects.get(clazz).add(persistentObject);
cachePut(persistentObject, false);
}
protected CachedObject cachePut(PersistentObject persistentObject, boolean storeState) {
Map<String, CachedObject> classCache = cachedObjects.get(persistentObject.getClass());
if (classCache==null) {
classCache = new HashMap<String, CachedObject>();
cachedObjects.put(persistentObject.getClass(), classCache);
}
CachedObject cachedObject = new CachedObject(persistentObject, storeState);
classCache.put(persistentObject.getId(), cachedObject);
return cachedObject;
}
所有实体类XXXEntity都实现PersistentObject接口。第2行如果该对象没有设置id的话,第3-4行则通过id生成器生成其id并设置之。第7-12行把待插入对象添加到insertedObjects中。insertedObjects是一个以插入对象class为key,以ArrayList为value的HashMap对象。插入操作根据不同的插入对象的类型,添加到insertedObjects不同的队列中。此时新增对象并未插入到数据库中,还保留在队列里。cachePut方法会把准备插入的内容缓存到cachedObjects中。若在把新增对象刷新到数据库前发生修改、删除、查询等操作时,则操作cachedObjects中的数据即可。
数据已经被保存到缓存里了,那什么时候刷新到数据库呢?还记得《activiti学习(八)——自定义拦截器和命令类》一文中提到拦截器调用链,如下图所示:
刷新的操作在命令执行结束后,回调到上下文拦截器的时候触发,我们查看CommandContextInterceptor的execute方法:
public <T> T execute(CommandConfig config, Command<T> command) {
CommandContext context = Context.getCommandContext();
boolean contextReused = false;
if (!config.isContextReusePossible() || context == null || context.getException() != null) {
context = commandContextFactory.createCommandContext(command);
}
else {
log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName());
contextReused = true;
}
try {
Context.setCommandContext(context);
Context.setProcessEngineConfiguration(processEngineConfiguration);
return next.execute(config, command);
} catch (Exception e) {
context.exception(e);
} finally {
try {
if (!contextReused) {
context.close();
}
} finally {
Context.removeCommandContext();
Context.removeProcessEngineConfiguration();
Context.removeBpmnOverrideContext();
}
}
return null;
}
15行执行调用链,待命令类的execute方法调用完毕后,如果设置了上下文不重用,则执行21行context.close方法,把缓存刷到数据库的调用就在这里面。接着我们看CommandContext的close方法:
public void close() {
try {
try {
try {
//......省略
if (exception == null) {
flushSessions();
}
}
//......省略
} catch (Throwable exception) {
exception(exception);
} finally {
closeSessions();
}
} catch (Throwable exception) {
exception(exception);
}
//.....省略
}
protected void flushSessions() {
for (Session session : sessions.values()) {
session.flush();
}
}
第9行调用flushSessions刷新会话,29行调用DbSqlSession的flush方法。下面看DbSqlSession的flush方法是如何刷新数据的:
public void flush() {
List<DeleteOperation> removedOperations = removeUnnecessaryOperations();
flushDeserializedObjects();
List<PersistentObject> updatedObjects = getUpdatedObjects();
if (log.isDebugEnabled()) {
Collection<List<PersistentObject>> insertedObjectLists = insertedObjects.values();
int nrOfInserts = 0, nrOfUpdates = 0, nrOfDeletes = 0;
for (List<PersistentObject> insertedObjectList: insertedObjectLists) {
for (PersistentObject insertedObject : insertedObjectList) {
log.debug(" insert {}", insertedObject);
nrOfInserts++;
}
}
for (PersistentObject updatedObject: updatedObjects) {
log.debug(" update {}", updatedObject);
nrOfUpdates++;
}
for (DeleteOperation deleteOperation: deleteOperations) {
log.debug(" {}", deleteOperation);
nrOfDeletes++;
}
log.debug("flush summary: {} insert, {} update, {} delete.", nrOfInserts, nrOfUpdates, nrOfDeletes);
log.debug("now executing flush...");
}
flushInserts();
flushUpdates(updatedObjects);
flushDeletes(removedOperations);
}
第2行removeUnnecessaryOperations移除不必要的操作,具体的意思是这样:命令类的操作把添加的记录放在insertedObjects缓存中,在deleteOperations中添加了需要删除的操作,removeUnnecessaryOperations函数匹配在insertedObjects中是否存在deleteOperations需要删除的数据,如果有,则把这个对象从insertedObjects中剔除,因为没必要先进行插入,然后又做删除这样无意义的操作。详细源码可以读者可自行分析。第3行flushDeserializedObjects刷新序列化变量,把流程中实现了Serializable接口的变量刷新到数据库中。第4行获取更新对象。27行刷新插入对象,28行刷新更新对象,29行刷新删除操作。我们暂时只看插入操作,跟踪flushInserts:
protected void flushInserts() {
for (Class<? extends PersistentObject> persistentObjectClass : EntityDependencyOrder.INSERT_ORDER) {
if (insertedObjects.containsKey(persistentObjectClass)) {
flushPersistentObjects(persistentObjectClass, insertedObjects.get(persistentObjectClass));
insertedObjects.remove(persistentObjectClass);
}
}
if (insertedObjects.size() > 0) {
for (Class<? extends PersistentObject> persistentObjectClass : insertedObjects.keySet()) {
flushPersistentObjects(persistentObjectClass, insertedObjects.get(persistentObjectClass));
}
}
insertedObjects.clear();
}
protected void flushPersistentObjects(Class<? extends PersistentObject> persistentObjectClass, List<PersistentObject> persistentObjectsToInsert) {
if (persistentObjectsToInsert.size() == 1) {
flushRegularInsert(persistentObjectsToInsert.get(0), persistentObjectClass);
} else if (Boolean.FALSE.equals(dbSqlSessionFactory.isBulkInsertable(persistentObjectClass))) {
for (PersistentObject persistentObject : persistentObjectsToInsert) {
flushRegularInsert(persistentObject, persistentObjectClass);
}
} else {
flushBulkInsert(insertedObjects.get(persistentObjectClass), persistentObjectClass);
}
}
protected void flushRegularInsert(PersistentObject persistentObject, Class<? extends PersistentObject> clazz) {
String insertStatement = dbSqlSessionFactory.getInsertStatement(persistentObject);
insertStatement = dbSqlSessionFactory.mapStatement(insertStatement);
if (insertStatement==null) {
throw new ActivitiException("no insert statement for " + persistentObject.getClass() + " in the ibatis mapping files");
}
log.debug("inserting: {}", persistentObject);
sqlSession.insert(insertStatement, persistentObject);
if (persistentObject instanceof HasRevision) {
((HasRevision) persistentObject).setRevision(((HasRevision) persistentObject).getRevisionNext());
}
}
protected void flushBulkInsert(List<PersistentObject> persistentObjectList, Class<? extends PersistentObject> clazz) {
String insertStatement = dbSqlSessionFactory.getBulkInsertStatement(clazz);
insertStatement = dbSqlSessionFactory.mapStatement(insertStatement);
if (insertStatement==null) {
throw new ActivitiException("no insert statement for " + persistentObjectList.get(0).getClass() + " in the ibatis mapping files");
}
if (persistentObjectList.size() <= dbSqlSessionFactory.getMaxNrOfStatementsInBulkInsert()) {
sqlSession.insert(insertStatement, persistentObjectList);
} else {
for (int start = 0; start < persistentObjectList.size(); start += dbSqlSessionFactory.getMaxNrOfStatementsInBulkInsert()) {
List<PersistentObject> subList = persistentObjectList.subList(start,
Math.min(start + dbSqlSessionFactory.getMaxNrOfStatementsInBulkInsert(), persistentObjectList.size()));
sqlSession.insert(insertStatement, subList);
}
}
if (persistentObjectList.get(0) instanceof HasRevision) {
for (PersistentObject insertedObject: persistentObjectList) {
((HasRevision) insertedObject).setRevision(((HasRevision) insertedObject).getRevisionNext());
}
}
}
2-5行根据EntityDependencyOrder初始化时的顺序逐个实体类型调用flushPersistentObjects插入数据库。activiti定义了插入和删除的先后顺序,解决依赖的问题。8-12行插入用户自定义的实体类型。17行判断被插入的实体数量,如果为1个,则使用常规插入,19行判断改类型是否可批量插入,不可的话20-22行仍然是常规插入,否则走24行批量插入。28-42行是常规插入。29行获取插入语句,其实这个不是指sql语句,是指mybatis的xml文件里sql语句对应的id。30行针对特殊语句,不同数据库要替换成不同的语句,这体现了activiti如何屏蔽数据库差异。37行调用mybatis的sqlSession的insert方法进行插入。39行判断如果实体类实现了HasRevision接口,则设置其revision值。revision用于乐观锁。activiti使用乐观锁对数据版本进行控制。各位可以结合映射文件里的语句研究。我们看看dbSqlSessionFactory.getInsertStatement函数具体怎么获取语句,DbSqlSessionFactory.java:
public String getInsertStatement(PersistentObject object) {
return getStatement(object.getClass(), insertStatements, "insert");
}
private String getStatement(Class<?> persistentObjectClass, Map<Class<?>,String> cachedStatements, String prefix) {
String statement = cachedStatements.get(persistentObjectClass);
if (statement!=null) {
return statement;
}
statement = prefix + persistentObjectClass.getSimpleName();
statement = statement.substring(0, statement.length()-6); // removing 'entity'
cachedStatements.put(persistentObjectClass, statement);
return statement;
}
第2行调用getStatement函数。第6-10行实质是查看insertStatements缓存中是否已保存该实体类型对应的插入语句,如果有则直接返回。11-12行先为实体类加上前缀,再去掉实体类的“Entity”。以本文一开始通过Deployment举例,这里插入的是DeploymentEntity,首先在前面添加“insert”前缀,再去掉“Entity”,最后statment为“insertDeployment”。结合mybatis的映射,我们查找org/activiti/db/mapping/entity/Deployment.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.activiti.engine.impl.persistence.entity.DeploymentEntity">
<!-- DEPLOYMENT INSERT -->
<insert id="insertDeployment" parameterType="org.activiti.engine.impl.persistence.entity.DeploymentEntity">
insert into ${prefix}ACT_RE_DEPLOYMENT(ID_, NAME_, CATEGORY_, TENANT_ID_, DEPLOY_TIME_)
values(#{id, jdbcType=VARCHAR}, #{name, jdbcType=VARCHAR}, #{category, jdbcType=VARCHAR}, #{tenantId, jdbcType=VARCHAR}, #{deploymentTime, jdbcType=TIMESTAMP})
</insert>
<!-- 省略 -->
</mapper>
假设这里的插入语句activiti不需要做特殊处理,所以最后执行的是insertDeployment对应的sql语句。
到此为止本文已经分析了deployment插入的大致过程。剩余批量插入、更新、删除等有兴趣各位可自行研究。