分布式事务解决方案之消息最终一致性(可靠消息服务)下篇
2018-04-02 13:50
706 查看
背景:1.支付成功 通知订单完成2.订单完成,通知会计记账上游订单服务,必须开放可查询订单状态接口,判断消息是否可以发送下游会计消费成功后,必须回调消息服务,ACK操作(约束:幂等性。 例如:消息id等)材料摘自龙果学院:http://www.roncoo.com/
流程:订单服务: 预存储消息 -> 订单完成 -> 确认发送消息 会计服务:消费订单消息 -> 完成记账 -> 确认消息已消费
消息生命周期: 预存储 -> 发送中 -> 销毁/确认消费
1.Create MySql Message Table
c966
不需要配置MQ自身重发机制,一定时间内不成功 ,规定次数内重发,重发时间可设置梯度public interface RpTransactionMessageService {
/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重发某个消息队列中的全部已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
/**
* 获取分页数据
*/
PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;
}
3.Message APP (消息确认,消息恢复)/**
* 消息定时器接口
*/
public interface MessageScheduled {
/**
* 处理状态为“待确认”但已超时的消息.
*/
public void handleWaitingConfirmTimeOutMessages();
/**
* 处理状态为“发送中”但超时没有被成功消费确认的消息
*/
public void handleSendingTimeOutMessage();
}
·消费消息,完成业务操作后,调用消息服务ACK确认消息已经被消费
5.Message WEB (消息管理)消息管理可视化界面,处理死亡消息等
优化
1.订单完成后,调用消息RPC确认消息可能超时,订单将会被回滚,但是消息已经被确认,导致会计成功记账方案:确认消息设置成异步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>
2.消息存储DB方案:redis,mongodb
3.被动方业务幂等性判断方案:业务操作成功,记录消息id
4.服务部署集群导致任务重复调度方案:分布式任务调度(当当开源elastic-job,分布式作业调度框架)
5.实时消息服务方案:rabbitMQ,rocketMQ
6.性能提高,解耦方案:每个业务使用一套消息服务
流程:订单服务: 预存储消息 -> 订单完成 -> 确认发送消息 会计服务:消费订单消息 -> 完成记账 -> 确认消息已消费
消息生命周期: 预存储 -> 发送中 -> 销毁/确认消费
1.Create MySql Message Table
ROP TABLE IF EXISTS `rp_transaction_message`; CREATE TABLE `rp_transaction_message` ( `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID', `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号', `editor` varchar(100) DEFAULT NULL COMMENT '修改者', `creater` varchar(100) DEFAULT NULL COMMENT '创建者', `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间', `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间', `message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消息ID', `message_body` longtext NOT NULL COMMENT '消息内容', `message_data_type` varchar(50) DEFAULT NULL COMMENT '消息数据类型', `consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消费队列', `message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '消息重发次数', `areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡', `status` varchar(20) NOT NULL DEFAULT '' COMMENT '状态', `remark` varchar(200) DEFAULT NULL COMMENT '备注', `field` varchar(1024) DEFAULT NULL COMMENT '扩展字段', PRIMARY KEY (`id`), KEY `AK_Key_2` (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;2.Message API (消息服务API)
c966
不需要配置MQ自身重发机制,一定时间内不成功 ,规定次数内重发,重发时间可设置梯度public interface RpTransactionMessageService {
/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重发某个消息队列中的全部已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
/**
* 获取分页数据
*/
PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;
}
3.Message APP (消息确认,消息恢复)/**
* 消息定时器接口
*/
public interface MessageScheduled {
/**
* 处理状态为“待确认”但已超时的消息.
*/
public void handleWaitingConfirmTimeOutMessages();
/**
* 处理状态为“发送中”但超时没有被成功消费确认的消息
*/
public void handleSendingTimeOutMessage();
}
/** * message业务处理类 */ @Component("messageBiz") public class MessageBiz { private static final Log log = LogFactory.getLog(MessageBiz.class); @Autowired private RpTradePaymentQueryService rpTradePaymentQueryService; @Autowired private RpTransactionMessageService rpTransactionMessageService; /** * 处理[waiting_confirm]状态的消息 * * @param messageMap */ public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) { log.debug("开始处理[waiting_confirm]状态的消息,总条数[" + messageMap.size() + "]"); // 单条消息处理(目前该状态的消息,消费队列全部是accounting,如果后期有业务扩充,需做队列判断,做对应的业务处理。) for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("结束处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息"); } catch (Exception e) { log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常:", e); } } } /** * 处理[SENDING]状态的消息 * * @param messageMap */ public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("开始处理[SENDING]状态的消息,总条数[" + messageMap.size() + "]"); // 根据配置获取通知间隔时间 Map<Integer, Integer> notifyParam = getSendTime(); // 单条消息处理 for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("开始处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息"); // 判断发送次数 int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); log.debug("[SENDING]消息ID为[" + message.getMessageId() + "]的消息,已经重新发送的次数[" + message.getMessageSendTimes() + "]"); // 如果超过最大发送次数直接退出 if (maxTimes < message.getMessageSendTimes()) { // 标记为死亡 rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId()); continue; } // 判断是否达到发送消息的时间间隔条件 int reSendTimes = message.getMessageSendTimes(); int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes); long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); long needTime = currentTimeInMillis - times * 60 * 1000; long hasTime = message.getEditTime().getTime(); // 判断是否达到了可以再次发送的时间条件 if (hasTime > needTime) { log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次发送时间[" + sdf.format(message.getEditTime()) + "],必须过了[" + times + "]分钟才可以再发送。"); continue; } // 重新发送消息 rpTransactionMessageService.reSendMessage(message); log.debug("结束处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息"); } catch (Exception e) { log.error("处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息异常:", e); } } } /** * 根据配置获取通知间隔时间 * * @return */ private Map<Integer, Integer> getSendTime() { //TODO... config 配置 次数相对应的时间间隔 } }4.Message QUEUE(消息消费)
·消费消息,完成业务操作后,调用消息服务ACK确认消息已经被消费
5.Message WEB (消息管理)消息管理可视化界面,处理死亡消息等
优化
1.订单完成后,调用消息RPC确认消息可能超时,订单将会被回滚,但是消息已经被确认,导致会计成功记账方案:确认消息设置成异步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>
2.消息存储DB方案:redis,mongodb
3.被动方业务幂等性判断方案:业务操作成功,记录消息id
4.服务部署集群导致任务重复调度方案:分布式任务调度(当当开源elastic-job,分布式作业调度框架)
5.实时消息服务方案:rabbitMQ,rocketMQ
6.性能提高,解耦方案:每个业务使用一套消息服务
相关文章推荐
- 分布式事务解决方案之消息最终一致性(可靠消息服务)上篇
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-设计方案)
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-概念)
- 分布式事务解决方案一之:可靠消息最终一致性
- 分布式事务八_可靠消息最终一致性方案
- 【分布式事务】可靠消息最终一致性方案
- 分布式事务四_基于可靠消息的最终一致性
- 分布式事务九_基于可靠消息的最终一致性代码
- 分布式事务解决方案之消息发送一致性(可靠消息的前提保障)
- 分布式事务五_基于可靠消息的最终一致性_异常流程
- (微服务)分布式事务-最大努力交付 && 消息最终一致性方案
- 微服务~分布式事务里的最终一致性
- 消息中间件(一)分布式系统事务一致性解决方案大对比,谁最好使?
- 分布式事务方案:可靠消息最终一致方案
- 分布式事物学习之(可靠消息一致性方案之独立消息服务)(七)
- 如何选择分布式事务形态(TCC,SAGA,2PC,基于消息最终一致性等等)
- 微服务~分布式事务里的最终一致性
- 微服务~分布式事务里的最终一致性
- 分布式消息最终一致性事务
- 使用kafka消息队列解决分布式事务(可靠消息最终一致性方案-本地消息服务)