目录
AbstractPlatformTransactionManager
TransactionSynchronizationManager
TransactionSynchronizationUtils
在使用Spring事务时,会使用@Transactional注解。@Transactional指定了PlatformTransactionManager。
public @interface Transactional {
@AliasFor("transactionManager")
String value() default "";
/*PlatformTransactionManager
*/
@AliasFor("value")
String transactionManager() default "";
Propagation propagation() default Propagation.REQUIRED;
/*TransactionAttribute
*/
Isolation isolation() default Isolation.DEFAULT;
int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
boolean readOnly() default false;
Class<? extends Throwable>[] rollbackFor() default {};
String[] rollbackForClassName() default {};
Class<? extends Throwable>[] noRollbackFor() default {};
String[] noRollbackForClassName() default {};
}
PlatformTransactionManager
PlatformTransactionManager接口定义了事务的相关的3个方法,包括获取事务,提交和回滚。
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
上图为PlatformTransactionManager接口的实现类,其中以DataSourceTransactionManager最为常用。
TransactionStatus
TransactionStatus用于控制事务的状态。
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
boolean hasSavepoint();
@Override
void flush();
}
public interface TransactionExecution {
/**是否一个新事务。 */
boolean isNewTransaction();
void setRollbackOnly();
//事务一定会回滚,用来代替抛出异常。
boolean isRollbackOnly();
boolean isCompleted();
}
public interface SavepointManager {
Object createSavepoint() throws TransactionException;
void rollbackToSavepoint(Object savepoint) throws TransactionException;
void releaseSavepoint(Object savepoint) throws TransactionException;
}
DefaultTransactionStatus
@Nullable
private final Object transaction;
private final boolean newTransaction;
private final boolean newSynchronization;
private final boolean readOnly;
private final boolean debug;
@Nullable
private final Object suspendedResources;
AbstractPlatformTransactionManager
AbstractPlatformTransactionManager是PlatformTransactionManager抽象实现,主要处理以下几个工作:
- 处理事务传递性
- 处理挂起事务,恢复事务时,同步更新线程变量TransactionSynchronizationManager
- 处理注册的回调方法,就是同一种类,放到一个list,然后在特定时间取出来for循环调用响应方法
定义了整个的开启事务,处理事务的骨架。
属性
private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;
private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;
//是否允许嵌套事务
private boolean nestedTransactionAllowed = false;
//是否检验已存在事务
private boolean validateExistingTransaction = false;
//部分失败是否导致全局回滚。
private boolean globalRollbackOnParticipationFailure = true;
private boolean failEarlyOnGlobalRollbackOnly = false;
//提交失败是否回滚。
private boolean rollbackOnCommitFailure = false;
getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//如果不存在事务定义,则使用默认事务定义。
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
//获取事务。由子类实现。
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
//判断事务已存在。
if (isExistingTransaction(transaction)) {
// 如果事务存在,根据事务传播特性处理事务。
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 为新事务设置超时。
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果不存在事务,并且传播特性为PROPAGATION_MANDATORY,则抛出异常。
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//挂起事务。
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
//判断是否新的事务同步。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//新事务。
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开始事务。
doBegin(transaction, def);
prepareSynchronization(status, def);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
//以非事务方式执行。
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
handleExistingTransaction
处理已存在事务的情况。
//处理已经存在事务的情况。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//PROPAGATION_NEVER,如果当前存在事务,则抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//把当前事务挂起
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//PROPAGATION_REQUIRES_NEW:一直新建事务,如果当前存在事务,把当前事务挂起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//把当前事务挂起
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//内嵌事务开始执行。
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//如果当前存在事务,则在嵌套事务内执行。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
//检查已存在事务。
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
//判断是否激活同步。
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
//以下都是复制状态。
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
需要由子类实现的方法
protected abstract Object doGetTransaction() throws TransactionException;
protected abstract void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException;
protected Object doSuspend(Object transaction) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
commit
@Override
public final void commit(TransactionStatus status) throws TransactionException {
//不可重复提交
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 这里会进行回滚,不抛出一个异常
processRollback(defStatus, false);
return;
}
//设置了全局回滚。
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 这里会进行回滚,并且抛出一个异常
processRollback(defStatus, true);
return;
}
//执行提交
processCommit(defStatus);
}
processCommit
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
//准备提交
prepareForCommit(status);
//提交前
triggerBeforeCommit(status);
//
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
//有安全点,说明此时的事务是嵌套事务NESTED,这个事务外面还有事务,这里不提交,只是释放保存点。
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
// 判断是否是新事务。这里如果是子事务,只有PROPAGATION_NESTED状态才会走到这里提交,也说明了此状态子事务提交和外层事务是隔离的
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 这里才真正去提交!
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 提交失败处理。
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
//提交成功触发
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
需要由子类实现的方法
protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
rollback
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
processRollback
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
//处理前触发器
triggerBeforeCompletion(status);
//如果有保存点,仅回滚到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//新的事务,则执行新事务的回滚。
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
//处理后触发器
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
//处理后清理。
cleanupAfterCompletion(status);
}
}
需要由子类实现的方法
protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
SuspendedResourcesHolder
挂起事务状态Holder,用于保存挂起事务信息,以便后面回复。
//挂起的资源
private final Object suspendedResources;
//事务同步器
private List<TransactionSynchronization> suspendedSynchronizations;
//事务名称
private String name;
//是否只读事务
private boolean readOnly;
//隔离级别
private Integer isolationLevel;
//是否激活
private boolean wasActive;
事务状态控制
TransactionSynchronization
TransactionSynchronization接口定义了一系列的回调方法,对应一个事务执行的不同阶段:挂起、恢复、flush、提交(前、后)、完成(事务成功或失败)等。当事务运行到对应阶段时,事务管理器会从TransactionSynchronizationManager维护的synchronizations中拿出所有的回调器,逐个回调其中的对应方法。
public interface TransactionSynchronization extends Flushable {
/** 正确提交时完成状态 */
int STATUS_COMMITTED = 0;
/** 回滚后完成状态 */
int STATUS_ROLLED_BACK = 1;
/** 未知状态:混合完成或系统错误 */
int STATUS_UNKNOWN = 2;
/**挂起同步器。从TransactionSynchronizationManager 解绑资源*/
default void suspend() {
}
/**继续同步器。重新为TransactionSynchronizationManager 绑定资源*/
default void resume() {
}
/**Flush指定sessioin到数据存储*/
@Override
default void flush() {
}
/**
* 事务提交前调用(before "beforeCompletion").*/
default void beforeCommit(boolean readOnly) {
}
/**
* 事务commit/rollback完成前调用.在beforeCommit之后,忽略beforeCommit是否抛出异常,
任何返回结果。不要在子类抛出TransactionException 异常。
*/
default void beforeCompletion() {
}
/**
* 事务提交后调用. 事务已经提交,但是事务资源可能是激活状态和可访问。
* 不要在子类抛出TransactionException 异常。
*/
default void afterCommit() {
}
/**
* 事务 提交后调用,可以清理资源了。
不要在子类抛出TransactionException 异常。
*/
default void afterCompletion(int status) {
}
}
public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void suspend() {
}
@Override
public void resume() {
}
@Override
public void flush() {
}
@Override
public void beforeCommit(boolean readOnly) {
}
@Override
public void beforeCompletion() {
}
@Override
public void afterCommit() {
}
@Override
public void afterCompletion(int status) {
}
}
TransactionSynchronizationManager
TransactionSynchronizationManager用于管理TransactionSynchronization,由一系列的ThreadLocal对象构成。ThreadLocal类型属性与SuspendedResourcesHolder属性基本对应,都是事务状态信息。
public abstract class TransactionSynchronizationManager {
//线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。
private static final ThreadLocal< Map<Object, Object> > resources = new NamedThreadLocal<>("Transactional resources");
//事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。
private static final ThreadLocal< Set<TransactionSynchronization> > synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
// 事务的名称
private static final ThreadLocal< String > currentTransactionName = new NamedThreadLocal<>("Current transaction name");
// 事务是否是只读
private static final ThreadLocal< Boolean > currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
// 事务的隔离级别
private static final ThreadLocal< Integer > currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
// 事务是否开启 actual:真实的
private static final ThreadLocal< Boolean > actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
}
TransactionSynchronizationUtils
TransactionSynchronizationUtils执行同步器操作,内部调用TransactionSynchronizationManager.getSynchronizations()获取所有同步器,然后循环调用对应操作。
public static void triggerBeforeCompletion() {
for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
try {
synchronization.beforeCompletion();
}
catch (Throwable tsex) {
logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
}
}
}