您的位置:首页 > 其它

RocketMQ源码解析-事务消息的二阶段提交

2017-11-26 17:06 906 查看
在生产者producer当中,通过sendMessageInTransaction()方法来发送事务消息,但是在一开始向Broker发送的事务消息的时候,具体的事务操作还并没有进行处理,而是相当于向Broker发送了一条预处理消息。在sendMessageInTransaction()的参数中,除了需要发送的事务消息之外,还需要实现了sendMessageInTransaction接口的类作为参数之一,在executeLocalTransactionBranch()方法中实现具体的事务操作。

可以具体来看在DefaultMQProducerImpl实现的sendMessageInTransaction()方法。
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
}
catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__",sendResult.getTransactionId());
}
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
}
catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
this.endTransaction(sendResult, localTransactionState, localException);
}
catch (Exception e) {
log.warn("local transaction execute " + localTransactionState
+ ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

可以看到,在第一次发送消息之前,会给消息的事务属性设置为transactuion_prepared属性,然后的第一次发送逻辑还是与普通的非事务消息没有区别。

第一次的发送只是给事务消息设置了transactuion_prepared属性,可以把目光转到Broker看到对于接收到这一属性消息的处理。

在具体的Broker消息存储中,在消息存储到所有消息消费队列公用的存储文件队列commitLog完毕之前并没有区别,但是在之后消息在commitLog上具体存储偏移量的分发给消息消费队列发生区别。

在创建消息消费队列的过程中的doDispatch()方法,可以看到这段代码。

final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
// 1、分发消息位置信息到ConsumeQueue
switch (tranType) {
case MessageSysFlag.TransactionNotType:
case MessageSysFlag.TransactionCommitType:
// 将请求发到具体的Consume Queue
DefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(),
req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(),
req.getStoreTimestamp(), req.getConsumeQueueOffset());
break;
case MessageSysFlag.TransactionPreparedType:
case MessageSysFlag.TransactionRollbackType:
break;
}

也就是说,处于preparedType的事务消息并不会将消息的具体存储位置分发到消费队列,也就是说该消息并不会被消费者通消费队列以及消费队列上的偏移量取到并消费。

同理接下来的索引创建也不会将事务第一阶段提交的prepared状态的事务消息创建索引。

 

接下来,可以继续回到之前producer对于事务消息的发送过程。

如果成功接收到了第一次发送的事务消息发送成功的消息,那么将会在作为参数传入的sendMessageInTransaction的executeLocalTransactionBranch()方法当中执行具体的事务消息操作。在结束具体的事务操作之后将会将通过endTransaction()方法将第二次消息提交给Broker当中。

private void endTransaction(//
final SendResult sendResult, //
final LocalTransactionState localTransactionState, //
final Throwable localException) throws RemotingException, MQBrokerException,
InterruptedException, UnknownHostException {
final MessageId id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().ge
9c1a
tBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark =
localException != null ? ("executeLocalTransactionBranch exception: " + localException
.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

在第二次消息的发送当中,将会把上一次的消息发送的commitLogOffset位置作为参数,根据具体事务处理的结果给消息的事务属性赋为commit或者rollback发送。

可以转到Broker看到针对第二次事务消息发送的处理。

在第二次消息的处理时,会在EndTrandsactionProcessor当中对于第二次消息进行处理。方法很长,可以看几处重点。

final MessageExt msgExt =
this.brokerController.getMessageStore().lookMessageByOffset(
requestHeader.getCommitLogOffset());

首先会根据第一次提交的消息的commitLogOffset取得存储在commitLog上第一次消息提交的实体。

MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
requestHeader.getCommitOrRollback()));

msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {
msgInner.setBody(null);
}

final MessageStore messageStore = this.brokerController.getMessageStore();
final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);

之后会根据第一次提交的消息,构造一个内容一模一样的MessageExtBrokerInner的消息实体,如果是回滚的消息类型,将会给存储消息正文的body赋为null,之后将会重新通过消息存储层DefaultMessageStore的putMessage()方法重新进行消息的存储。

由于事务的状态属性已经为commit,将会正常存入commitLog以及分发给消费队列并创建索引,消费者可以正常消费。

 

 

对于事务消息的回查,RocketMQ并没有开源。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: