解决事物提交与消息发送顺序问题
2019-05-13 17:03
483 查看
最近在线上发现了一个问题,mq的监听时常会报消息不存在的异常,关键代码如下:
public void sendMessage(MessageData message) throws Exception { if (message == null) return; // 持久化消息 ① String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3()); log.info("create transaction control messageid = " + messageId); message.setMessageId(messageId); // 发送消息 ② ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000); }
导致的原因就是 ②已经消息发送了,但是①还没有事物提交,就导致了问题。
解决办法 1、 增加延迟发送 。
2、 增加事物监听。
针对1方法,如果是activemq,有一个需要注意的地方, 需要修改activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
即,增加 schedulerSupport="true" 参数
针对2方法, 需要先创建一个 TransactionalMessageListener 类
@Component @Slf4j public class TransactionalMessageListener { @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT) public void afterCommit(EventMessage eventMessage) throws Exception { if (eventMessage == null) return; if (eventMessage.getEventType() == null) throw new NullPointerException("eventType"); if (eventMessage.getEventType() == EventType.ACTIVE_MQ) { // 防止事物已经还没有提交,mq监听器就已经收到了消息 if (eventMessage.getData() instanceof MessageData) { MessageData message = (MessageData) eventMessage.getData(); String sendMessage = JSONObject.toJSONString(message); log.info("push msg to activeMq, context :{ " + sendMessage + "}"); try { ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000); } catch (Exception e) { log.error("push msg error to activeMq:{" + sendMessage + "}", e); } } } } }
public class EventMessage<T> { public EventMessage(EventType eventType, T data) { this.eventType = eventType; this.data = data; } private EventType eventType; private T data; public EventType getEventType() { return eventType; } public void setEventType(EventType eventType) { this.eventType = eventType; } public T getData() { return data; } public void setData(T data) { this.data = data; } }
然后修改原来逻辑如下
@Autowired
ApplicationEventPublisher publisher;
public void sendMessage(MessageData message) throws Exception { if (message == null) return; String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3()); log.info("create transaction control messageid = " + messageId); message.setMessageId(messageId);
publisher.publishEvent(new EventMessage<MessageData>(EventType.MQ, message));
}
这里我需要对这个进行解释一下: @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
fallbackExecution = true 是为了保证没有事物的时候也能正常收到消息
phase = TransactionPhase.AFTER_COMMIT 代表提交后监听
相关文章推荐
- 理解websocket协议解决消息发送问题 Could not decode a text frame as UTF-8.
- 工作线程调用sendmessage()向主对话框发送自定义消息遇到的问题以及解决
- MQ如何解决消息的顺序问题和消息的重复问题
- 简单收集微信小程序formId,解决发送模板消息不够用的问题
- MQ如何解决消息的顺序问题和消息的重复问题
- ajax提交相同url,重复发送请求后台,页面无更新的问题解决
- 片段中初始化Umeng反馈,客户端发送消息无法传送到服务端问题解决
- 消息id乱序接收但顺序发送问题
- 使用AppDelegate单例,解决子视图无法给父视图发送消息的问题
- 钩子中向窗口发送消息、操作窗口无反应的问题解决方法(Hook dll ShowWindow HWND)
- 使用Bot Framework遇到WebChat无法发送消息的问题解决
- 如何解决netty发送消息截断问题
- 解决rabbitmq消息队列的顺序及重复消费问题
- 使用AppDelegate单例,解决子视图无法给父视图发送消息的问题
- Spring-boot JMS 发送消息慢的问题解决
- mysql中大事物提交延迟问题及解决方法
- 【转】解决Maxwell发送Kafka消息数据倾斜问题
- kafka无法发送消息问题处理
- 解决页面刷新重复提交的问题
- android客户端通过Get方式提交参数给服务器,使用URL和HttpURLConnection实现,以及乱码问题解决