您的位置:首页 > 编程语言 > Java开发

Spring + Atomikos 分布式事务实现方式

2015-11-13 16:33 603 查看
前段时间发现对分布式事务了解的不够清晰,最近又重新看了一下分布式事务,简单做个记录,以后方便查看

Java规范对分布式事务定义了标准的规范Java事务API和Java事务服务,分别是JTA和JTS

一个分布式事务必须包括一个事务管理器和多个资源管理器,

资源管理器是任意类型的持久化数据存储,而事务管理器则是承担着所有事务参与单元者的相互通讯的责任

JTA的规范制定了分布式事务的实现的整套流程框架,定义了各个接口且只有接口,而实现分别交给事务管理器的实现方和资源管理器的实现方

对于资源管理器而言,主要包括数据库连接,JMS等,还有很多了解的不清楚

对于事务管理器而言,从网上了解主要是应用服务器,包括JBOSS,WEBLOGIC等应用服务器,也就是说事务管理器的实现方是应用服务器,用来管理事务的通讯和协调

对于大多数谈的数据库了解,事务管理器需要从数据库获得XAConnection , XAResource等对象,而这些对象是数据库驱动程序需要提供的

所以如果要实现分布式事务还必须有支持分布式事务的数据库服务器以及数据库驱动程序

对Mysql而言,在mysql5.0以上的版本已经支持了分布式事务,另外常用的mysql-connector-java-5.1.25-bin.jar也是支持分布式事务的

可以在jar包的com.mysql.jdbc.jdbc2.optional中找到XA对象的实现

上面介绍了事务管理器和资源管理器的实现方式,在学习研究过程中发现对于事务管理器,特别强调了tomcat等服务器是不支持的,这句话的意思应该是在tomcat容器内

并没有分布式事务管理器的实现对象。而在JBOSS或者WEBLOGIC等商业服务器应该内置了分布式事务管理器的实现对象,应用程序可以通过JNDI方式获取UserTransaction

和TransactionManager等分布式事务环境中所需要用到的对象



事务管理器作为管理和协调分布式事务的关键处理中心非常重要,所以应用服务器可以单独只用过事务管理器。

上图具体文章链接为http://blog.csdn.net/xiaol_zhong/article/details/7983863

上面主要是一些基本的概念,在学习研究中总结出来的,可能不太全面,下面主要介绍一下在使用Spring使用分布式事务中的心得,这种做法也是将事务管理器嵌入应用中。

开始准备Spring的时候,网上介绍了Jotm以及Atomikos等工具,实际上这些工具都是取代应用服务器对事务管理器的支持,负责实现事务管理器对象

Jotm需要使用特定数据库连接池enhydra,而且网上说因为维护时间久远,问题不少,所以直接使用Atomikos进行测试

在Maven上下载了Atomikos的3.9.0版本的相关需要jar包。主要包括

atomikos-util-3.9.0.jar transactions-3.9.0.jar transactions-api-3.9.0.jar transactions-jdbc-3.9.0.jar transactions-jta-3.9.0.jar jta-1.1.jar

下面看一下主要的配置文件的配置方式:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <bean id="dataSource0" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="com.mysql.jdbc.Driver" />
<property name="jdbcUrl" value="jdbc:mysql://172.17.2.5:3003/jta" />
<property name="user" value="root" />
<property name="password" value="ems" />
<property name="autoCommitOnClose" value="true" />
</bean>

<bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSourceClassName"
value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="url">jdbc:mysql://172.17.2.5:3003/jta</prop>
<prop key="user">root</prop>
<prop key="password">ems</prop>
</props>
</property>
<property name="minPoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="borrowConnectionTimeout" value="30" />
<property name="testQuery" value="select 1" />
<property name="maintenanceInterval" value="60" />
</bean>

<bean id="dataSource2" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds2" />
<property name="xaDataSourceClassName"
value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="url">jdbc:mysql://172.17.2.5:3306/jta</prop>
<prop key="user">root</prop>
<prop key="password">ems</prop>
</props>
</property>
<property name="minPoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="borrowConnectionTimeout" value="30" />
<property name="testQuery" value="select 1" />
<property name="maintenanceInterval" value="60" />
</bean>

<bean id="dataSource3" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="com.mysql.jdbc.Driver" />
<property name="jdbcUrl" value="jdbc:mysql://172.17.2.5:3306/jta" />
<property name="user" value="root" />
<property name="password" value="ems" />
<property name="autoCommitOnClose" value="true" />
</bean>

<!--SqlMap setup for MyBatis Database Layer -->
<bean id="sqlSessionFactoryForD1" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource1" />
<property name="mapperLocations" value="classpath:jtaatomikos/jta.xml" />
</bean>
<bean id="sqlSessionTemplateForD1" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactoryForD1" />
</bean>

<bean id="sqlSessionFactoryForD2" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource2" />
<property name="mapperLocations" value="classpath:jtaatomikos/jta.xml" />
</bean>
<bean id="sqlSessionTemplateForD2" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactoryForD2" />
</bean>

<!-- Config JTA UserTransactionManager Impl -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown">
<value>true</value>
</property>
</bean>

<!-- Config JTA UserTransaction Impl -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout">
<value>300</value>
</property>
</bean>

<!-- Spring JtaTransactionManager Config -->
<bean id="springJTATransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction" />
</property>
</bean>

<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource1" />
</bean>

<!-- Aop Config -->
<aop:config>
<aop:pointcut id="jtaServiceOperation"
expression="execution(* jtaatomikos.*Service.*(..))"></aop:pointcut>
<aop:advisor pointcut-ref="jtaServiceOperation"
advice-ref="txAdvice"></aop:advisor>
</aop:config>

<!-- Transacation Advice Handle -->
<tx:advice id="txAdvice" transaction-manager="springJTATransactionManager">
<tx:attributes>
<tx:method name="update*" rollback-for="Exception" />
</tx:attributes>
</tx:advice>

<context:component-scan base-package="jtaatomikos"></context:component-scan>

</beans>


下面分别对每段配置进行部分记录

分别定义了dataSource0 dataSource1 dataSource2 dataSource3

其中0和3都是普通的C3P0数据库连接池,而1和2是AtomikosDataSourceBean

定义0和3的目的是在后面的测试中测试如果不是AtomikosDataSourceBean的连接是不能加入到分布式事务中的

接着定义了两个sqlSessionTemplate,分别对应3003和3306两个数据库。对应程序中的两个Dao

下面是定义分布式事务最重要的两个实现UserTransactionManager和UserTransactionImpl,均使用Atomikos中的实现

接着是Spring的AOP代理所使用的事务处理器springJTATransactionManager,这是Spring自带的JTA实现类,但是Spring只负责提供接口,真正内部实现分布式事务的上面定义

的两个对象,所以需要将上面定义的两个对象进行注入,所以Spring框架负责提供接口,Atomikos负责实现

另外再定义一个transactionManager是为了测试在传统的Spring事务方式下,为什么不能支持分布式事务

后面就是为了测试定义的AOP配置,不再多说

再来看看具体测试实现,只贴出核心测试方法

存在Service如下

public interface D1Service {

void updateAccount(Integer account);

}


实现如下:

public void updateAccount(Integer account) {
int userAId = 1;
int userBId = 2;
int userA_Account = d1Dao.getAccount(userAId);
int userB_Account = d2Dao.getAccount(userBId);
d1Dao.saveAccount(userAId, userA_Account + account);
d2Dao.saveAccount(userBId, userB_Account - account);
if(userB_Account - account < 0){
throw new AccountNotEnoughException();
}
}


分别在3003和3306数据库上新建相同的数据库表,简单测试需要字段userId和account,初始化定义数据为1 10000 ; 2 10000;

代表用户1和2分别账户有10000。

测试程序如下:

<pre name="code" class="java">public static void main(String[] args) {
ApplicationContext appContext = new ClassPathXmlApplicationContext("jtaatomikos/application-jta-atomikos.xml");
D1Service service = (D1Service) appContext.getBean("d1Service");
service.updateAccount(1000);
service.updateAccount(9100);
}



很明显在执行转账1000的时候,是没有问题的,在第二部执行转账9100的时候,由于userB_Account-account< 0 成立所以会有异常抛出

此时就是测试分布式事务的关键

假设以下几种情况

在使用springJTATransactionManager的情况下

1 均使用正确的AtomikosDataSourceBean,此时两个事务均能正确回滚

2 如果分别使用AtomikosDataSourceBean和C3P0,则只有前者对应数据库会回滚,而后者则不会回滚,猜想这是因为 springJTATransactionManager在处理事务的时候, 内部的atomikosTransactionManager只会将AtomokosDataSourceBean加入到分布式事务中,而不考虑其他连接方式

3 如果均使用C3P0,根据上面的解释,很清楚的可以猜到两个数据库的数据均不会回滚,测试结果也符合该预期

再来谈谈分布式事务为什么需要使用springJTATransactionManager

Spring传统的事务管理均使用

org.springframework.jdbc.datasource.DataSourceTransactionManager,那这种事务管理器为什么不能支持分布式事务呢?

从配置中可以看出该对象需要注入dataSource属性,注意只能注入单一的dataSource,显然这是不符合分布式事务的必须使用多个数据库这一基础的,所以在使用传统的该事务管理器,只能选择一个数据连接进行事务管理,本身来说Spring的事务管理也是基于这点实现的,保证事务管理内的所有数据库操作均使用同一个Connection,例如Connection的begin和commit以及rollback控制事务。

当使用org.springframework.jdbc.datasource.DataSourceTransactionManager

测试结果如下:

第一个结论:无论使用DataSource0 - DataSource3 中的任何一个,均能正常回滚,也就是说该事务管理器不依赖DataSource的具体实现,不论是Atomikos的实现或者是其他的数据库连接实现,均能够被传统的事务管理器管理

第二个结论:因为该事务管理器只能配置单一的DataSource,所以只能保证配置的DataSource能被事务管理,其它的DataSource都不受事务控制,其原理也很显而易见,因为传统的事务管理器使用单一Connection进行事务管理,在分布式事务多个不同数据库的Connection条件下,显然这种实现方式不能成立。所以需要Atimikos提供实现了JTA规范标准的事务管理器

关于JTA的两段提交方案,网上也很多教程,后面自己也会进行一步步实践,后面会跟进进行记录
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: