Spring的声明式事务处理的即开即用特性为用户提供了很大的方便,在使用Spring时,我们绝大多数情况下还是使用其声明式事务处理。声明式事务处理涉及Spring框架对事务处理的统一管理,以及对并发事务和事务属性的处理,是一个比较复杂的过程,下面了解一下Spring框架声明式事务处理功能的具体实现。
一、事务的创建
前一篇文章讲到对Spring事务拦截器TransactionInterceptor回调方法invoke的源码分析中,我们了解到在进行事务处理前,首先根据是否是CallbackPreferringPlatformTransactionManager类型的事务处理器分别通过下面两个方法创建事务信息对象:
- TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
- TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
二、事务的挂起
如果当前线程存在事务,但事务传播特性又要求开启新事务时,需要将已有的事务进行挂起,事务的挂起涉及线程与事务信息的保存,实现源码如下:
- protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
- //如果事务是激活的,且当前线程事务同步机制也是激活状态
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- //挂起当前线程中所有同步的事务
- List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
- try {
- Object suspendedResources = null;
- //把挂起事务的操作交由具体的事务处理器处理
- if (transaction != null) {
- suspendedResources = doSuspend(transaction);
- }
- //在线程中保存与事务处理有关的信息,并将线程里有关的线程局部变量重置
- String name = TransactionSynchronizationManager.getCurrentTransactionName();
- //重置当前线程中事务相关的线程局部变量
- TransactionSynchronizationManager.setCurrentTransactionName(null);
- boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
- Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
- boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
- TransactionSynchronizationManager.setActualTransactionActive(false);
- //将当前线程中事务相关信息保存
- return new SuspendedResourcesHolder(
- suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
- }
- //对事务挂起操作中产生异常和错误的处理
- catch (RuntimeException ex) {
- doResumeSynchronization(suspendedSynchronizations);
- throw ex;
- }
- catch (Error err) {
- doResumeSynchronization(suspendedSynchronizations);
- throw err;
- }
- }
- //如果事务是激活的,但是事务同步机制不是激活的,则只需要保存事务状态,不
- //需要重置事务相关的线程局部变量
- else if (transaction != null) {
- Object suspendedResources = doSuspend(transaction);
- return new SuspendedResourcesHolder(suspendedResources);
- }
- //事务和事务同步机制都不是激活的,则不要想处理
- else {
- return null;
- }
- }
三、事务的提交
当事务方法处理成功之后,需要将当前事务提交,将更改同步到数据库中,事务提交的实现源码如下:
- public final void commit(TransactionStatus status) throws TransactionException {
- //如果事务的执行状态已经结束,则抛出异常
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException(
- "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- //如果事务执行状态时回滚
- if (defStatus.isLocalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Transactional code has requested rollback");
- }
- //处理事务回滚
- processRollback(defStatus);
- return;
- }
- //如果事务没有被标记为回滚时提交,且事务状态时全局回滚
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
- }
- //回滚处理
- processRollback(defStatus);
- //如果事务状态时新事务,或者在全局回滚时失败
- if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
- throw new UnexpectedRollbackException(
- "Transaction rolled back because it has been marked as rollback-only");
- }
- return;
- }
- //处理提交
- processCommit(defStatus);
- }
- //提交处理操作
- private void processCommit(DefaultTransactionStatus status) throws TransactionException {
- try {
- boolean beforeCompletionInvoked = false;
- try {
- //事务提交的准备工作,有具体的事务处理器完成
- prepareForCommit(status);
- triggerBeforeCommit(status);
- triggerBeforeCompletion(status);
- beforeCompletionInvoked = true;
- boolean globalRollbackOnly = false;
- //如果事务状态是新事务,或者全局回滚失败
- if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
- //设置事务全局回滚
- globalRollbackOnly = status.isGlobalRollbackOnly();
- }
- //嵌套事务处理
- if (status.hasSavepoint()) {
- if (status.isDebug()) {
- logger.debug("Releasing transaction savepoint");
- }
- //释放挂起事务保存点
- status.releaseHeldSavepoint();
- }
- //如果当前事务是新事务
- else if (status.isNewTransaction()) {
- if (status.isDebug()) {
- logger.debug("Initiating transaction commit");
- }
- //调用具体事务处理器提交事务
- doCommit(status);
- }
- //如果事务被标记为全局回滚
- if (globalRollbackOnly) {
- throw new UnexpectedRollbackException(
- "Transaction silently rolled back because it has been marked as rollback-only");
- }
- }
- //提交过程中产生未预期的回滚异常,则回滚处理
- catch (UnexpectedRollbackException ex) {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
- throw ex;
- }
- //对提交过程中产生的事务异常处理
- catch (TransactionException ex) {
- //如果回滚失败,则进行回滚异常处理
- if (isRollbackOnCommitFailure()) {
- doRollbackOnCommitException(status, ex);
- }
- else {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- }
- throw ex;
- }
- //对提交过程中产生的异常处理
- catch (RuntimeException ex) {
- //如果不是在完成前调用的
- if (!beforeCompletionInvoked) {
- //触发完成前的回调方法
- triggerBeforeCompletion(status);
- }
- //进行回滚异常处理
- doRollbackOnCommitException(status, ex);
- throw ex;
- }
- //对提交过程中产生的错误处理
- catch (Error err) {
- if (!beforeCompletionInvoked) {
- triggerBeforeCompletion(status);
- }
- doRollbackOnCommitException(status, err);
- throw err;
- }
- //触发提交之后的回调操作
- try {
- triggerAfterCommit(status);
- }
- finally {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
- }
- }
- //提交完成之后清除事务相关状态
- finally {
- cleanupAfterCompletion(status);
- }
- }
四、事务的回滚
当在事务处理过程中产生异常,或者提交失败时,往往需要对数据库中已有的更改做回滚操作,即恢复到操作之前的状态,回滚的实现代码如下:
- public final void rollback(TransactionStatus status) throws TransactionException {
- //如果事务状态已完成,则抛出异常
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException(
- "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- //处理回滚的操作
- processRollback(defStatus);
- }
- //回滚操作
- private void processRollback(DefaultTransactionStatus status) {
- try {
- try {
- //触发完成前的回调操作
- triggerBeforeCompletion(status);
- //嵌套事务回滚处理
- if (status.hasSavepoint()) {
- if (status.isDebug()) {
- logger.debug("Rolling back transaction to savepoint");
- }
- //回滚挂起在保存点的事务
- status.rollbackToHeldSavepoint();
- }
- //当前事务中创建新事务的回滚操作
- else if (status.isNewTransaction()) {
- if (status.isDebug()) {
- logger.debug("Initiating transaction rollback");
- }
- //回滚处理,由具体的事务处理器实现
- doRollback(status);
- }
- //如果在当前事务中没有新建事务
- else if (status.hasTransaction()) {
- //如果当前事务状态为本地回滚,或全局回滚失败
- if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
- if (status.isDebug()) {
- logger.debug(
- "Participating transaction failed - marking existing transaction as rollback-only");
- }
- //设置当前事务状态为回滚
- doSetRollbackOnly(status);
- }
- //当前事务状态没有设置为本地回滚,且没有产生全局回滚失败,则
- //由线程中的前一个事务来处理回滚,这个步骤任何处理
- else {
- if (status.isDebug()) {
- logger.debug(
- "Participating transaction failed - letting transaction originator decide on rollback");
- }
- }
- }
- //如果当前线程没有事务
- else {
- logger.debug("Should roll back transaction but cannot - no transaction available");
- }
- }
- //对回滚操作过程中的运行时异常和错误的处理
- catch (RuntimeException ex) {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- throw ex;
- }
- catch (Error err) {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- throw err;
- }
- //回滚操作完成后,触发回滚之后回调操作
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
- }
- //清除回滚之后事务状态信息
- finally {
- cleanupAfterCompletion(status);
- }
- }