通过rabbitMQ消息队列实现分布式环境下的最终一致性
2018-09-07 20:30
225 查看
话不多说,直接切入正题:
2018-09-07 20:30 更
基础流程图:
上游服务接收到信息,先保存在本地消息表中,保存失败直接返回退出,保存成功则通知消息服务器new一个新的消息对象,状态为NEW,表示这个可能要发送,通知成功(注意这里的通知这一步需要为同步模式,不然会出现这种情况,本地已经消费了,但是却没通知到,这样消息就丢失了)
通知成功之后则开始处理本地的业务逻辑,失败的时候手动抛出异常,让业务回滚,成功之后,可以异步/同步的方式通知消息服务器更新状态,在其中的任意环节出现失败都不会影响一致性,这时候是处于软安全的状态,不会影响一致性,为什么不会影响呢,因为本地有表记录着啊,而本地失败的情况下回造成业务回滚,从而记录为空的,如这种情况:最后一步,服务本地业务成功,异步通知消息服务器可以更改状态为ready让其发送消息,假设失败了,意味着消息服务器中的状态依旧为NEW,但是没关系,本地成功了,消息服务器后台会有个线程,自动检测超时状态下的,当处于NEW状态的消息会通过上游服务器提供的接口查询记录是否存在(或者状态是否为消费),如果有记录表明是消费了的,这个消息应该被发送,所以修改状态问ready,然后发送下游服务器流程图: ![下游服务流程图](https://img-blog.csdn.net/20180907202946474?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NvZGVyX0pva2Vy/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
再更
2018-09-13
demo应该明天会给出
2018-09-18 更:
整体架构:3个微服务:server-1,server-2,message //message为消息服务中心
然后创建表,为了演示只需要简单创建表即可:
以及server-1对应的本地业务表:
至于server-2的就不展示了,大家随意创建一个即可
dao省略,直接进入核心的sevice:
@Service public class MQTransactionService { @Autowired private MessageDao messageDao; @Autowired private UserService userService; @Autowired private IMessageServerFeignServiec messageServerFeignServiec; @Transactional(rollbackFor=Exception.class) public String testRabbitMqTransaction(String detail) { //插入本地消息表 Integer localMessageValidCount = messageDao.insert(detail); if(localMessageValidCount<=0) { return "fail"; } //通知远程服务,添加消息 try { Integer remoteMessageValidCount = messageServerFeignServiec.addMessage(detail); if(remoteMessageValidCount<=0) { throw new RuntimeException("手动抛异常回滚:远程通知服务器失败,插入数据失败"); } } catch (Exception e) { throw new RuntimeException("手动抛异常回滚:远程通知服务器失败",e); } //执行本地业务 Integer logicValidCount = userService.insert("joker"); if(logicValidCount<=0) { throw new RuntimeException("手动抛异常回滚:本地执行业务失败"); } //调用其他服务的接口 //通知远程服务器更新状态 Integer updateStautsValidCount = messageServerFeignServiec.updateMsgStatus((long) detail.hashCode(), 1); if(updateStautsValidCount<=0) { throw new RuntimeException("手动抛异常回滚:远程更新消息状态失败"); } return "succes"; } }
核心思路就是确保每步都成功再执行下一步,不过关于消息通知这块,可以考虑用异步的方式,不过这样的话需要在message-server中添加一个定时器,定时扫描那些消息状态改变了却没发送的,同时上游服务需要开放一个接口,供消息服务器调用查询信息是否存在(因为如果上游消费成功了,是会在db中插入数据的),并且相对的,下游服务器也是需要提供接口的,用于检测任务是否已经被消费(是否存在)
简易demo地址,未深入设计,在项目中是慢慢一点一点深入的
相关文章推荐
- 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现
- 消息队列的使用 RabbitMQ (二): Windows 环境下集群的实现与优化
- UNIX环境高级编程学习之第十五章进程间通信 - 通过消息队列实现进程间通信
- Python操作RabbitMQ服务器实现消息队列的路由功能
- 分布式事务解决方案一之:可靠消息最终一致性
- RabbitMQ开源企业级消息队列系统实现方案(单机版)
- 基于Redis实现分布式消息队列(2)
- Lua App中通过Sqlite实现消息队列(异步通信)
- 使用消息队列实现分布式事物---公认较为理想的分布式事物解决方案
- 基于PHP使用rabbitmq实现消息队列
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 使用消息队列实现分布式事务-公认较为理想的分布式事务解决方案
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
- 快速入门分布式消息队列之 RabbitMQ(2)
- 分布式延迟消息队列实现分析与设计
- 分布式之分布式事务的消息队列模式实现
- 通过haproxy 配置rabbitmq消息队列主从
- (七)RabbitMQ消息队列-通过fanout模式将消息推送到多个Queue中
- 使用消息队列实现分布式事物---公认较为理想的分布式事物解决方案
- RabbitMQ消息队列之一:RabbitMQ的环境安装及配置