您的位置:首页 > 其它

分布式事务-RocketMQ消息事务设计思路及Demo

2019-05-10 23:13 309 查看

文章目录

  • 事务消息Demo
  • 思考
  • 前言

    在之前的文章中介绍了基于Seata实现的分布式事务的解决方案(AT、TCC模式),有兴趣的看一下

    https://blog.csdn.net/hosaos/article/details/89136666

    这两种方案都是偏向于强一致性解决方案。协调器会不断循环各个资源RM来进行事务的同时提交,回滚。

    而往往很多场景不求事务的强一致性,只需达到事务的最终一致性,这时候,事务消息可以很好的满足需求。通过将本地事务与消息的发送放在一个本地事务中,来保证,本地事务执行成功时,消息一定被成功投递到消息服务器中,最终利用消息中间件的高可靠性,保证消息会被下游业务所消费

    RocketMq 4.3版本中开源了事务消息,本文会以RocketMq为例,介绍事务消息用法、设计思路及原理

    思路与问题所在

    上面提到了,要保证本地事务与消息的发送在一个事务中,如果以A给B转账100块为例子(A和B分别处在2个微服务中,对应2个数据库),具体怎么做呢?

    • 场景一
      先执行A扣钱100本地事务,再发送给B一条扣钱100消息,行么?
      假设碰到网络问题,消息发送失败了。A扣了100,B却没加钱,肯定不行

    • 场景二
      那先发给B发送一条扣钱100消息,再执行A扣钱本地事务,行么?
      如果消息发送成功了,这时候A服务所在数据库宕机了,岂不是B所在系统消费了消息,B加了100,A却没扣钱,也不对

    问题关键点是什么? 只要A扣钱和发送消息不是一个原子操作,即不在一个事务中完成,那么,无论先后顺序如何,都会出现数据不一致性问题

    那么聪明的人又会想到,我搞个本地消息表不就行了?

    • 场景三
      在一个事务中,同时操作如下两步
      1、A扣钱100
      2、将要发送的消息记录存入A所在数据库中(如transfer_money_message表)

    那么A扣钱成功的同时,一定会有一条对应B扣钱的消息记录在数据库中,然后A所在系统单独启动一个定时器去扫描该消息表,并将状态为待发送的消息,投递到消息服务器中,失败重试,直到消息发送成功

    这种方案行不行?当然可以,那么缺点又是什么?显而易见

    业务方需要单独设计消息表,及定时发送消息的定时器,增加了与业务无关的开发负担

    名词解释

    再介绍RocketMq消息事务前,先介绍下几个关键名词

    概念 解释
    prepare消息 又名Half Message,半消息,标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息
    RMQ_SYS_TRANS_HALF_TOPIC prepare消息在被投递到Mq服务器后,会存储于Topic为RMQ_SYS_TRANS_HALF_TOPIC的消费队列中
    RMQ_SYS_TRANS_OP_HALF_TOPIC 在prepare消息被commit或者rollback处理后,会存储到Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,标识prepare消息已被处理

    RocketMQ-事务消息设计思路

    先抛出两个核心概念:两阶段提交、事务状态定时回查,下面具体说明

    两阶段提交

    关于两阶段提交的基本概念,本文不再赘述,贴上一张图来说明

    上面已经提到,因为消息发送是一个远程调用,由于网络的不稳定,无法和本地事务的执行处于一个原子操作中,针对这个缺点,RocketMQ基于两阶段提交协议做了如下改动

    • 第一阶段:生产者向MQ服务器发送事务消息(prepare消息),服务端确认后回调通知生产者执行本地事务(此时消息为Prepare消息,存储于RMQ_SYS_TRANS_HALF_TOPIC队列中,不会被消费者消费)

    • 第二阶段:生产者执行完本地事务后(业务执行完成,同时将消息唯一标记,如transactionId与该业务执行记录同时入库,方便事务回查),根据本地事务执行结果,返回Commit/Rollback/Unknow状态码

      1、服务端若收到Commit状态码,则将prepare消息变为提交(正常消息,可被消费者消费)
      2、收到Rollback则对消息进行回滚(丢弃消息)
      3、若状态为Unknow,则等待MQ服务端定时发起消息状态回查,超过一定重试次数或者超时,消息会被丢弃

    引用一张流程图来说明消息事务的两阶段提交

    事务状态定时回查

    在第二阶段中,生产者在本地事务执行完成后,需要向MQ服务器返回响应状态码,发送状态码的过程也是通过Netty发送网络请求,假设由于网络原因发送失败怎么办?本地事务已经提交/回滚了,但是Commit/Rollback状态码却没发出去,那么MQ服务器上这条prepare消息状态岂不是无法被投递/回滚

    因此,MQ服务端会定时扫描存储于RMQ_SYS_TRANS_HALF_TOPIC中的消息,若消息未被处理,则向消费发送者发起回调检查,检查消息对应本地事务执行状态。从而保证消息事务状态最终能和本地事务的状态一致。上图中的4、5、6就是MQ服务端定时回查步骤。

    事务消息Demo

    先介绍下RocketMQ中事务消息的几个核心类

    1. TransactionMQProducer
      事务消息发送者, 核心方法如下
    //发送事务消息,arg表示业务参数,能在回调执行本地事务时被取到
    public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
    throw new MQClientException("TransactionListener is null", null);
    }
    
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
    //设置事务状态回调监听器
    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
    this.transactionCheckListener = transactionCheckListener;
    }

    2、

    TransactionCheckListener
    事务状态回调监听器

    /**
    * prepare消息执行成功时,回调执行executeLocalTransaction方法,arg参数为sendMessageInTransaction时带入的业务参数
    *
    * @return Transaction state
    */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    
    /**
    * 检查消息对应本地事务执行状态的监听器,定时回调
    */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);

    以用户1向用户2转账100块为例子

    设计用户表如下

    CREATE TABLE `user` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `money` bigint(20) NOT NULL COMMENT '余额',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    转账记录表
    其中转账流水编号,用于幂等去重,防止消息重复发送多次转账,加上唯一索引
    消息事务id(transaction_id)为发送的事务消息的id,用于回调检查事务是否执行成功(执行成功时,转账记录表中一定有数据,且对应transaction_id为消息id)

    CREATE TABLE `transfer_record` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `from_user_id` bigint(20) NOT NULL COMMENT '转账人id',
    `change_money` bigint(20) NOT NULL COMMENT '转账金额',
    `transaction_id` varchar(128) NOT NULL COMMENT '消息事务id',
    `to_user_id` bigint(20) NOT NULL COMMENT '被转账人id',
    `record_no` varchar(64) NOT NULL COMMENT '转账流水编号',
    PRIMARY KEY (`id`),
    UNIQUE KEY `idx_record_no` (`record_no`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

    数据库中初始数据如下,用户1余额100,用户2余额0

    业务转账实现类,实现用户1给2转账功能,同时生成转账记录,用户扣钱操作与新增转账记录操作在一个本地事务中

    1、若余额充足,新增转账记录并记录对应事务消息id(transactionId),本地事务执行成功,结果返回true,标识本地事务提交成功
    2、若余额不足,抛异常,本地事务执行失败,结果返回false

    public class BusinessService {
    @Resource
    private UserMapper userMapper;
    @Resource
    private TransferRecordMapper transferRecordMapper;
    
    /**
    * 转账操作 A扣钱,同时新增转账明细
    *
    * @param fromUserId    转账人id
    * @param toUserId      被转账人id
    * @param changeMoney   转账金额
    * @param businessNo    单次转账唯一业务标识
    * @param transactionId 事务消息事务id
    * @return
    */
    @Transactional(rollbackFor = Exception.class)
    public boolean doTransfer(Long fromUserId, Long toUserId, Long changeMoney, String businessNo, String transactionId) throws Exception {
    //插入转账记录明细 businessNo加唯一建 做去重操作 防止消息重试发送 导致本地事务多次执行 重复扣钱
    //转账记录中 记录 消息事务transactionId 用于后续状态回查
    TransferRecord transferRecord = new TransferRecord();
    transferRecord.setFromUserId(fromUserId);
    transferRecord.setChangeMoney(changeMoney);
    transferRecord.setTransactionId(transactionId);
    transferRecord.setToUserId(toUserId);
    transferRecord.setRecordNo(businessNo);
    
    transferRecordMapper.insert(transferRecord);
    
    //执行A扣钱操作
    //update user set money = money - #{money} where id = #{userId} and money >= #{money}
    int result = userMapper.reduceMoney(fromUserId, changeMoney);
    if (result <= 0) {
    throw new BizException("账户余额不足");
    }
    System.out.println("转账成功,fromUserId:"+fromUserId+",toUserId:"+toUserId+",money:"+changeMoney);
    return true;
    }
    /**
    * 检查本地扣钱事务执行状态
    *
    * @param transactionId
    * @return
    */
    public boolean checkTransferStatus(String transactionId) {
    //根据transactionId查询转账记录 有转账记录 标识本地事务执行成功 即A扣钱成功
    int count = transferRecordMapper.selectCount(new QueryWrapper<>(new TransferRecord().setTransactionId(transactionId)));
    return count > 0;
    
    }
    
    }

    消息发送者如下,主要看test()方法,即构造一个用户1给用户2转账100块的请求

    @Component
    public class TransactionProducer  implements InitializingBean {
    private static TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    
    @Resource
    private TransactionListenerImpl transactionListener;
    
    @Override
    public void afterPropertiesSet() throws Exception {
    producer.setNamesrvAddr("111.231.110.149:9876");
    
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setName("client-transaction-msg-check-thread");
    return thread;
    }
    });
    
    producer.setExecutorService(executorService);
    //设置回调检查监听器
    producer.setTransactionListener(transactionListener);
    try {
    producer.start();
    } catch (MQClientException e) {
    e.printStackTrace();
    }
    }
    
    public void test() {
    //单次转账唯一编号
    String businessNo = UUID.randomUUID().toString();
    
    //要发送的事务消息 设置转账人用户id为1, 被转账人用户id为2  转账金额100元,单次转账唯一标识businessNo
    TransferRecord transferRecord = new TransferRecord();
    transferRecord.setFromUserId(1L);
    transferRecord.setToUserId(2L);
    transferRecord.setChangeMoney(100L);
    transferRecord.setRecordNo(businessNo);
    
    try {
    Message msg = new Message("TransanctionMessage", "tag", businessNo,
    JSON.toJSONString(transferRecord).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.println("prepare事务消息发送结果:"+sendResult.getSendStatus());
    } catch (Exception e) {
    e.printStackTrace();
    }
    
    }
    }

    若prepare消息发送成功,则会同步执行

    TransactionListener#executeLocalTransaction
    方法,消息回调监听器代码如下

    public class TransactionListenerImpl implements TransactionListener {
    @Resource
    private BusinessService businessService;
    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    TransferRecord transferRecord = JSON.parseObject(msg.getBody(), TransferRecord.class);
    LocalTransactionState state = LocalTransactionState.UNKNOW;
    try {
    boolean isCommit = businessService.doTransfer(transferRecord.getFromUserId(),transferRecord.getToUserId()
    ,transferRecord.getChangeMoney(),transferRecord.getRecordNo(),msg.getTransactionId());
    //根据本地事务执行结果 返回不同状态码
    if (isCommit) {
    state = LocalTransactionState.COMMIT_MESSAGE;
    } else {
    state = LocalTransactionState.ROLLBACK_MESSAGE;
    }
    } catch (Exception e) {
    System.out.println("转账成功,fromUserId:"+transferRecord.getFromUserId()+",toUserId:"+transferRecord.getToUserId()+",money:"+transferRecord.getChangeMoney());
    e.printStackTrace();
    }
    
    return state;
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    LocalTransactionState state = LocalTransactionState.UNKNOW;
    try {
    //根据是否有transaction_id对应转账记录 来判断事务是否执行成功
    boolean isCommit = businessService.checkTransferStatus(msg.getTransactionId());
    if (isCommit) {
    state = LocalTransactionState.COMMIT_MESSAGE;
    } else {
    state = LocalTransactionState.ROLLBACK_MESSAGE;
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    return state;
    }
    }

    调用TransactionProducer的test()方法执行用户1给用户2转账100块操作

    1、第一次调用时,用户1余额足够,prepare消息发送成功,本地事务执行成功并返回

    LocalTransactionState.COMMIT_MESSAGE
    状态码给MQ服务器,消息成功投递,下游业务能接收到
    2、第二次调用时用户1余额不足,prepare消息发送后,本地事务执行失败,返回
    LocalTransactionState.ROLLBACK_MESSAGE
    状态码,MQ服务器收到消息后,丢弃prepare消息,下游业务无法消费

    而consumer逻辑和普通消费者逻辑一样,只需注意单次业务操作的幂等控制即可,这里略去,大家自己体会

    事务消息发送者逻辑都写的比较清楚了,直接贴上demo对应github地址,有兴趣的可以试下,里面RocketMQ的地址是我自己搭的,方便大家测试,大家不要搞我,使劲发消息

    https://github.com/hosaos/transaction-message-demo

    有什么写的不对的地方也欢迎大家指正

    思考

    看完文章大家肯定有如下疑问

    为什么prepare消息不会被消费?
    事务消息又是如何提交、回滚的?
    定时回查本地事务状态的机制又是怎么样?

    有兴趣的同学可以先去研究下事务消息发送的源码,我也会研究并分析下RocketMQ在源码层面实现事务消息的原理,到时再做分享

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: