您的位置:首页 > 编程语言 > Java开发

Spring事务管理(5)-开启事务

2016-07-21 22:37 591 查看
在前几篇文章中,我们分析了Spring的AOP实现,AOP的实现原理即JdkDynamicAop或Cglib。目标方法执行的时候,进入invoke方法中先执行Advisors链。Spring的事务开启需要在目标方法执行前进行,因此可以作为一个前置增强添加到Advisors链中。

Spring的声明式事务配置如下:

<bean id="transactionManager"
class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory"></property>
</bean>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="add*" propagation="REQUIRED" rollback-for="Exception" />
<tx:method name="modify*" propagation="REQUIRED" rollback-for="Exception" />
<tx:method name="del*" propagation="REQUIRED" rollback-for="Exception" />
<tx:method name="save*" propagation="REQUIRED" rollback-for="Exception" />
</tx:attributes>
</tx:advice>

<aop:config>
<aop:pointcut id="daoMethod" expression="execution(* com.hikvision.modules.*.service.*.*(..))" />
<aop:advisor pointcut-ref="daoMethod" advice-ref="txAdvice" />
</aop:config>


通过观察< aop : config>的配置,可以预测Spring的事务管理是一个adivsor,通过pointcut-ref属性定义了切点信息,通过advice-ref则定义了advice,同时将事务管理器添加到advice中进行事务管理。具体的advisor类无需在配置文件中定义,因为spring已经内置了特定的类TransactionInterceptor。观察图1 TransactionInterceptor的类层次结构,发现其实现了MethodInteceptor,这是Advisor的基础接口。



图1 TransactionInterceptor的类层次结构

上面的配置方式是Spring定义的规范,在解析配置文件时根据定义的标签便可以生成对应的advisor。现在用常见的配置bean的方法去翻译上面的配置,会更加透彻地理解Spring的事务配置。

<!-- 定义一个TransactionInteceptor的bean -->
<bean id="advisor" class="org.springframework.transaction.interceptor.TransactionInteceptor">
<property name="pointcut" ref="daoMethod" />
<property name="advice" ref="txAdvice" />
</bean>

<!-- 定义切点对象 -->
<bean id="daoMethod" class="org.springframework.aop.aspectj.AspectJExpressionPointcut">
<property name="expression" ref="execution(* com.hikvision.modules.*.service.*.*(..))" />
</bean>

<!-- 定义advice对象 -->
<bean id="txAdvice" class="org.springframework.aop.aspectj.AbstractAspectJAdvice">
<property name="transactionManager" ref="transactionManager" />
</bean>

<!-- 定义事务管理对象 -->
<bean id="transactionManager"
class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory"></property>
</bean>


通过翻译Spring的声明式配置,我们也可以定义自己的Advisor去管理程序中的事务,需要实现的是AOP和事务管理器。

TransactionInteceptor作为事务管理的Advisor,因此在分析Spring的事务管理时,可以从该类的invoke函数进行分析。在invoke方法中,执行invokeWithinTransaction进行事务处理,其中第三个参数是一个回调函数,在开启事务之后,进行回调执行目标方法,执行完成中回到invokeWithinTransaction方法中关闭事务。

public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}


接着进入invokeWithinTransaction方法。由于invokeWithinTransaction方法比较长,在只讨论最常用的事务管理的时候,为了流程上更加清晰,去掉一些分支代码。

protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation)
throws Throwable {

// 获得事务属性,即配置的propagation等信息
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 获得事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);

if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务信息
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 执行目标方法的回调
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 在抛出异常时在txInfo中进行标记
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 最后清除事务信息
cleanupTransactionInfo(txInfo);
}
// 提交或回滚事务
commitTransactionAfterReturning(txInfo);
return retVal;
}

else {
//CallbackPreferringPlatformTransactionManager事务管理器的逻辑
}
}


对于以上代码的事务管理过程,可以用以下伪代码进行抽象:

//获得事务
try {
//业务逻辑
} catch {
//回滚
} finally {
//清楚状态
}
//提交事务


通过以上的分析,事务管理的整体流程已经非常清晰了。接下来讲详细讲解事务的开启过程。

事务的开启是在createTransactionIfNecessary方法中进行的。

protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

// 如果事务属性不存在,则创建一个代理事务属性
// 如果事务属性的名称不存在,则用方法的全限定名替代
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 开启事务,并返回事务状态对象
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}

// 包装事务状态
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}


在上文代码中,status = tm.getTransaction(txAttr)语句开启事务,并返回事务状态。其中tm则是我们定义的事务管理器。可以看到,开启事务的执行最终交由事务管理器去执行。而事务管理器的注入方式则实现不同种类事务的配置管理,比如数据库事务、消息队列事务、Jta事务等,只需应用实现了事务的规范,那么理论上都可以用spring去集成管理。

在Spring事务管理(1)-初探中,有提到Spring自带的事务管理器,此处进行复习一下。



图2 Spring的事务管理器

AbstractPlatformTransactionManager最为事务管理器的基类,实现了事务管理器的基本方法。而上文提到的tm.getTransaction(txAttr)正是AbstractPlatformTransactionManager的方法。

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 获得事务对象
Object transaction = doGetTransaction();

// 此处省略对definition是否为空的判断

// 如果已经存在事务对象,则进行处理
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}

// 此处省略事务属性中的超时属性是否有效,即值是否为负数
// 如果为负数,则抛出异常

// 如果事务的传播性定义为PROPAGATION_MANDATORY,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 如果事务的传播性定义为PROPAGATION_REQUIRED 、PROPAGATION_REQUIRES_NEW或PROPAGATION_NESTED,这三种传播性都需要新建一个事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起已经存在的资源,比如session等
SuspendedResourcesHolder suspendedResources = suspend(null);
// 此处省略debug日志
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启事务
doBegin(transaction, definition);
// 设置事务状态
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// 其他,则创建空事务
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}


在以上代码中,根据不同的事务传播特性进行了分支。此处简单复习一下事务的传播特性,包括以下几种:

PROPAGATION_REQUIRED: 如果存在一个事务,则支持当前事务。如果没有事务则开启

PROPAGATION_SUPPORTS: 如果存在一个事务,支持当前事务。如果没有事务,则非事务的执行

PROPAGATION_MANDATORY: 如果已经存在一个事务,支持当前事务。如果没有一个活动的事务,则抛出异常。

PROPAGATION_REQUIRES_NEW: 总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。

PROPAGATION_NOT_SUPPORTED: 总是非事务地执行,并挂起任何存在的事务。

PROPAGATION_NEVER:总是非事务地执行,如果存在一个活动事务,则抛出异常。

PROPAGATION_NESTED:如果一个活动的事务存在,则运行在一个嵌套的事务中. 如果没有活动事务。

在事务的传播特性中,PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED需要一个事务。进入getTransaction方法的该分支,其流程包括挂起已经存在的资源(因为要新建一个事务,所有要挂起旧的资源)、创建事务状态、开启事务、设置事务状态以及在发生异常时需要恢复状态。

1.挂起已经存在的资源

suspend方法执行挂起操作。

protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
//如果存在事务协同
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 挂起TransactionSynchronization的资源
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
// 挂起当前事务的资源
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}

// 此处省略获得当前活跃状态事务的属性

// 备份被挂起的资源,并返回
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// 异常恢复             doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch() {
// 此处省略若干异常恢复
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
return null;
}
}


doSuspendSynchronization方法将逐个挂起当前线程中的TransactionSynchronization

private List<TransactionSynchronization> doSuspendSynchronization() {
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
// 挂起TransactionSynchronization
synchronization.suspend();
}
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}


TransactionSynchronization的类层次结构如图3所示。



图3 TransactionSynchronization的类层次结构

观察ConnectionSynchronization的suspend实现:

@Override
public void suspend() {
if (this.holderActive) {
//解绑当前线程中的数据源
TransactionSynchronizationManager.unbindResource(this.dataSource);
// 如果存在连接,且处于打开状态
if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) {
// 当挂起的时候如果没有句柄连接到该connection,将释放该连接
// 当resume的时候 会重开打开一个连接参与到原来的事务中
releaseConnection(this.connectionHolder.getConnection(), this.dataSource);
this.connectionHolder.setConnection(null);
}
}
}


挂起当前事务的方法如下,也是一个解绑sessionFactory和dataSource的过程,并备份返回SuspendedResourcesHolder对象。

protected Object doSuspend(Object transaction) {
HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
txObject.setSessionHolder(null);
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
txObject.setConnectionHolder(null);
ConnectionHolder connectionHolder = null;
if (getDataSource() != null) {
connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
}
return new SuspendedResourcesHolder(sessionHolder, connectionHolder);
}


当挂起资源失败的时候,需要恢复异常。异常处理是保证程序稳定运行的关键。即恢复所有的TransactionSynchronization并绑定到当前线程中。

private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.resume();
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}


2.创建事务状态

创建事务状态的语句即efaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);进入该方法中,发现其主要负责传递当前的事务状态,类似于j2ee开发过程中的dto类。

protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, Object suspendedResources) {

boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
// 新建DefaultTransactionStatus对象,负责传递状态
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}


3.开启事务

doBegin方法有许多实现,此处观察HibernateTransactionManager的实现。

protected void doBegin(Object transaction, TransactionDefinition definition) {
HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;

// 如果已经存在连接,但不支持同步
if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 抛出异常,一次只能用一个事务管理器
throw new IllegalTransactionStateException(
"Pre-bound JDBC Connection found! HibernateTransactionManager does not support " +
"running within DataSourceTransactionManager if told to manage the DataSource itself. " +
"It is recommended to use a single HibernateTransactionManager for all transactions " +
"on a single DataSource, no matter whether Hibernate or JDBC access.");
}

Session session = null;

try {
// 如果连接为null或允许同步事务
if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
Interceptor entityInterceptor = getEntityInterceptor();
// 创建session
Session newSession = (entityInterceptor != null ?
getSessionFactory().openSession(entityInterceptor) : getSessionFactory().openSession());
if (logger.isDebugEnabled()) {
logger.debug("Opened new Session [" + SessionFactoryUtils.toString(newSession) +
"] for Hibernate transaction");
}
txObject.setSession(newSession);
}

session = txObject.getSessionHolder().getSession();

// 如果需要准备连接
if (this.prepareConnection && isSameConnectionForEntireSession(session)) {
// We're allowed to change the transaction settings of the JDBC Connection.
if (logger.isDebugEnabled()) {
logger.debug(
"Preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]");
}
Connection con = session.connection();
// 设置连接属性,是否只读和事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
}
else {
//此处判断是否定义隔离级别,如果没有定义,则抛出异常
}

// 如果事务属性是只读,并且新建的session
if (definition.isReadOnly() && txObject.isNewSession()) {
// 设置为手动刷新
session.setFlushMode(FlushMode.MANUAL);
}

// 如果不是只读,并且不是新建的session
if (!definition.isReadOnly() && !txObject.isNewSession()) {
// 自动刷新
FlushMode flushMode = session.getFlushMode();
if (flushMode.lessThan(FlushMode.COMMIT)) {
session.setFlushMode(FlushMode.AUTO);
txObject.getSessionHolder().setPreviousFlushMode(flushMode);
}
}

Transaction hibTx;

int timeout = determineTimeout(definition);
//如果定义事务超时
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 获得并开启事务
hibTx = session.getTransaction();
hibTx.setTimeout(timeout);
hibTx.begin();
}
else {
// Open a plain Hibernate transaction without specified timeout.
hibTx = session.beginTransaction();
}

// 保存事务
txObject.getSessionHolder().setTransaction(hibTx);

// 注册连接
if (getDataSource() != null) {
Connection con = session.connection();
ConnectionHolder conHolder = new ConnectionHolder(con);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
conHolder.setTimeoutInSeconds(timeout);
}
if (logger.isDebugEnabled()) {
logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]");
}
// 将当前连接绑定到线程
TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
txObject.setConnectionHolder(conHolder);
}

// Bind the session holder to the thread.
if (txObject.isNewSessionHolder()) {
TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
}
txObject.getSessionHolder().setSynchronizedWithTransaction(true);
}

catch (Throwable ex) {
// 异常处理
// 如果事务处于活跃状态,则回滚
// 关闭session
}
}


上述代码的主要功能是打开session,并更加事务属性定义设置session的只读属性、隔离级别,然后通过session开启事务,设置事务的超时属性,并利用session打开连接,设置连接的超时属性。最后将session、connection、datasource绑定到当前线程中。

4.设置事务状态

prepareSynchronization语句设置当前的事务属性到TransactionSynchronizationManager中。

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}


5.异常恢复

在发生异常的时候,resume函数将根据原来挂起的信息恢复TransactionSynchronizationManager的状态。

至此,getTransaction完成了开启事务,并包装生成事务状态对象,回到

invokeWithinTransaction的createTransactionIfNecessary方法中,会再次封装TransactionStatus为TransactionInfo并返回。

TransactionInfo包含了事务管理器、事务属性、事务名称以及事务状态。

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return
// false. We created it only to preserve the integrity of
// the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}

// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring 事务