您的位置:首页 > 其它

分布式事务解决方案之消息最终一致性(可靠消息服务)下篇

2018-04-02 13:50 706 查看
背景:1.支付成功 通知订单完成2.订单完成,通知会计记账上游订单服务,必须开放可查询订单状态接口,判断消息是否可以发送下游会计消费成功后,必须回调消息服务,ACK操作(约束:幂等性。 例如:消息id等)材料摘自龙果学院:http://www.roncoo.com/

流程:订单服务: 预存储消息 -> 订单完成 -> 确认发送消息 会计服务:消费订单消息 -> 完成记账 -> 确认消息已消费
消息生命周期: 预存储 -> 发送中 -> 销毁/确认消费
1.Create MySql Message Table
ROP TABLE IF EXISTS `rp_transaction_message`;
CREATE TABLE `rp_transaction_message` (
`id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
`editor` varchar(100) DEFAULT NULL COMMENT '修改者',
`creater` varchar(100) DEFAULT NULL COMMENT '创建者',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消息ID',
`message_body` longtext NOT NULL COMMENT '消息内容',
`message_data_type` varchar(50) DEFAULT NULL COMMENT '消息数据类型',
`consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消费队列',
`message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '消息重发次数',
`areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡',
`status` varchar(20) NOT NULL DEFAULT '' COMMENT '状态',
`remark` varchar(200) DEFAULT NULL COMMENT '备注',
`field` varchar(1024) DEFAULT NULL COMMENT '扩展字段',
PRIMARY KEY (`id`),
KEY `AK_Key_2` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.Message API (消息服务API)

c966
不需要配置MQ自身重发机制,一定时间内不成功 ,规定次数内重发,重发时间可设置梯度public interface RpTransactionMessageService {

/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;

/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;

/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;

/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;

/**
* 重发某个消息队列中的全部已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;

/**
* 获取分页数据
*/
PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;

}
3.Message APP (消息确认,消息恢复)/**
* 消息定时器接口
*/
public interface MessageScheduled {

/**
* 处理状态为“待确认”但已超时的消息.
*/
public void handleWaitingConfirmTimeOutMessages();

/**
* 处理状态为“发送中”但超时没有被成功消费确认的消息
*/
public void handleSendingTimeOutMessage();

}
/**
* message业务处理类
*/
@Component("messageBiz")
public class MessageBiz {

private static final Log log = LogFactory.getLog(MessageBiz.class);

@Autowired
private RpTradePaymentQueryService rpTradePaymentQueryService;
@Autowired
private RpTransactionMessageService rpTransactionMessageService;

/**
* 处理[waiting_confirm]状态的消息
*
* @param messageMap
*/
public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {
log.debug("开始处理[waiting_confirm]状态的消息,总条数[" + messageMap.size() + "]");
// 单条消息处理(目前该状态的消息,消费队列全部是accounting,如果后期有业务扩充,需做队列判断,做对应的业务处理。)
for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
RpTransactionMessage message = entry.getValue();
try {
log.debug("结束处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息");
} catch (Exception e) {
log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
}
}
}

/**
* 处理[SENDING]状态的消息
*
* @param messageMap
*/
public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.debug("开始处理[SENDING]状态的消息,总条数[" + messageMap.size() + "]");

// 根据配置获取通知间隔时间
Map<Integer, Integer> notifyParam = getSendTime();

// 单条消息处理
for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
RpTransactionMessage message = entry.getValue();
try {
log.debug("开始处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
// 判断发送次数
int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
log.debug("[SENDING]消息ID为[" + message.getMessageId() + "]的消息,已经重新发送的次数[" + message.getMessageSendTimes() + "]");

// 如果超过最大发送次数直接退出
if (maxTimes < message.getMessageSendTimes()) {
// 标记为死亡
rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
continue;
}
// 判断是否达到发送消息的时间间隔条件
int reSendTimes = message.getMessageSendTimes();
int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes);
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
long needTime = currentTimeInMillis - times * 60 * 1000;
long hasTime = message.getEditTime().getTime();
// 判断是否达到了可以再次发送的时间条件
if (hasTime > needTime) {
log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次发送时间[" + sdf.format(message.getEditTime()) + "],必须过了[" + times + "]分钟才可以再发送。");
continue;
}

// 重新发送消息
rpTransactionMessageService.reSendMessage(message);

log.debug("结束处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
} catch (Exception e) {
log.error("处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
}
}

}

/**
* 根据配置获取通知间隔时间
*
* @return
*/
private Map<Integer, Integer> getSendTime() {
//TODO...   config 配置 次数相对应的时间间隔
}
}
4.Message QUEUE(消息消费)
·消费消息,完成业务操作后,调用消息服务ACK确认消息已经被消费

5.Message WEB (消息管理)消息管理可视化界面,处理死亡消息等

优化
1.订单完成后,调用消息RPC确认消息可能超时,订单将会被回滚,但是消息已经被确认,导致会计成功记账方案:确认消息设置成异步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>
2.消息存储DB方案:redis,mongodb
3.被动方业务幂等性判断方案:业务操作成功,记录消息id
4.服务部署集群导致任务重复调度方案:分布式任务调度(当当开源elastic-job,分布式作业调度框架)
5.实时消息服务方案:rabbitMQ,rocketMQ
6.性能提高,解耦方案:每个业务使用一套消息服务
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: