您的位置:首页 > 编程语言 > Java开发

RocketMQ 事务消息

2016-10-19 00:00 141 查看
RocketMQ通过将一个大事务拆分成多个小事务并异步执行的方式来实现事务消息。
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址;第二阶段执行本地事务;第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会定期扫描消息集群中的事务消息,如果发现了仍处于Prepared状态的消息,它会向消息发送者确认本地事务的执行结果,并根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
RocketMQ事务消息:



TransactionCheckListenerImpl:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Created by Aaron Sheng on 10/19/16.
 *
 * <p>Handles transactions that are still unsettled: the broker notifies the producer to
 * check the local transaction, and this listener supplies the answer. This demo
 * implementation only logs the message and always reports
 * {@link LocalTransactionState#ROLLBACK_MESSAGE}.
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {

    /**
     * Logs the topic and body of the message being checked and reports it as rolled back.
     *
     * @param messageExt the message whose local transaction state is being queried
     * @return always {@link LocalTransactionState#ROLLBACK_MESSAGE}
     */
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        System.out.println("checkLocalTransactionState");
        System.out.println("topic: " + messageExt.getTopic());

        // Decode the payload before printing: concatenating the raw byte[] would only
        // print the array's identity (e.g. "[B@1b6d3586"), not its content.
        byte[] body = messageExt.getBody();
        String bodyText = (body == null)
                ? "null"
                : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("body: " + bodyText);

        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}


TransactionExecuterImpl:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Aaron Sheng on 10/19/16.
 *
 * <p>Executes the local transaction and returns the result to the broker. This demo does
 * no real work: successive calls rotate through commit, rollback and unknown.
 */
public class TransactionExecuterImpl implements LocalTransactionExecuter {
    /** Number of distinct outcomes the executer rotates through. */
    private static final int OUTCOME_COUNT = 3;

    /** Counts invocations; its value selects the outcome returned by each call. */
    private final AtomicInteger transactionIndex = new AtomicInteger(0);

    /**
     * Logs the message and returns the next outcome in the rotation
     * COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW.
     *
     * @param message the message the local transaction belongs to (only logged here)
     * @param o       caller-supplied argument; not used by this implementation
     * @return the outcome selected by the invocation counter
     */
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
        System.out.println("executeLocalTransactionBranch " + message);

        int value = transactionIndex.getAndIncrement();
        // The counter wraps to negative values after Integer.MAX_VALUE, and Java's %
        // keeps the sign of the dividend. Normalise to 0..2 so the rollback branch
        // stays reachable after a wrap-around.
        int slot = ((value % OUTCOME_COUNT) + OUTCOME_COUNT) % OUTCOME_COUNT;
        switch (slot) {
            case 0:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 1:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }
}


TransactionProducer:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by Aaron Sheng on 10/19/16.
 *
 * <p>Demo producer that sends a series of transactional messages through a
 * {@link TransactionMQProducer}.
 */
public class TransactionProducer {
    /** Number of transactional messages sent by {@link #produce()}. */
    private static final int MESSAGE_COUNT = 1000;

    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000L;

    /**
     * Configures and starts a transactional producer, sends {@code MESSAGE_COUNT} messages
     * to topic "Topic1" with tag "Tag1" (one per second), and always shuts the producer
     * down afterwards.
     *
     * <p>A failure while sending is printed and ends the loop without being rethrown.
     * If the thread is interrupted while waiting between sends, the interrupt flag is
     * restored and sending stops.
     *
     * @throws MQClientException propagated from producer setup/start, which runs outside
     *                           the try block
     */
    public static void produce() throws MQClientException {
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("TxProducer");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(4);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("TxProducer-instance1");
        producer.setVipChannelEnabled(false);
        producer.start();

        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Encode with an explicit charset so the payload does not depend on the
                // platform default encoding.
                Message msg = new Message("Topic1",
                        "Tag1",
                        "OrderId" + i,
                        ("Body" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.println(sendResult);
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Best-effort demo: report the failure and fall through to shutdown.
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}


RocketMQConsumer:

package aaron.mq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by Aaron Sheng on 10/17/16.
 *
 * <p>Demo push consumer that subscribes to "Topic1" / "Tag1" and prints every message it
 * receives.
 */
public class RocketMQConsumer {
    /**
     * Configures a {@link DefaultMQPushConsumer}, registers a concurrent listener that
     * prints each message's keys and body, and starts the consumer.
     *
     * @throws MQClientException declared by the method; no client exception is caught here
     */
    public static void consume() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setVipChannelEnabled(false);
        consumer.setInstanceName("rmq-instance");
        consumer.subscribe("Topic1", "Tag1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset rather than the platform default.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(msg.getKeys() + " " + body);
                }
                // Every batch is acknowledged as consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java RocketMQ