分布式事务九_基于可靠消息的最终一致性代码
2018-02-11 22:25
731 查看
分布式事务九_基于可靠消息的最终一致性代码
更多干货
分布式事务处理一分布式事务二
分布式事务处理三
分布式事务四_基于可靠消息的最终一致性
分布式事务五_基于可靠消息的最终一致性_异常流程
分布式事务六_常规MQ队列
分布式事务七_幂等性设计
分布式事务八_可靠消息最终一致性方案
分布式事务九_基于可靠消息的最终一致性代码
分布式事务10_最大努力通知形势
柔性事务解决方案:TCC(两阶段型、补偿型)
支付宝 分布式事务服务 DTS 一
分布式事务服务 DTS二
分布式事务服务 DTS三
支付宝 分布式事务服务 DTS四
一、消息服务子系统
1、表结构
DROP TABLE IF EXISTS `rp_transaction_message`; CREATE TABLE `rp_transaction_message` ( `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID', `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号', `editor` varchar(100) DEFAULT NULL COMMENT '修改者', `creater` varchar(100) DEFAULT NULL COMMENT '创建者', `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间', `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间', `message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消息ID', `message_body` longtext NOT NULL COMMENT '消息内容', `message_data_type` varchar(50) DEFAULT NULL COMMENT '消息数据类型', `consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消费队列', `message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '消息重发次数', `areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡', `status` varchar(20) NOT NULL DEFAULT '' COMMENT '状态', `remark` varchar(200) DEFAULT NULL COMMENT '备注', `field1` varchar(200) DEFAULT NULL COMMENT '扩展字段1', `field2` varchar(200) DEFAULT NULL COMMENT '扩展字段2', `field3` varchar(200) DEFAULT NULL COMMENT '扩展字段3', PRIMARY KEY (`id`), KEY `AK_Key_2` (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2、RpTransactionMessageService.java
public interface RpTransactionMessageService { /** * 预存储消息. */ public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 确认并发送消息. */ public void confirmAndSendMessage(String messageId) throws MessageBizException; /** * 存储并发送消息. */ public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 直接发送消息. */ public void directSendMessage(RpTransactionMessage rpTransacti 21c80 onMessage) throws MessageBizException; /** * 重发消息. */ public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 根据messageId重发某条消息. */ public void reSendMessageByMessageId(String messageId) throws MessageBizException; /** * 将消息标记为死亡消息. */ public void setMessageToAreadlyDead(String messageId) throws MessageBizException; /** * 根据消息ID获取消息 */ public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException; /** * 根据消息ID删除消息 */ public void deleteMessageByMessageId(String messageId) throws MessageBizException; /** * 重发某个消息队列中的全部已死亡的消息. */ public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException; /** * 获取分页数据 */ PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException; }
3、RpTransactionMessageServiceImpl
@Service("rpTransactionMessageService") public class RpTransactionMessageServiceImpl implements RpTransactionMessageService { private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class); @Autowired private RpTransactionMessageDao rpTransactionMessageDao; @Autowired private JmsTemplate notifyJmsTemplate; public int saveMessageWaitingConfirm(RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 "); } message.setEditTime(new Date()); message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name()); message.setAreadlyDead(PublicEnum.NO.name()); message.setMessageSendTimes(0); return rpTransactionMessageDao.insert(message); } public void confirmAndSendMessage(String messageId) { final RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空"); } message.setStatus(MessageStatusEnum.SENDING.name()); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public int saveAndSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 "); } message.setStatus(MessageStatusEnum.SENDING.name()); message.setAreadlyDead(PublicEnum.NO.name()); message.setMessageSendTimes(0); message.setEditTime(new Date()); int result = rpTransactionMessageDao.insert(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); return result; } public void directSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 "); } notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void reSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 "); } message.addSendTimes(); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void reSendMessageByMessageId(String messageId) { final RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空"); } int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); if (message.getMessageSendTimes() >= maxTimes) { message.setAreadlyDead(PublicEnum.YES.name()); } message.setEditTime(new Date()); message.setMessageSendTimes(message.getMessageSendTimes() + 1); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void setMessageToAreadlyDead(String messageId) { RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空"); } message.setAreadlyDead(PublicEnum.YES.name()); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); } public RpTransactionMessage getMessageByMessageId(String messageId) { Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("messageId", messageId); return rpTransactionMessageDao.getBy(paramMap); } public void deleteMessageByMessageId(String messageId) { Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("messageId", messageId); rpTransactionMessageDao.delete(paramMap); } @SuppressWarnings("unchecked") public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) { log.info("==>reSendAllDeadMessageByQueueName"); int numPerPage = 1000; if (batchSize > 0 && batchSize < 100){ numPerPage = 100; } else if (batchSize > 100 && batchSize < 5000){ numPerPage = batchSize; } else if (batchSize > 5000){ numPerPage = 5000; } else { numPerPage = 1000; } int pageNum = 1; Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("consumerQueue", queueName); paramMap.put("areadlyDead", PublicEnum.YES.name()); paramMap.put("listPageSortType", "ASC"); Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>(); List<Object> recordList = new ArrayList<Object>(); int pageCount = 1; PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap); recordList = pageBean.getRecordList(); if (recordList == null || recordList.isEmpty()) { log.info("==>recordList is empty"); return; } pageCount = pageBean.getTotalPage(); for (final Object obj : recordList) { final RpTransactionMessage message = (RpTransactionMessage) obj; messageMap.put(message.getMessageId(), message); } for (pageNum = 2; pageNum <= pageCount; pageNum++) { pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap); recordList = pageBean.getRecordList(); if (recordList == null || recordList.isEmpty()) { break; } for (final Object obj : recordList) { final RpTransactionMessage message = (RpTransactionMessage) obj; messageMap.put(message.getMessageId(), message); } } recordList = null; pageBean = null; for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { final RpTransactionMessage message = entry.getValue(); message.setEditTime(new Date()); message.setMessageSendTimes(message.getMessageSendTimes() + 1); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } } @SuppressWarnings("unchecked") public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap){ return rpTransactionMessageDao.listPage(pageParam, paramMap); } }
二、消息状态确认子系统 与 消息恢复子系统
@Component("messageBiz") public class MessageBiz { private static final Log log = LogFactory.getLog(MessageBiz.class); @Autowired private RpTradePaymentQueryService rpTradePaymentQueryService; @Autowired private RpTransactionMessageService rpTransactionMessageService; /** * 处理[waiting_confirm]状态的消息 * * @param messages */ public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) { log.debug("开始处理[waiting_confirm]状态的消息,总条数[" + messageMap.size() + "]"); // 单条消息处理(目前该状态的消息,消费队列全部是accounting,如果后期有业务扩充,需做队列判断,做对应的业务处理。) for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("开始处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息"); String bankOrderNo = message.getField1(); RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo); // 如果订单成功,把消息改为待处理,并发送消息 if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) { // 确认并发送消息 rpTransactionMessageService.confirmAndSendMessage(message.getMessageId()); } else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) { // 订单状态是等到支付,可以直接删除数据 log.debug("订单没有支付成功,删除[waiting_confirm]消息id[" + message.getMessageId() + "]的消息"); rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId()); } log.debug("结束处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息"); } catch (Exception e) { log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常:", e); } } } /** * 处理[SENDING]状态的消息 * * @param messages */ public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("开始处理[SENDING]状态的消息,总条数[" + messageMap.size() + "]"); // 根据配置获取通知间隔时间 Map<Integer, Integer> notifyParam = getSendTime(); // 单条消息处理 for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("开始处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息"); // 判断发送次数 int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); log.debug("[SENDING]消息ID为[" + message.getMessageId() + "]的消息,已经重新发送的次数[" + message.getMessageSendTimes() + "]"); // 如果超过最大发送次数直接退出 if (maxTimes < message.getMessageSendTimes()) { // 标记为死亡 rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId()); continue; } // 判断是否达到发送消息的时间间隔条件 int reSendTimes = message.getMessageSendTimes(); int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes); long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); long needTime = currentTimeInMillis - times * 60 * 1000; long hasTime = message.getEditTime().getTime(); // 判断是否达到了可以再次发送的时间条件 if (hasTime > needTime) { log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次发送时间[" + sdf.format(message.getEditTime()) + "],必须过了[" + times + "]分钟才可以再发送。"); continue; } // 重新发送消息 rpTransactionMessageService.reSendMessage(message); log.debug("结束处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息"); } catch (Exception e) { log.error("处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息异常:", e); } } } /** * 根据配置获取通知间隔时间 * * @return */ private Map<Integer, Integer> getSendTime() { Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>(); notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time"))); notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time"))); notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time"))); notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time"))); notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time"))); return notifyParam; } }
三、实时消息子系统
public class AccountingMessageListener implements SessionAwareMessageListener<Message> { private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class); /** * 会计队列模板(由Spring创建并注入进来) */ @Autowired private JmsTemplate notifyJmsTemplate; @Autowired private RpAccountingVoucherService rpAccountingVoucherService; @Autowired private RpTransactionMessageService rpTransactionMessageService; public synchronized void onMessage(Message message, Session session) { RpAccountingVoucher param = null; String strMessage = null; try { ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message; strMessage = objectMessage.getText(); LOG.info("strMessage1 accounting:" + strMessage); param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class); // 这里转换成相应的对象还有问题 if (param == null) { LOG.info("param参数为空"); return; } int entryType = param.getEntryType(); double payerChangeAmount = param.getPayerChangeAmount(); String voucherNo = param.getVoucherNo(); String payerAccountNo = param.getPayerAccountNo(); int fromSystem = param.getFromSystem(); int payerAccountType = 0; if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) { payerAccountType = param.getPayerAccountType(); } double payerFee = param.getPayerFee(); String requestNo = param.getRequestNo(); double bankChangeAmount = param.getBankChangeAmount(); double receiverChangeAmount = param.getReceiverChangeAmount(); String receiverAccountNo = param.getReceiverAccountNo(); String bankAccount = param.getBankAccount(); String bankChannelCode = param.getBankChannelCode(); double profit = param.getProfit(); double income = param.getIncome(); double cost = param.getCost(); String bankOrderNo = param.getBankOrderNo(); int receiverAccountType = 0; double payAmount = param.getPayAmount(); if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) { receiverAccountType = param.getReceiverAccountType(); } double receiverFee = param.getReceiverFee(); String remark = param.getRemark(); rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, receiverAccountType, payerFee, receiverFee); //删除消息 rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId()); } catch (BizException e) { // 业务异常,不再写会队列 LOG.error("==>BizException", e); } catch (Exception e) { // 不明异常不再写会队列 LOG.error("==>Exception", e); } } public JmsTemplate getNotifyJmsTemplate() { return notifyJmsTemplate; } public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) { this.notifyJmsTemplate = notifyJmsTemplate; } public RpAccountingVoucherService getRpAccountingVoucherService() { return rpAccountingVoucherService; } public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) { this.rpAccountingVoucherService = rpAccountingVoucherService; } }
相关文章推荐
- 分布式事务五_基于可靠消息的最终一致性_异常流程
- 分布式事务四_基于可靠消息的最终一致性
- 【分布式事务】可靠消息最终一致性方案
- 分布式事务解决方案之消息最终一致性(可靠消息服务)上篇
- 分布式事务解决方案之消息最终一致性(可靠消息服务)下篇
- 如何选择分布式事务形态(TCC,SAGA,2PC,基于消息最终一致性等等)
- 分布式事务解决方案一之:可靠消息最终一致性
- 分布式事务八_可靠消息最终一致性方案
- 基于可靠消息的分布式事务错误处理
- 分布式消息最终一致性事务
- QNJR-GROUP/EasyTransaction: 依赖于Spring的一个柔性事务实现,包含 TCC事务,补偿事务,基于消息的最终一致性事务,基于消息的最大努力交付事务交付QNJR-GROUP/EasyTransaction: 依赖于Spring的一个柔性事务实现,包含 TCC事务,补偿事务,基于消息的最终一致性事务,基于消息的最大努力交付事务交付
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-概念)
- 分布式事务方案:可靠消息最终一致方案
- 分布式消息最终一致性事务
- 分布式事务解决方案之消息发送一致性(可靠消息的前提保障)
- (微服务)分布式事务-最大努力交付 && 消息最终一致性方案
- 微服务架构分布式事务解决方案设计思路(可靠消息最终一致方案-设计方案)
- 分布式事务最终一致性常用方案
- 消息中间件(一)分布式系统事务一致性解决方案大对比,谁最好使?
- 分布式事务最终一致性常用方案