您的位置:首页 > 其它

通过rabbitMQ消息队列实现分布式环境下的最终一致性

2018-09-07 20:30 225 查看

话不多说,直接切入正题:
2018-09-07 20:30 更
基础流程图:

上游服务接收到信息,先保存在本地消息表中,保存失败直接返回退出,保存成功则通知消息服务器new一个新的消息对象,状态为NEW,表示这个可能要发送,通知成功(注意这里的通知这一步需要为同步模式,不然会出现这种情况,本地已经消费了,但是却没通知到,这样消息就丢失了)

通知成功之后则开始处理本地的业务逻辑,失败的时候手动抛出异常,让业务回滚,成功之后,可以异步/同步的方式通知消息服务器更新状态,在其中的任意环节出现失败都不会影响一致性,这时候是处于软安全的状态,不会影响一致性,为什么不会影响呢,因为本地有表记录着啊,而本地失败的情况下回造成业务回滚,从而记录为空的,如这种情况:最后一步,服务本地业务成功,异步通知消息服务器可以更改状态为ready让其发送消息,假设失败了,意味着消息服务器中的状态依旧为NEW,但是没关系,本地成功了,消息服务器后台会有个线程,自动检测超时状态下的,当处于NEW状态的消息会通过上游服务器提供的接口查询记录是否存在(或者状态是否为消费),如果有记录表明是消费了的,这个消息应该被发送,所以修改状态问ready,然后发送
下游服务器流程图:
![下游服务流程图](https://img-blog.csdn.net/20180907202946474?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NvZGVyX0pva2Vy/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

再更
2018-09-13
demo应该明天会给出
2018-09-18 更:
整体架构:3个微服务:server-1,server-2,message //message为消息服务中心

然后创建表,为了演示只需要简单创建表即可:

以及server-1对应的本地业务表:

至于server-2的就不展示了,大家随意创建一个即可
dao省略,直接进入核心的sevice:

@Service
public class MQTransactionService
{
@Autowired
private MessageDao messageDao;
@Autowired
private UserService userService;
@Autowired
private IMessageServerFeignServiec messageServerFeignServiec;

@Transactional(rollbackFor=Exception.class)
public String testRabbitMqTransaction(String detail)
{
//插入本地消息表
Integer localMessageValidCount = messageDao.insert(detail);
if(localMessageValidCount<=0)
{
return "fail";
}
//通知远程服务,添加消息
try
{
Integer remoteMessageValidCount = messageServerFeignServiec.addMessage(detail);
if(remoteMessageValidCount<=0)
{
throw new RuntimeException("手动抛异常回滚:远程通知服务器失败,插入数据失败");
}
} catch (Exception e)
{
throw new RuntimeException("手动抛异常回滚:远程通知服务器失败",e);
}
//执行本地业务
Integer logicValidCount = userService.insert("joker");
if(logicValidCount<=0)
{
throw new RuntimeException("手动抛异常回滚:本地执行业务失败");
}
//调用其他服务的接口

//通知远程服务器更新状态
Integer updateStautsValidCount = messageServerFeignServiec.updateMsgStatus((long) detail.hashCode(), 1);
if(updateStautsValidCount<=0)
{
throw new RuntimeException("手动抛异常回滚:远程更新消息状态失败");
}
return "succes";
}
}

核心思路就是确保每步都成功再执行下一步,不过关于消息通知这块,可以考虑用异步的方式,不过这样的话需要在message-server中添加一个定时器,定时扫描那些消息状态改变了却没发送的,同时上游服务需要开放一个接口,供消息服务器调用查询信息是否存在(因为如果上游消费成功了,是会在db中插入数据的),并且相对的,下游服务器也是需要提供接口的,用于检测任务是否已经被消费(是否存在)
简易demo地址,未深入设计,在项目中是慢慢一点一点深入的

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: