您的位置:首页 > 其它

解决事物提交与消息发送顺序问题

2019-05-13 17:03 483 查看

最近在线上发现了一个问题,mq的监听时常会报消息不存在的异常,关键代码如下:

public void sendMessage(MessageData message) throws Exception {
if (message == null) return;

// 持久化消息 ①
String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
log.info("create transaction control messageid = " + messageId);
message.setMessageId(messageId);

// 发送消息 ②
ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);

}

导致的原因就是 ②已经消息发送了,但是①还没有事物提交,就导致了问题。

解决办法 1、 增加延迟发送 。

                2、 增加事物监听。

针对1方法,如果是activemq,有一个需要注意的地方, 需要修改activemq.xml  

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

 即,增加 schedulerSupport="true" 参数

针对2方法, 需要先创建一个 TransactionalMessageListener 类

@Component
@Slf4j
public class TransactionalMessageListener {

@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(EventMessage eventMessage) throws Exception {

if (eventMessage == null) return;

if (eventMessage.getEventType() == null) throw new NullPointerException("eventType");

if (eventMessage.getEventType() == EventType.ACTIVE_MQ) { // 防止事物已经还没有提交,mq监听器就已经收到了消息

if (eventMessage.getData() instanceof MessageData) {
MessageData message = (MessageData) eventMessage.getData();
String sendMessage = JSONObject.toJSONString(message);
log.info("push msg to activeMq, context :{ " + sendMessage + "}");
try {
ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
} catch (Exception e) {
log.error("push msg error to activeMq:{" + sendMessage + "}", e);
}
}

}
}

}
public class EventMessage<T> {

public EventMessage(EventType eventType, T data) {
this.eventType = eventType;
this.data = data;
}

private EventType eventType;

private T data;

public EventType getEventType() {
return eventType;
}

public void setEventType(EventType eventType) {
this.eventType = eventType;
}

public T getData() {
return data;
}

public void setData(T data) {
this.data = data;
}
}

然后修改原来逻辑如下


@Autowired
ApplicationEventPublisher publisher;
public void sendMessage(MessageData message) throws Exception {
if (message == null) return;

String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
log.info("create transaction control messageid = " + messageId);
message.setMessageId(messageId);

publisher.publishEvent(new EventMessage<MessageData>(EventType.MQ, message));
}

这里我需要对这个进行解释一下:   @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)     

fallbackExecution = true 是为了保证没有事物的时候也能正常收到消息

 phase = TransactionPhase.AFTER_COMMIT  代表提交后监听

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: