您的位置:首页 > 编程语言 > Java开发

难道你还不知道Spring之事务的回滚和提交的原理吗,这篇文章带你走进源码级别的解读。

2021-12-02 20:12 162 查看

上一篇文章讲解了获取事务,并通过获取的connection设置只读,隔离级别等;这篇文章讲事务剩下的回滚和提交。

事务的回滚处理

之前已经完成了目标方法运行前的事务准备工作。而这些准备工作的最大目的无非就是对于程序没有按照我们期待的那样进行,也就是出现特定的错误;那么当出现错误的时候Spring是怎么对数据进行恢复的呢?我们先来看一下

TransactionAspectSupport
类里的
invokeWithinTransaction
函数的
completeTransactionAfterThrowing
方法(其它的已经在上一篇文章讲解过,这里不再赘述):

  • 看源码(
    TransactionAspectSupport.java
    )
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 当抛出异常时,先判断当前是否存在事务,这是基础依据
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 这里判断是否回滚默认的 依据是抛出的异常是RunTimeException 或者是 Erro 类型r
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
// 如果不满足回滚条件,即使抛出异常也正常提交
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
  • 源码分析

在对目标方法的执行过程中,一旦出现

Throwable
就会被引导至此方法进行处理,但是不意味着所有的
Throwable
都会被回滚处理;比如我么最常用的Exception,默认是不会被处理的,默认情况下,即使出现异常,数据也会被正常提交,而这个关键的地方就在于
txInfo.transactionAttribute.rollbackOn(ex)
这个函数:

回滚条件
  • 看源码(
    DefaultTransactionAttribute.java
    )
@Override
public Boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
  • 源码分析

从上述代码中可以看到,默认情况下:Spring只会对

RuntimeException
Error
两种类型的情况进行处理;但是我们可以利用注解方式来改变。例如:

@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
回滚处理

当然,一旦符合回滚条件,那么Spring就会将程序引导至回滚处理函数中。接下来我们看一下回滚函数,也就是

txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
代码的rollback方法:

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}

看一下这个方法里面的

processRollback
函数:

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
private void processRollback(DefaultTransactionStatus status, Boolean unexpected) {
try {
Boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
// 如果status 有 savePoint , 说明此事务 是 PROPAGATION_NESTED 且为子事务,只能回滚到savePoint
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 回滚到保存点
status.rollbackToHeldSavepoint();
}
// 如果此时的status显示的是新的事务,才进行回滚 else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 如果此时是子事务,我们在这里想想哪些类型的事务会进到这里
// 回滚之前说的 已经存在的事务的处理,
// PROPAGATION_NOT_SUPPORTED 创建的事务:
//      return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
//   说明是旧事务,并且事务为null 不会进入
// PROPAGATION_REQUIRES_NEW 创建一个新的子事务,
//         newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//         说明是新事务 ,会进入这个分支
// PROPAGATION_NESTED 创建的Status是  prepareTransactionStatus(definition, transaction, false...)是旧事物,
//         使用的是外层的事务,不会进入
// PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY存在事务加入事务即可,
//         标记为旧事务,prepareTransactionStatus(definition, transaction, false..)
// 说明当子事务,只有REQUIRES_NEW会进入到这里进行回滚
doRollback(status);
} else {
// Participating in larger transaction
// 如果status中有事务,进入下面
// 根据上面分析,PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY
//         创建的Status是prepareTransactionStatus(definition, transaction, false..)
//         如果此事务时子事务,表示存在事务,并且事务为旧事物,将进入到这里
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 对status中的transaction作一个回滚了的标记,并不会立即回滚
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
} else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清空记录的资源并将挂起的资源恢复
// 子事务结束了,之前挂起的事务就要恢复了
cleanupAfterCompletion(status);
}
}

我们先看看

processRollback
函数里面的
status.rollbackToHeldSavepoint();
这段代码,回滚到保存点的代码,根据保存点回滚的实现方式其实是根据底层的数据库连接进行的。回滚到保存点之后,也要释放掉当前的保存点。看具体实现:

  • 看源码(
    AbstractTransactionStatus.java
    )
public void rollbackToHeldSavepoint() throws TransactionException {
Object savepoint = getSavepoint();
if (savepoint == null) {
throw new TransactionUsageException(
"Cannot roll back to savepoint - no savepoint associated with current transaction");
}
getSavepointManager().rollbackToSavepoint(savepoint);
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
  • 源码分析

这里使用的是JDBC的方式进行的数据库连接,那么

getSavepointManager()
函数返回的是JdbcTransactionObjectSupport,也就是说上面函数会调用JdbcTransactionObjectSupport 中的 rollbackToSavepoint 方法。

接下来查看一下JdbcTransactionObjectSupport 中的 rollbackToSavepoint 方法。

  • 看源码(
    JdbcTransactionObjectSupport.java
    )
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
}
catch (Throwable ex) {
throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
}
}

当之前已经保存的事务信息中的事务为新事务,那么直接回滚。常用于单独事务的处理。对于没有保存点的回滚,Spring同样是使用底层数据库连接提供的API来操作的。由于我们使用的是

DataSourceTransactionManager
,所以
AbstractPlatformTransactionManager
里的
processRollback
函数里的
doRollback(status);
也就是在DataSourceTransactionManager实现的。

  • 看源码(
    DataSourceTransactionManager.java
    )
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw translateException("JDBC rollback", ex);
}
}

当前事务信息中表明是存在事务的,又不属于以上两种情况,只做回滚标识,等到提交的时候再判断是否又回滚标识,下面回滚的时候再介绍,子事务中状态为PROPAGATION_SUPPORTS 或PROPAGATION_REQUIRED或PROPAGATION_MANDATORY回滚的时候将会标记为回滚标识,我们先来看看是怎么标记的。回到

AbstractPlatformTransactionManager
类的
processRollback
函数的
doSetRollbackOnly(status);

  • 看源码(
    DataSourceTransactionManager.java
    )
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
// 将status中的transaction取出
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// transaction执行标记回滚
txObject.setRollbackOnly();
}

继续查看一下

setRollbackOnly()
函数:

  • 看源码(
    DataSourceTransactionManager.java
    )
public void setRollbackOnly() {
// 这里将transaction里面的connHolder标记回滚
getConnectionHolder().setRollbackOnly();
}

继续查看setRollbackOnly()的具体实现

  • 看源码(
    ResourceHolderSupport.java
    )
public void setRollbackOnly() {
// 将holder中的这个属性设置成true
this.rollbackOnly = true;
}

我们看到将status中的Transaction中的ConnectionHolder的属性rollbackOnly属性设置为true,这里暂时不多考虑,等到下面提交的时候再介绍。

简单小结

简单总结一下

AbstractPlatformTransactionManager
类的
processRollback
函数

  • status.hasSavepoint() ** 如果status中有savePoint,只回滚到savePoint!**
  • status.isNewTransaction() 如果status是一个新事务,才会真正去回滚!
  • status.hasTransaction() ** 如果status有事务,将会对staus中的事务标记!**

事务的提交

在事务的执行没有出现任何的异常,也就意味着事务可以走正常事务的提交流程,这里回到流程中,看看

TransactionAspectSupport
类中的
commitTransactionAfterReturning(txInfo);
函数具体做了什么

  • 看源码(
    TransactionAspectSupport.java
    )
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
  • 源码分析

在真正的数据提交之前,还需要做一个判断,不知道大家还有没有印象,在我们分析事务异常处理规则的时候,当某个事务既没保存点,又不是新事务,Spring对它的处理方式只是设置一个回滚标识(具体是在AbstractPlatformTransactionManager的

processRollback函数里面
)。这个标识在这里就会派上用场了,如果子事务状态是**PROPAGATION_SUPPORTS **、PROPAGATION_REQUIREDPROPAGATION_MANDATORY,将会在外层事务中运行,回滚的时候,并不执行回滚,只是标记一下回滚的状态,当外层事务提交的时候,会先判断ConnectionHolder中的回滚状态,如果已经标记为回滚,则不会提交,而是外层事务进行回滚。(查看一下txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());)的commit方法:

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 这里会进行回滚,并且抛出一个异常
processRollback(defStatus, true);
return;
}
// 如果没有被标记回滚之类的,这里才真正判断是否提交
processCommit(defStatus);
}

当事务执行一切都正常的时候,就可以真正的进入到提交流程了。

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
Boolean beforeCompletionInvoked = false;
try {
Boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 判断是否有savePoint
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 不提交,仅仅是释放savePoint
status.releaseHeldSavepoint();
}
// 判断是否是新事务 else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 这里才真正去提交!
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 清空记录的资源并将挂起的资源恢复
cleanupAfterCompletion(status);
}
}
  • 源码分析
  1. status.hasSavepoint() 如果status又savePoint,说明此时的事务是嵌套事务NESTED,这个事务外面还有事务,这里不提交,只是释放保存点。这里也可以看出来NESTED的传播行为了
  2. status.isNewTransaction() 如果是新事务才会提交!这里如果是子事务,只有PROPAGATION_NESTED状态才会走到这里提交,也说明了此状态子事务提交和外层事务是隔离的。
  3. 如果是子事务PROPAGATION_SUPPORTSPROPAGATION_REQUIREDPROPAGATION_MANDATORY这几种状态是旧事务,提交的时候将什么都不做,因为它们是运行在外层事务当中,如果子事务没有回滚,将由外层事务一次性提交

如果程序流通过了事务的层层把关,最后顺利的进入了提交流程,那么同样,Spring会将事务提交的操作引导至底层数据库连接的API,进行事务提交。

接下来看一下具体实现也就是

AbstractPlatformTransactionManager
processCommit函数里的
doCommit
方法

  • 看源码(
    DataSourceTransactionManager.java
    )
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw translateException("JDBC commit", ex);
}
}

从回滚和提交的逻辑看,只有status是新事务,才会进行提交或回滚,需要读者记好这个状态->是否是新事务

事务的清理工作

关于清理的工作我们继续回到

AbstractPlatformTransactionManager
processCommit
函数,在这个函数里面可以看到的是无论是在异常还是没有异常的流程中,最后的finally代码块中的都会执行这个
cleanupAfterCompletion(status);
方法

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 设置完成状态
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 如果是新事务
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 结束之前事务的挂起状态
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}

那么如果是新事务呢,它会做哪些清除资源的操作呢?接下看一下cleanupAfterCompletion函数里的

doCleanupAfterCompletion
函数的具体实现:

  • 看源码(
    DataSourceTransactionManager.java
    )
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
// 将数据库连接从当前线程中解除绑定,解绑过程我们在挂起的过程中已经分析过
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// Reset connection.
// 释放连接,当前事务完成,则需要将连接释放,如果有线程池,则重置数据库连接,放回线程池
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
// 恢复数据库连接的自动提交属性
con.setAutoCommit(true);
}
// 重置数据库连接
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
// 如果当前事务是独立的新创建的事务则在事务完成时释放数据库连接
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
  • 源码分析

综上cleanupAfterCompletiondoCleanupAfterCompletion这两个方法我们可以知道的是如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起的事务恢复。

如果有挂起的事务的话,

status.getSuspendedResources()!=null
为真,也就是说status中会有suspendedResource这个属性,取得status中的transaction后计入
resume方法

  • 看源码(
    AbstractPlatformTransactionManager.java
    )
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
// 如果有被挂起的事务才会被进入
if (suspendedResources != null) {
// 真正去resume恢复的地方
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
// 将上面提到的 TransactionSynchronizationManager 专门存放线程变量的类中的属性设置成被挂起事务的属性
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}

紧接着我们再去看一下真正去resume恢复的方法

doResume
函数

  • 看源码(
    DataSourceTransactionManager.java
    )
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
  • 源码分析

我们可以看到 这里恢复只是把suspendedResources重新绑定到线程中。


总结

到这里之后,我们就把事务的回滚和提交就讲完了。有兴趣的童鞋可以自己再深入的了解一下。

喜欢的可以点赞关注支持一下,微信搜索【码上遇见你】及时获取更多编程知识。我们共同进步。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: