分布式事务-RocketMQ消息事务设计思路及Demo
文章目录
前言
在之前的文章中介绍了基于Seata实现的分布式事务的解决方案(AT、TCC模式),有兴趣的看一下
这两种方案都是偏向于强一致性解决方案。协调器会不断循环各个资源RM来进行事务的同时提交,回滚。
而往往很多场景不求事务的强一致性,只需达到事务的最终一致性,这时候,事务消息可以很好的满足需求。通过将本地事务与消息的发送放在一个本地事务中,来保证,本地事务执行成功时,消息一定被成功投递到消息服务器中,最终利用消息中间件的高可靠性,保证消息会被下游业务所消费。
RocketMq 4.3版本中开源了事务消息,本文会以RocketMq为例,介绍事务消息用法、设计思路及原理
思路与问题所在
上面提到了,要保证本地事务与消息的发送在一个事务中,如果以A给B转账100块为例子(A和B分别处在2个微服务中,对应2个数据库),具体怎么做呢?
-
场景一
先执行A扣钱100本地事务,再发送给B一条扣钱100消息,行么?
假设碰到网络问题,消息发送失败了。A扣了100,B却没加钱,肯定不行 -
场景二
那先发给B发送一条扣钱100消息,再执行A扣钱本地事务,行么?
如果消息发送成功了,这时候A服务所在数据库宕机了,岂不是B所在系统消费了消息,B加了100,A却没扣钱,也不对
问题关键点是什么? 只要A扣钱和发送消息不是一个原子操作,即不在一个事务中完成,那么,无论先后顺序如何,都会出现数据不一致性问题
那么聪明的人又会想到,我搞个本地消息表不就行了?
- 场景三
在一个事务中,同时操作如下两步
1、A扣钱100
2、将要发送的消息记录存入A所在数据库中(如transfer_money_message表)
那么A扣钱成功的同时,一定会有一条对应B扣钱的消息记录在数据库中,然后A所在系统单独启动一个定时器去扫描该消息表,并将状态为待发送的消息,投递到消息服务器中,失败重试,直到消息发送成功
这种方案行不行?当然可以,那么缺点又是什么?显而易见
业务方需要单独设计消息表,及定时发送消息的定时器,增加了与业务无关的开发负担
名词解释
再介绍RocketMq消息事务前,先介绍下几个关键名词
概念 | 解释 |
---|---|
prepare消息 | 又名Half Message,半消息,标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息 |
RMQ_SYS_TRANS_HALF_TOPIC | prepare消息在被投递到Mq服务器后,会存储于Topic为RMQ_SYS_TRANS_HALF_TOPIC的消费队列中 |
RMQ_SYS_TRANS_OP_HALF_TOPIC | 在prepare消息被commit或者rollback处理后,会存储到Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,标识prepare消息已被处理 |
RocketMQ-事务消息设计思路
先抛出两个核心概念:两阶段提交、事务状态定时回查,下面具体说明
两阶段提交
关于两阶段提交的基本概念,本文不再赘述,贴上一张图来说明
上面已经提到,因为消息发送是一个远程调用,由于网络的不稳定,无法和本地事务的执行处于一个原子操作中,针对这个缺点,RocketMQ基于两阶段提交协议做了如下改动
-
第一阶段:生产者向MQ服务器发送事务消息(prepare消息),服务端确认后回调通知生产者执行本地事务(此时消息为Prepare消息,存储于RMQ_SYS_TRANS_HALF_TOPIC队列中,不会被消费者消费)
-
第二阶段:生产者执行完本地事务后(业务执行完成,同时将消息唯一标记,如transactionId与该业务执行记录同时入库,方便事务回查),根据本地事务执行结果,返回Commit/Rollback/Unknow状态码
1、服务端若收到Commit状态码,则将prepare消息变为提交(正常消息,可被消费者消费)
2、收到Rollback则对消息进行回滚(丢弃消息)
3、若状态为Unknow,则等待MQ服务端定时发起消息状态回查,超过一定重试次数或者超时,消息会被丢弃
引用一张流程图来说明消息事务的两阶段提交
事务状态定时回查
在第二阶段中,生产者在本地事务执行完成后,需要向MQ服务器返回响应状态码,发送状态码的过程也是通过Netty发送网络请求,假设由于网络原因发送失败怎么办?本地事务已经提交/回滚了,但是Commit/Rollback状态码却没发出去,那么MQ服务器上这条prepare消息状态岂不是无法被投递/回滚
因此,MQ服务端会定时扫描存储于RMQ_SYS_TRANS_HALF_TOPIC中的消息,若消息未被处理,则向消费发送者发起回调检查,检查消息对应本地事务执行状态。从而保证消息事务状态最终能和本地事务的状态一致。上图中的4、5、6就是MQ服务端定时回查步骤。
事务消息Demo
先介绍下RocketMQ中事务消息的几个核心类
TransactionMQProducer
事务消息发送者, 核心方法如下
//发送事务消息,arg表示业务参数,能在回调执行本地事务时被取到 public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } //设置事务状态回调监听器 public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { this.transactionCheckListener = transactionCheckListener; }
2、
TransactionCheckListener事务状态回调监听器
/** * prepare消息执行成功时,回调执行executeLocalTransaction方法,arg参数为sendMessageInTransaction时带入的业务参数 * * @return Transaction state */ LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * 检查消息对应本地事务执行状态的监听器,定时回调 */ LocalTransactionState checkLocalTransaction(final MessageExt msg);
以用户1向用户2转账100块为例子
设计用户表如下
CREATE TABLE `user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `money` bigint(20) NOT NULL COMMENT '余额', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
转账记录表
其中转账流水编号,用于幂等去重,防止消息重复发送多次转账,加上唯一索引
消息事务id(transaction_id)为发送的事务消息的id,用于回调检查事务是否执行成功(执行成功时,转账记录表中一定有数据,且对应transaction_id为消息id)
CREATE TABLE `transfer_record` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `from_user_id` bigint(20) NOT NULL COMMENT '转账人id', `change_money` bigint(20) NOT NULL COMMENT '转账金额', `transaction_id` varchar(128) NOT NULL COMMENT '消息事务id', `to_user_id` bigint(20) NOT NULL COMMENT '被转账人id', `record_no` varchar(64) NOT NULL COMMENT '转账流水编号', PRIMARY KEY (`id`), UNIQUE KEY `idx_record_no` (`record_no`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
数据库中初始数据如下,用户1余额100,用户2余额0
业务转账实现类,实现用户1给2转账功能,同时生成转账记录,用户扣钱操作与新增转账记录操作在一个本地事务中
1、若余额充足,新增转账记录并记录对应事务消息id(transactionId),本地事务执行成功,结果返回true,标识本地事务提交成功
2、若余额不足,抛异常,本地事务执行失败,结果返回false
public class BusinessService { @Resource private UserMapper userMapper; @Resource private TransferRecordMapper transferRecordMapper; /** * 转账操作 A扣钱,同时新增转账明细 * * @param fromUserId 转账人id * @param toUserId 被转账人id * @param changeMoney 转账金额 * @param businessNo 单次转账唯一业务标识 * @param transactionId 事务消息事务id * @return */ @Transactional(rollbackFor = Exception.class) public boolean doTransfer(Long fromUserId, Long toUserId, Long changeMoney, String businessNo, String transactionId) throws Exception { //插入转账记录明细 businessNo加唯一建 做去重操作 防止消息重试发送 导致本地事务多次执行 重复扣钱 //转账记录中 记录 消息事务transactionId 用于后续状态回查 TransferRecord transferRecord = new TransferRecord(); transferRecord.setFromUserId(fromUserId); transferRecord.setChangeMoney(changeMoney); transferRecord.setTransactionId(transactionId); transferRecord.setToUserId(toUserId); transferRecord.setRecordNo(businessNo); transferRecordMapper.insert(transferRecord); //执行A扣钱操作 //update user set money = money - #{money} where id = #{userId} and money >= #{money} int result = userMapper.reduceMoney(fromUserId, changeMoney); if (result <= 0) { throw new BizException("账户余额不足"); } System.out.println("转账成功,fromUserId:"+fromUserId+",toUserId:"+toUserId+",money:"+changeMoney); return true; } /** * 检查本地扣钱事务执行状态 * * @param transactionId * @return */ public boolean checkTransferStatus(String transactionId) { //根据transactionId查询转账记录 有转账记录 标识本地事务执行成功 即A扣钱成功 int count = transferRecordMapper.selectCount(new QueryWrapper<>(new TransferRecord().setTransactionId(transactionId))); return count > 0; } }
消息发送者如下,主要看test()方法,即构造一个用户1给用户2转账100块的请求
@Component public class TransactionProducer implements InitializingBean { private static TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); @Resource private TransactionListenerImpl transactionListener; @Override public void afterPropertiesSet() throws Exception { producer.setNamesrvAddr("111.231.110.149:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); //设置回调检查监听器 producer.setTransactionListener(transactionListener); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public void test() { //单次转账唯一编号 String businessNo = UUID.randomUUID().toString(); //要发送的事务消息 设置转账人用户id为1, 被转账人用户id为2 转账金额100元,单次转账唯一标识businessNo TransferRecord transferRecord = new TransferRecord(); transferRecord.setFromUserId(1L); transferRecord.setToUserId(2L); transferRecord.setChangeMoney(100L); transferRecord.setRecordNo(businessNo); try { Message msg = new Message("TransanctionMessage", "tag", businessNo, JSON.toJSONString(transferRecord).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("prepare事务消息发送结果:"+sendResult.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } } }
若prepare消息发送成功,则会同步执行
TransactionListener#executeLocalTransaction方法,消息回调监听器代码如下
public class TransactionListenerImpl implements TransactionListener { @Resource private BusinessService businessService; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { TransferRecord transferRecord = JSON.parseObject(msg.getBody(), TransferRecord.class); LocalTransactionState state = LocalTransactionState.UNKNOW; try { boolean isCommit = businessService.doTransfer(transferRecord.getFromUserId(),transferRecord.getToUserId() ,transferRecord.getChangeMoney(),transferRecord.getRecordNo(),msg.getTransactionId()); //根据本地事务执行结果 返回不同状态码 if (isCommit) { state = LocalTransactionState.COMMIT_MESSAGE; } else { state = LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println("转账成功,fromUserId:"+transferRecord.getFromUserId()+",toUserId:"+transferRecord.getToUserId()+",money:"+transferRecord.getChangeMoney()); e.printStackTrace(); } return state; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { LocalTransactionState state = LocalTransactionState.UNKNOW; try { //根据是否有transaction_id对应转账记录 来判断事务是否执行成功 boolean isCommit = businessService.checkTransferStatus(msg.getTransactionId()); if (isCommit) { state = LocalTransactionState.COMMIT_MESSAGE; } else { state = LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { e.printStackTrace(); } return state; } }
调用TransactionProducer的test()方法执行用户1给用户2转账100块操作
1、第一次调用时,用户1余额足够,prepare消息发送成功,本地事务执行成功并返回
LocalTransactionState.COMMIT_MESSAGE状态码给MQ服务器,消息成功投递,下游业务能接收到
2、第二次调用时用户1余额不足,prepare消息发送后,本地事务执行失败,返回
LocalTransactionState.ROLLBACK_MESSAGE状态码,MQ服务器收到消息后,丢弃prepare消息,下游业务无法消费
而consumer逻辑和普通消费者逻辑一样,只需注意单次业务操作的幂等控制即可,这里略去,大家自己体会
事务消息发送者逻辑都写的比较清楚了,直接贴上demo对应github地址,有兴趣的可以试下,里面RocketMQ的地址是我自己搭的,方便大家测试,大家不要搞我,使劲发消息
有什么写的不对的地方也欢迎大家指正
思考
看完文章大家肯定有如下疑问
为什么prepare消息不会被消费?
事务消息又是如何提交、回滚的?
定时回查本地事务状态的机制又是怎么样?
有兴趣的同学可以先去研究下事务消息发送的源码,我也会研究并分析下RocketMQ在源码层面实现事务消息的原理,到时再做分享
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-概念)
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-设计方案)
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- RocketMQ事务消息回查设计方案
- 分布式消息队列 RocketMQ源码解析:事务消息
- 微服务架构分布式事务解决方案设计思路-(概念篇)
- 如何基于RocketMQ的事务消息特性实现分布式系统的最终一致性?
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式事务之如何基于RocketMQ的事务消息特性实现分布式系统的最终一致性?
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践 - chunlongyu的专栏 - CSDN博客
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 阿里分布式事务设计思路
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践