您的位置:首页 > 编程语言 > Java开发

spring 多数据源手动管理事务,最大程度保障数据一致性

2018-01-10 16:13 465 查看
模板代码如下:

@Autowired
private DataSourceTransactionManager transactionManager;

public void insert(CardEntity card, String dataSourceCloud, String tableNo) {
TransactionStatus statusInsert = null;
TransactionStatus statusMobile = null;
TransactionStatus statusTel = null;
try {
DataSourceKeyUtils.set(dataSourceCloud);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
statusInsert = transactionManager.getTransaction(def);
cardService.insert(card, tableNo, dataSourceCloud);

//插入索引
statusMobile = insertIndex(card, IndexType.mobile);
statusTel = insertIndex(card, IndexType.tel);

if (statusTel != null) {
transactionManager.commit(statusTel);
}
if (statusMobile != null) {
transactionManager.commit(statusMobile);
}
transactionManager.commit(statusInsert);
} catch (Exception e) {
if (statusTel != null) {
transactionManager.rollback(statusTel);
}
if (statusMobile != null) {
transactionManager.rollback(statusMobile);
}
if (statusInsert != null) {
transactionManager.rollback(statusInsert);
}
throw e;
}
}

在创建事务对象TransactionStatus 的时候,先手动切换数据源。事务的提交和回滚要安装创建顺序的逆顺序来提交或回滚。

原因分析:当第一次创建TransactionStatus
的时候会调用doGetTransaction()方法,这个方法会获取当前线程绑定的ConnectionHolder,第一次是null,然后就创建一个事务对象并开始。

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;

doBegin方法主要是设置一个连接

if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

当第二创建TransactionStatus 时,由于当前已存在事务所以要创建新的事务,同时挂起当前事务。

if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}

SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status;
}

挂起当前事务,实际就是将当前事务对象绑定的连接设置为null,同时解绑当前线程资源。挂起的事务会保存,

等当前事务提交之后会恢复之前的事务状态。所以要逆序提交或回滚

挂起当前事务:

@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(this.dataSource);
}

提交之后恢复之前的事务:


private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
}
}


后续:今天测试了一下有锁的情况,发现会堵塞,因为事务没提交,导致其他事务不能进行。原因在于就是同一个库我也开了多个事务。

改进方法:所有属于一个库的操作,使用同一个事务。

适用范围:这些sql操作没有先后关系。

关键代码如下:

private void sqlRun(Map<String, List<Runnable>> map) {
List<TransactionStatus> transactionStatusList = new ArrayList<>();
try {
for (Map.Entry<String, List<Runnable>> entry : map.entrySet()) {
DataSourceKeyUtils.set(entry.getKey());
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus transaction = transactionManager.getTransaction(def);
transactionStatusList.add(transaction);
List<Runnable> runnableList = entry.getValue();
for (Runnable runnable : runnableList) {
runnable.run();
}
}
Collections.reverse(transactionStatusList);
for (TransactionStatus status : transactionStatusList) {
transactionManager.commit(status);
}
} catch (Exception e) {
Collections.reverse(transactionStatusList);
for (TransactionStatus status : transactionStatusList) {
transactionManager.rollback(status);
}
throw e;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring 事务