RocketMQ 事务消息
2016-10-19 00:00
141 查看
RocketMQ将事务拆分成小事务异步执行的方式来执行。
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
RocketMQ事务消息:
TransactionCheckListenerImpl:
TransactionExecuterImpl:
TransactionProducer:
RocketMQConsumer:
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
RocketMQ事务消息:
TransactionCheckListenerImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * Created by Aaron Sheng on 10/19/16. * TransactionCheckListenerImpl handle transaction unsettled. * Broker will notify producer to check local transaction. */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("checkLocalTransactionState"); System.out.println("topic: " + messageExt.getTopic()); System.out.println("body: " + messageExt.getBody()); return LocalTransactionState.ROLLBACK_MESSAGE; } }
TransactionExecuterImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }
TransactionProducer:
package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
RocketMQConsumer:
package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
相关文章推荐
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ学习-事务消息
- RocketMQ源码解析-事务消息的二阶段提交
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ事务消息回查设计方案
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ存储篇——事务消息相关的文件
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ——Producer篇:发送事务消息
- rocketmq事务消息入门介绍
- rocketmq事务消息入门介绍
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践
- rocketmq介绍和消息队列事务处理机制
- RocketMQ 源码分析 事务消息