Jms的MessageListener中的Jms事务
2017-01-24 14:54
169 查看
摘要
一般来说,如果为JmsTemplate做了事务配置,那么它将会与当前线程的数据库事务挂钩,并且仅在数据库事务的afterCommit动作中提交。但是,如果一个MessageListener在接收Jms消息的同时,也使用JmsTemplate发送了Jms消息;那么它发送的Jms消息将与数据库事务无关(即使为JmsTemplate做了事务配置),而是与Listener接收消息保持在同一个事务中。问题
问题是一位同事发现的。账务系统的垫付功能存在REST和MessageListener两个入口;两个入口中调用的是同一套代码和业务逻辑。但是,REST入口中发送的Jms消息会随着数据库事务回滚而回滚;MessageListener中却不会回滚。相关流程图说明如下。我们期望的结果是:在还款操作中发送的Jms消息,随还款操作的数据库事务回滚而取消(红色底色部分的操作);而垫付操作中发送的Jms消息,则应随垫付操作的数据库事务提交而提交(绿色底色部分的操作)。这一点在REST入口的相关日志和数据中得到了验证。但是,从MessageListener入口调用此服务时,却出现了问题:虽然还款服务的数据库事务确实回滚了,但是其中的Jms消息却成功发送了出来(参见红色字体部分)。
分析
首先,REST入口的操作、结果是正确的。这说明,当数据库事务回滚时,Jms消息确实没有提交。那么,可以肯定一点:一定是MessageListener后续处理中做了提交消息这个动作。经过一系列的Debug和逐行执行、分析,我找到了这段代码。MessageListener接收到消息后,会进入org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(Objectinvoker,Sessionsession,MessageConsumerconsumer)方法中。由于没有配置transactionManager,我们会通过doReceiveAndExecute(invoker,session,consumer,null)来调用org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(Objectinvoker,Sessionsession,MessageConsumerconsumer,TransactionStatusstatus)方法。/** *Executethelistenerforamessagereceivedfromthegivenconsumer, *wrappingtheentireoperationinanexternaltransactionifdemanded. *@paramsessiontheJMSSessiontoworkon *@paramconsumertheMessageConsumertoworkon *@returnwhetheramessagehasbeenreceived *@throwsJMSExceptionifthrownbyJMSmethods *@see#doReceiveAndExecute */ protected boolean receiveAndExecute(Objectinvoker,Sessionsession,MessageConsumerconsumer) throws JMSException{ if ( this .transactionManager!= null ){ //Executereceivewithintransaction. TransactionStatusstatus= this .transactionManager.getTransaction( this .transactionDefinition); boolean messageReceived; try { messageReceived=doReceiveAndExecute(invoker,session,consumer,status); } catch (JMSExceptionex){ rollbackOnException(status,ex); throw ex; } catch (RuntimeExceptionex){ rollbackOnException(status,ex); throw ex; } catch (Errorerr){ rollbackOnException(status,err); throw err; } this .transactionManager.commit(status); return messageReceived; } else { //Executereceiveoutsideoftransaction. return doReceiveAndExecute(invoker,session,consumer, null ); } } |
/** *Executethespecifiedlistener, *committingorrollingbackthetransactionafterwards(ifnecessary). *@paramsessiontheJMSSessiontooperateon *@parammessagethereceivedJMSMessage *@throwsJMSExceptionifthrownbyJMSAPImethods *@see#invokeListener *@see#commitIfNecessary *@see#rollbackOnExceptionIfNecessary *@see#convertJmsAccessException */ protected void doExecuteListener(Sessionsession,Messagemessage) throws JMSException{ if (!isAcceptMessagesWhileStopping()&&!isRunning()){ if (logger.isWarnEnabled()){ logger.warn( "Rejectingreceivedmessagebecauseofthelistenercontainer" + "havingbeenstoppedinthemeantime:" +message); } rollbackIfNecessary(session); throw new MessageRejectedWhileStoppingException(); } try { invokeListener(session,message); } catch (JMSExceptionex){ rollbackOnExceptionIfNecessary(session,ex); throw ex; } catch (RuntimeExceptionex){ rollbackOnExceptionIfNecessary(session,ex); throw ex; } catch (Errorerr){ rollbackOnExceptionIfNecessary(session,err); throw err; } commitIfNecessary(session,message); } |
public ResponsesyncSendPacket(Commandcommand) throws JMSException{ if (isClosed()){ throw new ConnectionClosedException(); } else { try { Responseresponse=(Response) this .transport.request(command); if (response.isException()){ ExceptionResponseer=(ExceptionResponse)response; if (er.getException() instanceof JMSException){ throw (JMSException)er.getException(); } else { if (isClosed()||closing.get()){ LOG.debug( "Receivedanexceptionbutconnectionisclosing" ); } JMSExceptionjmsEx= null ; try { jmsEx=JMSExceptionSupport.create(er.getException()); } catch (Throwablee){ LOG.error( "CaughtanexceptiontryingtocreateaJMSExceptionfor" +er.getException(),e); } if (er.getException() instanceof SecurityException&&command instanceof ConnectionInfo){ forceCloseOnSecurityException(er.getException()); } if (jmsEx!= null ){ throw jmsEx; } } } return response; } catch (IOExceptione){ throw JMSExceptionSupport.create(e); } } } |
解决方案
方案一:使用JmsTransactionManager来管理Jms事务
可以通过以下配置,为MessageListner注入JmsTransactionManager:< bean id = "jmsTransactionManager" class = "org.springframework.jms.connection.JmsTransactionManager" > < property name = "connectionFactory" ref = "jmsConnectionFactory" /> </ bean > < jms:listener-container destination-type = "queue" transaction-manager = "jmsTransactionManager" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > |
方案二:手动将发送消息的操作放到数据库事务的AfterCommit操作中
现有代码中,我们是在事务体内执行JmsTemplate.send()操作;在事务的AfterCommit操作中执行Session.commit()。如果我们将JmsTemplate.send()操作放到AfterCommit操作中,那么就可以确保只在数据库事务提交后,才会提交Jms消息了。此方案验证可行。验证代码如下:public void send(Eventevent,List<TransferParam>transferParams){ TransactionSynchronizationManager .registerSynchronization( new TransactionSynchronizationAdapter(){ @Override public void afterCommit(){ System.out.println( this .getClass()+ "-" +event+ "--" +transferParams); try { event.getTychoOperType().ifPresent( (value)->{ TychoProductor4Account. this .doSend(event, transferParams); }); } catch (Exceptione){ System.out.println(e.getMessage()); TychoProductor4Account.LOGGER.error( "发送数据到tycho异常:{}" , e); } } }); } |
方案三:手动在数据库事务的RollBack操作中回滚Jms消息
暂未找到实现方式。方案四:尝试为发送消息创建并使用新的Connection
代码流程中之所以会使用同一个Connection,是因为接收、发送消息时,都是从线程上下文中尝试获取JmsResourceHolder,并从其中获取连接的。那么,简单做法就是在接收到消息后,开启一个子线程;复杂做法则是为JmsTransactionManager编写识别@Transactional(propagation=Propagation.REQUIRES_NEW)注解的功能。开启子线程的方案可行。验证代码如下:Future<Event>actualResult= this .keplerRestExecutor.submit(()->{ Event4Reserveevent4Reserve= new Event4Reserve(); event4Reserve.setRecordId(recordId); event4Reserve.setUserId(ThreadConsts.SYSTEM_USER_ID); AutoPayListener4BaeEvent.LOGGER.debug( "event4Reserve={}" , event4Reserve); Eventresult= this .bizAccountEventService.handle(event4Reserve); AutoPayListener4BaeEvent.LOGGER.info( "result={}" ,result); return result; }); try { actualResult.get(); } catch (InterruptedExceptione){ AutoPayListener4BaeEvent.LOGGER.error( "线程被中断!" ,e); throw new RuntimeException( "垫付线程中断!" ,e); } catch (ExecutionExceptione){ AutoPayListener4BaeEvent.LOGGER.error( "执行过程出错!" ,e); Throwablereal=e.getCause(); if (real instanceof RuntimeException){ throw (RuntimeException)real; } else { throw new RuntimeException(real); } } |
方案五:使用org.springframework.jms.connection.CachingConnectionFactory
已验证,方案无效。测试配置如下:< bean id = "jmsTransactionManager" class = "org.springframework.jms.connection.JmsTransactionManager" > < property name = "connectionFactory" ref = "jmsConnectionFactory" /> </ bean > < bean id = "jmsConnectionFactory" class = "org.springframework.jms.connection.CachingConnectionFactory" > < property name = "targetConnectionFactory" ref = "targetActiveMqConnectionFactory" /> < property name = "sessionCacheSize" value = "10" /> </ bean > < amq:connectionFactory id = "targetActiveMqConnectionFactory" brokerURL = "${jms.url.failover}" > < amq:redeliveryPolicyMap > < amq:redeliveryPolicyMap > < amq:defaultEntry > <!--5次,每次30秒--> < amq:redeliveryPolicy maximumRedeliveries = "5" initialRedeliveryDelay = "30000" /> </ amq:defaultEntry > < amq:redeliveryPolicyEntries > <!--5次,每次10秒--> < amq:redeliveryPolicy queue = "queue.thread.autopay" maximumRedeliveries = "5" initialRedeliveryDelay = "10000" /> </ amq:redeliveryPolicyEntries > < amq:redeliveryPolicyEntries > <!--银联实时划扣超时限制,5次,每次90秒--> < amq:redeliveryPolicy queue = "queue.thread.instantUnionpay" maximumRedeliveries = "5" initialRedeliveryDelay = "90000" /> </ amq:redeliveryPolicyEntries > </ amq:redeliveryPolicyMap > </ amq:redeliveryPolicyMap > </ amq:connectionFactory > < jms:listener-container destination-type = "queue" transaction-manager = "jmsTransactionManager" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > |
方案六:为jmsTemplate和MessageListener配置不同的ConnectionFactory
验证可行。测试配置如下:< bean id = "newJmsTemplate" class = "org.springframework.jms.core.JmsTemplate" > < property name = "connectionFactory" ref = "targetActiveMqConnectionFactory" /> < property name = "sessionTransacted" value = "true" /> < property name = "explicitQosEnabled" value = "${activemq.explicitQosEnabled}" /> < property name = "timeToLive" value = "86400000" /> </ bean > < amq:connectionFactory id = "targetActiveMqConnectionFactory" brokerURL = "${jms.url.failover}" > </ amq:connectionFactory > < jms:listener-container destination-type = "queue" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > < amq:connectionFactory id = "jmsConnectionFactory" brokerURL = "${jms.url.failover}" > </ amq:connectionFactory > |
后续工作
基本已经验证完毕。小结
可行方案有三个,分别是方案二:手动将发送消息的操作放到数据库事务的AfterCommit操作中、方案四:尝试为发送消息创建并使用新的Connection、方案六:为jmsTemplate和MessageListener配置不同的ConnectionFactory。比较简便的方式是方案六,其它方式都需要修改代码。相关文章推荐
- 新闻
- asp.net创建事务的方法
- SQL Server触发器和事务用法示例
- SQL Server误区30日谈 第1天 正在运行的事务在服务器故障转移后继续执行
- 浅析SQL Server中包含事务的存储过程
- Mysql中的事务是什么如何使用
- MySql的事务使用与示例详解
- C#分布式事务的超时处理实例分析
- C#中的事务用法实例分析
- SQL Server的事务操作隔离模式介绍
- MySQL中事务概念的简洁学习教程
- C#处理Access中事务的方法
- 在ASP.NET 2.0中操作数据之六十一:在事务里对数据库修改进行封装
- oracle 合并查询 事务 sql函数小知识学习
- 深入理解Java事务的原理与应用
- sql不常用函数总结以及事务,增加,删除触发器
- mysql的XA事务恢复过程详解
- 在Mysql存储过程中使用事务实例
- mysql存储过程事务管理简析
- php+mysql事务rollback&commit示例