Multiple Data Source Configuration and Distributed Transaction Handling in a Spring Boot Project: A Summary
2016-09-10 10:53
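Multiple data source configuration
The project has 10 data sources:
core_biz: business logic database
core_sys: system settings database
fund_pool: fund pool database, sharded by user into 8 databases
Each of these 10 data sources is created separately.
First, create the data source for the business logic database and define it as a Java bean.
The use of the @Configuration and @Bean annotations is not covered here.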
    import javax.sql.DataSource;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class CoreBizDataSourceConfiguration {

        @Value("${core.biz.database.isEmbedded}")
        private Boolean databaseIsEmbedded;

        @Value("${core.biz.database.url}")
        private String databaseUrl;

        @Value("${core.biz.database.username}")
        private String databaseUsername;

        @Value("${core.biz.database.password}")
        private String databasePassword;

        @Bean(name = "coreBizDataSource")
        public DataSource coreBizDataSource() {
            if (databaseIsEmbedded) {
                // Embedded H2 database, initialised from the given script
                return DataSourceUtil.getEmbeddedH2XADataSource(
                        "core_biz", "classpath:db/core_biz_h2_init.sql");
            } else {
                // MySQL data source wrapped by Atomikos for XA support
                return DataSourceUtil.getAtomikosXADataSource(
                        "core_biz", databaseUrl, databaseUsername, databasePassword);
            }
        }
    }
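Details
Configuration values are defined in application.properties.
databaseIsEmbedded indicates whether to use the embedded database.
DataSourceUtil.getEmbeddedH2XADataSource returns an embedded data source that supports distributed transactions; the argument core_biz is the database name.
DataSourceUtil.getAtomikosXADataSource returns a MySQL data source that supports distributed transactions.
Both utility methods are described in later sections.
The data source for the core_sys database is defined in the same way as the one for core_biz.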
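To make the configuration keys concrete, here is a small JDK-only sketch that parses the same keys with java.util.Properties and mirrors the embedded/MySQL switch. The property keys come from the @Value placeholders in the configuration class; the class name CoreBizSettings and all sample values (host, port, user, password) are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Sketch: the settings read by CoreBizDataSourceConfiguration, parsed with plain java.util.Properties. */
class CoreBizSettings {
    /** Hypothetical application.properties content; the values are placeholders, not from the article. */
    static final String SAMPLE =
            "core.biz.database.isEmbedded=false\n"
          + "core.biz.database.url=jdbc:mysql://localhost:3306/core_biz\n"
          + "core.biz.database.username=demo_user\n"
          + "core.biz.database.password=demo_password\n";

    final boolean embedded;
    final String url;
    final String username;
    final String password;

    CoreBizSettings(Properties props) {
        this.embedded = Boolean.parseBoolean(props.getProperty("core.biz.database.isEmbedded", "false"));
        this.url = props.getProperty("core.biz.database.url");
        this.username = props.getProperty("core.biz.database.username");
        this.password = props.getProperty("core.biz.database.password");
    }

    /** Parses text written in application.properties syntax. */
    static CoreBizSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new CoreBizSettings(props);
    }

    /** Mirrors the if/else in coreBizDataSource(): which kind of data source would be built. */
    String describeDataSource() {
        return embedded
                ? "embedded H2 XA data source: core_biz"
                : "Atomikos XA data source: " + url;
    }
}
```

Setting core.biz.database.isEmbedded=true is then enough to run against H2, which is handy for local tests.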
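The fund pool data sources, 8 in total, are defined as follows.
Option 1 (this option does not work if a single transaction must span these 8 data sources): extend the abstract class AbstractRoutingDataSource to route between DataSources, exposing only one DataSource bean to the rest of the application.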
    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.PostConstruct;
    import javax.sql.DataSource;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
    import org.springframework.stereotype.Component;

    @Component
    public class FundPoolRoutingDataSource extends AbstractRoutingDataSource {

        @Value("${fundPool.database.isEmbedded}")
        private Boolean databaseIsEmbedded;

        @Value("${fundPool.database.url.format}")
        private String databaseUrlFormat;

        @Value("${fundPool.database.username}")
        private String databaseUsername;

        @Value("${fundPool.database.password}")
        private String databasePassword;

        // Register the 8 target data sources, keyed by pool index
        @PostConstruct
        private void init() {
            Map<Object, Object> dataSourceMap = new HashMap<>();
            for (int i = 0; i < 8; i++) {
                dataSourceMap.put(i, createFundPoolDataSource(i));
            }
            setTargetDataSources(dataSourceMap);
        }

        // The lookup key is the pool index stored for the current thread
        @Override
        protected Object determineCurrentLookupKey() {
            return FundPoolContextHolder.getPoolIndex();
        }

        private DataSource createFundPoolDataSource(int index) {
            if (databaseIsEmbedded) {
                return DataSourceUtil.getEmbeddedH2XADataSource(
                        String.format("FN_CHOGORI_POOL_00%d", index),
                        "classpath:db/fund_pool_h2_init.sql");
            } else {
                return DataSourceUtil.getAtomikosXADataSource(
                        String.format("fund_pool_%d", index),
                        String.format(databaseUrlFormat, index),
                        databaseUsername, databasePassword);
            }
        }
    }
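Note again: because the multiple data sources are abstracted into a single one, a transaction cannot be shared across the underlying data sources.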
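Details
The init method creates the 8 data sources, builds dataSourceMap, and calls setTargetDataSources().
FundPoolContextHolder stores the index of the data source currently in use.
The overridden determineCurrentLookupKey() returns the current data source index.
FundPoolContextHolder is implemented as follows: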
    import org.springframework.util.Assert;

    public class FundPoolContextHolder {

        // One pool index per thread
        private static final ThreadLocal<Integer> CONTEXT_HOLDER = new ThreadLocal<>();

        public static void setPoolIndex(Integer poolIndex) {
            Assert.isTrue(poolIndex >= 0 && poolIndex < 8, "poolIndex must be in [0, 8)");
            CONTEXT_HOLDER.set(poolIndex);
        }

        public static Integer getPoolIndex() {
            return CONTEXT_HOLDER.get();
        }

        public static void clearPoolIndex() {
            CONTEXT_HOLDER.remove();
        }
    }
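A thread-local key like this is easy to leak when threads are pooled, so callers would normally set and clear it in a try/finally. The following JDK-only sketch shows one way to package that pattern. PoolContext and withPool are hypothetical names, not part of the original project.

```java
import java.util.function.Supplier;

/** JDK-only stand-in for FundPoolContextHolder, plus a helper that always restores the key. */
class PoolContext {
    private static final ThreadLocal<Integer> CURRENT = new ThreadLocal<>();

    static Integer get() {
        return CURRENT.get();
    }

    /** Runs the action with the given pool index selected, then restores the previous state. */
    static <T> T withPool(int poolIndex, Supplier<T> action) {
        if (poolIndex < 0 || poolIndex >= 8) {
            throw new IllegalArgumentException("poolIndex must be in [0, 8): " + poolIndex);
        }
        Integer previous = CURRENT.get();
        CURRENT.set(poolIndex);
        try {
            return action.get();
        } finally {
            // Without this, a pooled thread would carry the index into its next task.
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}
```

With the real classes, the equivalent would be calling FundPoolContextHolder.setPoolIndex(...) before the data access and FundPoolContextHolder.clearPoolIndex() in a finally block.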
Reference: Dynamic DataSource Routing
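One plausible way to see why Option 1 cannot span several pools in one transaction: transaction infrastructure such as Spring's typically binds one connection per DataSource object for the lifetime of a transaction. Since the routing data source is a single object, changing the lookup key mid-transaction does not produce a second connection. The toy model below illustrates that assumption only. It is not Spring code, and TxResourceCache and ToyRoutingDataSource are invented names.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model of per-transaction resource binding: one cached connection per data source OBJECT. */
class TxResourceCache {
    private final Map<Object, String> bound = new IdentityHashMap<>();

    /** Returns the connection already bound to this data source, or opens and binds one. */
    String getConnection(Object dataSource, Supplier<String> opener) {
        return bound.computeIfAbsent(dataSource, ds -> opener.get());
    }
}

/** Stand-in for a routing data source: a single object whose target depends on a mutable key. */
class ToyRoutingDataSource {
    int currentKey;

    String open() {
        return "connection to fund_pool_" + currentKey;
    }
}
```

In this model, a second lookup after switching currentKey from 0 to 5 still returns the connection to fund_pool_0. Exposing a separate DataSource per pool avoids the problem.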
Option 2 (recommended): the project uses MyBatis for data access, configured as follows.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import javax.sql.DataSource;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.ibatis.type.EnumOrdinalTypeHandler;
    import org.apache.ibatis.type.EnumTypeHandler;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.mapper.MapperFactoryBean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // DateTimeTypeHandler, FundPool, FundPoolMapper and FundPoolDefinition are not shown in this article
    @Configuration
    public class FundPoolMybatisConfiguration {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private FundPoolDataSourceGenerator fundPoolDataSourceGenerator;

        // Builds one SqlSessionFactory and one FundPoolMapper per fund pool database
        @Bean
        public FundPoolMapperContainer fundPoolMapperContainer() throws Exception {
            FundPoolMapperContainer container = new FundPoolMapperContainer();
            for (int i = 0; i < FundPoolDefinition.FUND_POOL_COUNT; i++) {
                DataSource dataSource = fundPoolDataSourceGenerator.createFundPoolDataSource(i);
                SqlSessionFactory sqlSessionFactory = createSqlSessionFactory(dataSource);
                MapperFactoryBean<FundPoolMapper> mapperFactoryBean =
                        getMapper(FundPoolMapper.class, sqlSessionFactory);
                mapperFactoryBean.afterPropertiesSet();
                container.put(i, mapperFactoryBean.getObject());
            }
            return container;
        }

        private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
            return createSqlSessionFactory(dataSource, Arrays.<Class>asList(FundPool.class));
        }

        private SqlSessionFactory createSqlSessionFactory(DataSource dataSource, List<Class> types)
                throws Exception {
            SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
            sqlSessionFactoryBean.setDataSource(dataSource);

            List<Class> allTypes = new ArrayList<>();
            allTypes.addAll(Arrays.asList(
                    DateTimeTypeHandler.class, EnumTypeHandler.class, EnumOrdinalTypeHandler.class));
            allTypes.addAll(types);
            sqlSessionFactoryBean.setTypeAliases(allTypes.toArray(new Class[allTypes.size()]));

            // Map column names such as pool_index to properties such as poolIndex
            sqlSessionFactoryBean.getObject().getConfiguration().setMapUnderscoreToCamelCase(true);
            return sqlSessionFactoryBean.getObject();
        }

        private <T> MapperFactoryBean<T> getMapper(Class<T> mapperInterface,
                                                   SqlSessionFactory sessionFactory) {
            MapperFactoryBean<T> mapperFactoryBean = new MapperFactoryBean<>();
            try {
                mapperFactoryBean.setSqlSessionFactory(sessionFactory);
                mapperFactoryBean.setMapperInterface(mapperInterface);
            } catch (Exception ex) {
                logger.error("error when create mapper: ", ex);
                throw new RuntimeException(ex);
            }
            return mapperFactoryBean;
        }
    }
Notes
FundPoolDataSourceGenerator creates the underlying DataSource:
    @Component
    public class FundPoolDataSourceGenerator {

        @Value("${fundPool.database.isEmbedded}")
        private Boolean databaseIsEmbedded;

        @Value("${fundPool.database.url.format}")
        private String databaseUrlFormat;

        @Value("${fundPool.database.username}")
        private String databaseUsername;

        @Value("${fundPool.database.password}")
        private String databasePassword;

        public DataSource createFundPoolDataSource(int index) {
            if (databaseIsEmbedded) {
                return DataSourceUtil.getEmbeddedH2XADataSource(
                        String.format("FN_CHOGORI_POOL_00%d", index),
                        "classpath:db/fund_pool_h2_init.sql");
            } else {
                return DataSourceUtil.getAtomikosXADataSource(
                        String.format("fund_pool_%d", index),
                        String.format(databaseUrlFormat, index),
                        databaseUsername, databasePassword);
            }
        }
    }
FundPoolMapperContainer holds the Mapper instances for the 8 databases. It is implemented as follows:
    public class FundPoolMapperContainer {

        private Map<Integer, FundPoolMapper> fundPoolMapperMap = new Hashtable<>();

        public void put(int i, FundPoolMapper fundPoolMapper) {
            fundPoolMapperMap.put(i, fundPoolMapper);
        }

        public FundPoolMapper get(int i) {
            return fundPoolMapperMap.get(i);
        }
    }
Once FundPoolMapperContainer is defined as a bean, it is used as follows:
    @Service
    public class FundPoolService {

        @Autowired
        private FundPoolMapperContainer fundPoolMapperContainer;

        private FundPoolMapper getFundPoolMapper(int poolIndex) {
            return fundPoolMapperContainer.get(poolIndex);
        }

        public int count(int poolIndex) {
            return getFundPoolMapper(poolIndex).count();
        }

        public static int getPoolIndex(long uid) {
            return (int) (uid % 8);
        }

        public FundPool get(long uid) {
            return getFundPoolMapper(getPoolIndex(uid)).get(uid);
        }

        // ...
    }
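The uid % 8 mapping works as long as uid is never negative. If that cannot be guaranteed, a variant using Math.floorMod keeps the index in range. This is a small sketch under that assumption, and PoolIndex is a hypothetical helper name.

```java
/** Sketch of the uid -> pool index mapping with a guard for negative ids. */
class PoolIndex {
    static final int POOL_COUNT = 8; // the article splits fund_pool into 8 databases

    static int of(long uid) {
        // Math.floorMod never returns a negative value for a positive modulus,
        // whereas (int) (uid % 8) yields -1 for uid = -1.
        return (int) Math.floorMod(uid, (long) POOL_COUNT);
    }
}
```

For non-negative ids both forms agree, so existing data would stay in the same pool.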
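JTA distributed transactions
Principles and tutorials
Configuring Spring and JTA without full Java EE
JTA 深度历险 - 原理与实现 (JTA in depth: principles and implementation)
Open-source implementation: Atomikos
Why Use Atomikos
Installing TransactionsEssentials
Adding dependencies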
The project needs the following dependencies:
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-jdbc</artifactId>
        <version>3.9.3</version>
    </dependency>
    <dependency>
        <groupId>javax.transaction</groupId>
        <artifactId>jta</artifactId>
        <version>1.1</version>
    </dependency>
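Creating the JTA data source
In the multiple data source configuration section, the MySQL data sources are created with the utility method DataSourceUtil.getAtomikosXADataSource.
It is implemented as follows: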
    // Requires com.atomikos.jdbc.AtomikosDataSourceBean and
    // com.mysql.jdbc.jdbc2.optional.MysqlXADataSource (MySQL Connector/J 5.x)
    public static DataSource getAtomikosXADataSource(
            String uniqueResourceName, String databaseUrl, String userName, String password) {
        // XA-capable MySQL data source
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUrl(databaseUrl);
        mysqlXADataSource.setUser(userName);
        mysqlXADataSource.setPassword(password);

        // Atomikos wrapper: connection pooling plus enlistment in JTA transactions
        AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
        atomikosDataSource.setUniqueResourceName(uniqueResourceName);
        atomikosDataSource.setXaDataSource(mysqlXADataSource);
        atomikosDataSource.setMinPoolSize(5);
        atomikosDataSource.setMaxPoolSize(20);
        atomikosDataSource.setTestQuery("SELECT 1");
        return atomikosDataSource;
    }
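Notes:
- MysqlXADataSource creates a data source that supports the XA protocol
- AtomikosDataSourceBean provides the connection pool
- If org.apache.tomcat.jdbc.pool.DataSource was previously used as the connection pool, it must be replaced by using MysqlXADataSource directly (using tomcat.jdbc.pool.XADataSource …)
- uniqueResourceName must be unique across data sources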
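Because ten data sources are created in different places, a duplicate uniqueResourceName is an easy mistake to make. One way to catch it early is to route every name through a small guard while the data sources are being built. The sketch below is a hypothetical helper, not part of Atomikos or the original project.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical guard that fails fast when two data sources claim the same resource name. */
class ResourceNames {
    private final Set<String> used = new HashSet<>();

    /** Returns the name unchanged, or throws if it was already claimed. */
    String claim(String uniqueResourceName) {
        if (!used.add(uniqueResourceName)) {
            throw new IllegalStateException("duplicate uniqueResourceName: " + uniqueResourceName);
        }
        return uniqueResourceName;
    }

    int size() {
        return used.size();
    }
}
```

A call such as names.claim(String.format("fund_pool_%d", index)) could then wrap the first argument passed to getAtomikosXADataSource.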
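JTA configuration for a Spring Boot project
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.jta.JtaTransactionManager;

    @Configuration
    public class JtaTransactionConfiguration {

        @Autowired
        private AtomikosJtaConfiguration jtaConfiguration;

        @Bean(name = "financeCore")
        public PlatformTransactionManager platformTransactionManager() throws Throwable {
            return new JtaTransactionManager(
                    jtaConfiguration.userTransaction(), jtaConfiguration.transactionManager());
        }
    }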
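The project contains multiple PlatformTransactionManager bean instances, so they are distinguished by name; this one is named financeCore.
    import javax.transaction.TransactionManager;
    import javax.transaction.UserTransaction;
    import com.atomikos.icatch.jta.UserTransactionImp;
    import com.atomikos.icatch.jta.UserTransactionManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AtomikosJtaConfiguration {

        @Bean
        public UserTransaction userTransaction() throws Throwable {
            UserTransactionImp userTransactionImp = new UserTransactionImp();
            userTransactionImp.setTransactionTimeout(1000); // timeout in seconds
            return userTransactionImp;
        }

        @Bean(initMethod = "init", destroyMethod = "close")
        public TransactionManager transactionManager() throws Throwable {
            UserTransactionManager userTransactionManager = new UserTransactionManager();
            userTransactionManager.setForceShutdown(false);
            return userTransactionManager;
        }
    }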
Using JTA transactions
Use Spring's built-in @Transactional annotation to mark the transaction scope.
    @Service
    public class FundPoolService {

        @Transactional(value = "financeCore")
        public void increase(long uid, BigDecimal amount) {
            // ...
        }
    }
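For readers new to declarative transactions, the effect of the annotation can be pictured as a wrapper around the method: commit on normal return, roll back when an unchecked exception escapes. The JDK-only toy below models just that behaviour for a single in-memory resource. ToyStore and ToyTransactional are invented for illustration and say nothing about how Spring implements its proxies.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory store with commit/rollback, standing in for one transactional resource. */
class ToyStore {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void insert(String row) {
        pending.add(row);
    }

    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    int count() {
        return committed.size();
    }
}

/** Mimics what a transactional wrapper does around a method call. */
class ToyTransactional {
    static void run(ToyStore store, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            store.rollback(); // unchecked exception: undo the pending work, then rethrow
            throw e;
        }
        store.commit(); // normal return: make the changes visible
    }
}
```

Note that with Spring's defaults a checked exception does not trigger a rollback unless rollbackFor is configured on the annotation.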
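Unit tests: making sure distributed transactions take effect
To make sure distributed transactions execute correctly, operations that span multiple data sources are verified with unit tests. The default embedded database does not support distributed transactions; the next subsection describes how to extend it. Alternatively, set up a local MySQL server for verification.
Unit test
A class used purely for unit testing, JtaDemoService, is implemented together with JtaDemoServiceTest as follows: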
    public class JtaDemoServiceTest extends FinanceCoreTestBase {

        @Autowired
        private JtaDemoService jtaDemoService;

        @Test
        public void testRunAllCommit() {
            jtaDemoService.runAllCommit();
            jtaDemoService.validateAllCommit();
        }

        @Test
        public void testRunAllRollback() {
            try {
                jtaDemoService.runAllRollback();
            } catch (RuntimeException ignore) {
                // ignore
            }
            jtaDemoService.validateAllRollback();
        }
    }
Verifying the commit logic
    @Service
    public class JtaDemoService {
        // ...

        @Transactional(value = "financeCore")
        public void runAllCommit() {
            createTransferRecord(); // writes to the BIZ database
            // write to each fundPool database
            for (int i = 0; i < FundPoolDefinition.FUND_POOL_COUNT; i++) {
                createFundPool(i);
            }
            createFundPoolChangeRecord(); // writes to the SYS database
        }

        public void validateAllCommit() {
            assertEquals(1, transferService.count());
            assertEquals(8, fundPoolService.count());
            // check each fundPool database
            for (int i = 0; i < FundPoolDefinition.FUND_POOL_COUNT; i++) {
                assertEquals(1, fundPoolService.count(i));
            }
            assertEquals(1, fundPoolChangeRecordService.count());
        }

        // ...
    }
Verifying the rollback logic
    @Service
    public class JtaDemoService {
        // ...

        @Transactional(value = "financeCore")
        public void runAllRollback() {
            createTransferRecord();
            for (int i = 0; i < FundPoolDefinition.FUND_POOL_COUNT; i++) {
                createFundPool(i);
            }
            createFundPoolChangeRecord();
            // the unchecked exception makes the whole transaction roll back
            throw new RuntimeException();
        }

        public void validateAllRollback() {
            assertEquals(0, transferService.count());
            assertEquals(0, fundPoolService.count());
            assertEquals(0, fundPoolChangeRecordService.count());
        }

        // ...
    }
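The all-or-nothing outcome these tests check is what an XA transaction manager provides through two-phase commit: every resource first votes in a prepare phase, and only a unanimous yes leads to commit. The sketch below is a deliberately simplified JDK-only model of that protocol. Participant and TwoPhaseCoordinator are invented names, and nothing here reflects Atomikos internals or the javax.transaction.xa API.

```java
import java.util.List;

/** One resource taking part in a distributed transaction (toy model). */
class Participant {
    final String name;
    final boolean canPrepare;
    String state = "ACTIVE";

    Participant(String name, boolean canPrepare) {
        this.name = name;
        this.canPrepare = canPrepare;
    }

    /** Phase 1 vote: true means "ready to commit". */
    boolean prepare() {
        if (canPrepare) {
            state = "PREPARED";
        }
        return canPrepare;
    }

    void commit() {
        state = "COMMITTED";
    }

    void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCoordinator {
    /** Returns true if every participant committed, false if all were rolled back. */
    static boolean complete(List<Participant> participants) {
        // Phase 1: collect votes; a single "no" aborts the whole transaction.
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        // Phase 2: everyone voted yes, so commit everywhere.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }
}
```

In the article's setup the participants would be core_biz, core_sys and the 8 fund pool databases. A real coordinator also logs its decision and handles recovery after crashes, which this model leaves out.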
Distributed transaction support for the Spring embedded database
The usual way to create an embedded H2 database is:
    public static DataSource getEmbeddedH2DataSource(String name, String... scripts) {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .setName(name)
                .setScriptEncoding("utf8")
                .addScript("classpath:db/h2_init.sql");
        for (String script : scripts) {
            builder.addScript(script);
        }
        return builder.build();
    }
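The DataSource that EmbeddedDatabaseBuilder creates by default does not implement the XADataSource interface, so it does not support distributed transactions.
Reading EmbeddedDatabaseBuilder and the related code yields the class structure (class diagram omitted); what needs to be done is to replace SimpleDriverDataSource.
Here it is replaced with a new type, H2DriverDataSourceFactory, implemented as follows: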
    // Requires org.h2.jdbcx.JdbcDataSource, java.sql.Driver and, from
    // org.springframework.jdbc.datasource.embedded, DataSourceFactory and ConnectionProperties
    private static class H2DriverDataSourceFactory implements DataSourceFactory {

        // H2's JdbcDataSource implements both DataSource and XADataSource
        private final JdbcDataSource dataSource = new JdbcDataSource();

        @Override
        public ConnectionProperties getConnectionProperties() {
            return new ConnectionProperties() {
                @Override
                public void setDriverClass(Class<? extends Driver> driverClass) {
                    // nothing to do: JdbcDataSource has no driver class setting
                }

                @Override
                public void setUrl(String url) {
                    dataSource.setUrl(url);
                }

                @Override
                public void setUsername(String username) {
                    dataSource.setUser(username);
                }

                @Override
                public void setPassword(String password) {
                    dataSource.setPassword(password);
                }
            };
        }

        @Override
        public DataSource getDataSource() {
            return this.dataSource;
        }
    }
To create an embedded H2 database data source, use the following utility method:
    public static DataSource getEmbeddedH2XADataSource(String name, String... scripts) {
        H2DriverDataSourceFactory h2DriverDataSourceFactory = new H2DriverDataSourceFactory();

        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        builder.setDataSourceFactory(h2DriverDataSourceFactory);
        builder.setType(EmbeddedDatabaseType.H2)
                .setName(name)
                .setScriptEncoding("utf8")
                .addScript("classpath:db/h2_init.sql");
        for (String script : scripts) {
            builder.addScript(script);
        }
        // Creates the database and runs the scripts; the returned EmbeddedDatabase is not used
        builder.build();

        // Wrap the underlying H2 XA data source with Atomikos
        AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
        atomikosDataSource.setUniqueResourceName(name);
        atomikosDataSource.setXaDataSource((XADataSource) h2DriverDataSourceFactory.getDataSource());
        return atomikosDataSource;
    }
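The cast to XADataSource in the utility method only succeeds because the factory hands back H2's JdbcDataSource. If some other DataSource were plugged in, the cast would fail with a bare ClassCastException. A small guard with a clearer message could look like the sketch below. XaSupport is a hypothetical helper, and its stub method exists only so the example runs without a database driver.

```java
import java.lang.reflect.Proxy;
import javax.sql.DataSource;
import javax.sql.XADataSource;

class XaSupport {
    /** Returns the data source as an XADataSource, or fails with a descriptive message. */
    static XADataSource requireXa(DataSource dataSource) {
        if (dataSource instanceof XADataSource) {
            return (XADataSource) dataSource;
        }
        throw new IllegalArgumentException(
                dataSource.getClass().getName() + " does not implement javax.sql.XADataSource");
    }

    /** Demo helper: builds a do-nothing DataSource stub that implements the given interfaces. */
    static DataSource stub(Class<?>... interfaces) {
        return (DataSource) Proxy.newProxyInstance(
                XaSupport.class.getClassLoader(), interfaces, (proxy, method, args) -> null);
    }
}
```

Calling XaSupport.requireXa(h2DriverDataSourceFactory.getDataSource()) in place of the raw cast would keep the behaviour for H2 and give a readable error otherwise.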
Reposted from: http://hungryant.github.io/java/2015/11/26/java-spring-boot-jta.html