您的位置:首页 > 编程语言 > Java开发

代码 - Java - 分布式事务 - Seata源码解析 - 服务器

2020-01-14 13:55 429 查看

一、seata 概览

  • Transaction Coordinator(TC):事务协调器,用于接收事务的注册,提交和回滚
  • Transaction Manager(TM):控制全局事务的边界,告知TC对全局事务的开始,提交,回滚
  • Resource Manager(RM):控制分支事务,每一个RM都会作为一个分支事务注册在TC
  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
  • XID 在微服务调用链路的上下文中传播
  • RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖
  • TM 向 TC 发起针对 XID 的全局提交或回滚决议
  • TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求

二、seata-common 项目

封装异常类,工具类等

三、seata-config 项目

存储和查找服务端的配置,比如:存储类型<store-mode : db / file>,Netty 基础配置信息等

seata-config-core:核心配置,默认为file

  • ConfigurationFactory.getInstance() -> buildConfiguration()

核心接口:Configuration

  • getInt/Long/Boolean:通过dataId来获取对应的值
  • putConfig:用于添加配置
  • removeConfig:删除一个配置
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更

说明:使用哪种配置需要在 registry.conf 中指定 config-type 即可

四、seata-core 项目

封装消息格式,数据模型,RPC等

五、seata-discovery 项目

服务注册/发现模块,Server 启动后将 Server 地址暴露给我们Client

seata-discovery-core :核心配置,对注册中心接入实现,默认为file

  • loadbalance -> 负载均衡,随机/轮询,默认随机,@LoadLevel 中 order 值越大越优先
  • registry -> 注册发现

核心接口:RegistryService

  • register:服务器使用,服务注册
  • unregister:服务器使用,服务注销
  • subscribe:客户端使用,注册监听事件,用来监听地址的变化
  • unsubscribe:客户端使用,取消注册监听事件
  • lookup:客户端使用,根据key查找服务地址列表
  • close:均可使用,用于关闭Register资源

六、seata-rm 项目

RM 实现,客户端

七、seata-rm-datasource 项目

对 JDBC 拓展,实现对 MySQL 等的透明接入 RM 的实现

八、seata-server 项目


对 TC 的核心实现,提供了事务协调、锁、事务状态、事务会话等功能

Coordinator:在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否commit,rollback等协调活动
Store:存储模块,数据持久化,防止重启或者宕机数据丢失,默认文件存储
Discover: 服务注册/发现模块,用于将Server地址暴露给我们Client
Config: 用来存储和查找我们服务端的配置
Lock: 锁模块,用于给Seata提供全局锁的功能
Rpc:用于和其他端通信
HA-Cluster:高可用集群,目前还没开源。为Seata提供可靠的高可用功能加粗样式

九、seata-tm 项目

对 TM 的实现,提供了全局事务管理,例如说事务的发起,提交,回滚等

十、seata-tcc 项目

对 TCC 事务模式的实现

十一、seata-spring 项目

Spring 对 Seata 集成的实现,例如使用 @GlobalTransactional 注解,自动创建全局事务

十二、seata-integration 项目

对不同框架的集成

seata-dubbo:对 Apache Dubbo 接入
seata-dubbo-alibaba:对 Alibaba Dubbo 接入

十三、seata-codec 项目

十四、seata-distribution 项目

十五、seata-metrics 项目

十六、服务端流程

(1)启动流程:

  1. 创建一个RpcServer,再这个里面包含了我们网络的操作,用Netty实现了服务端
  2. 解析端口号和文件地址
  3. 初始化SessionHolder,其中最重要的重要就是重我们dataDir这个文件夹中恢复我们的数据,重建我们的Session
  4. 创建一个DefaultCoordinator,这个也是我们事务协调器的逻辑核心代码,然后将其初始化,其内部初始化的逻辑会创建四个定时任务:
    1)retryRollbacking:重试rollback定时任务,用于将那些失败的rollback进行重试的,每隔5ms执行一次
    2)retryCommitting:重试commit定时任务,用于将那些失败的commit进行重试的,每隔5ms执行一次
    3)asyncCommitting:异步commit定时任务,用于执行异步的commit,每隔10ms一次
    4)timeoutCheck:超时定时任务检测,用于检测超时的任务,然后执行超时的逻辑,每隔2ms执行一次
  5. 初始化UUIDGenerator这个也是我们生成各种ID(transcationId,branchId)的基本类
  6. 将本地IP和监听端口设置到XID中,初始化rpcServer等待客户端的连接

(2)开启全局事物 - DefaultCore.begin

  1. 根据应用ID,事务分组,名字,超时时间创建一个GloabSession
  2. 对其添加一个RootSessionManager用于监听一些事件,Seata里面有四种类型的Listener。由于这里是开启事务,其他SessionManager不需要关注,我们只添加RootSessionManager即可
    1)ROOT_SESSION_MANAGER:所有Session
    2)ASYNC_COMMITTING_SESSION_MANAGER:异步提交Session
    3)RETRY_COMMITTING_SESSION_MANAGER:重试提交Session
    4)RETRY_ROLLBACKING_SESSION_MANAGER:重试回滚Session
  3. 开启Globalsession
    1)session.begin()
    2)状态变为 Begin,记录开始时间,并且调用 SessionLifecycleListener 的 onBegin 监听方法,将Session保存
  4. 最后返回XID(ip : port : tranId),TM申请到之后需要将这个ID传到RM中,RM通过XID来决定到底应该访问哪一台Server

(3)分支事务注册 - DefaultCore.branchRegister

  1. 通过transactionId获取并校验全局事务是否是开启状态
  2. 创建一个新的分支事务 - BranchSession
  3. 对分支事务进行加全局锁
  4. 添加branchSession,主要是将其添加到globalSession对象中,并写入到我们的文件中
  5. 返回branchId,该ID用来回滚我们的事务,或者对我们分支事务状态更新

(4)全局提交 - DefaultCore.commit

  1. 首先找到我们的globalSession。如果他为Null证明已经被commit过了,那么直接幂等操作,返回成功
  2. 关闭我们的GloabSession防止再次有新的branch进来
  3. 如果status是等于Begin,那么久证明还没有提交过,改变其状态为Committing也就是正在提交
  4. 判断是否是可以异步提交,目前只有AT模式可以异步提交,因为是通过Undolog的方式去做的。MT和TCC都需要走同步提交的代码
  5. 如果是异步提交,直接将其放进我们ASYNC_COMMITTING_SESSION_MANAGER,让其再后台线程异步去做我们的step6,如果是同步的那么直接执行我们的step6
  6. 遍历我们的BranchSession进行提交,如果某个分支事务失败,根据不同的条件来判断是否进行重试,异步不需要重试,因为其本身都在manager中,只要没有成功就不会被删除会一直重试,如果是同步提交的会放进异步重试队列进行重试

(5)全局回滚 - DefaultCore.rollback

  1. 与提交流程雷同
  • 点赞
  • 收藏
  • 分享
  • 文章举报
ノ往事成空、 发布了24 篇原创文章 · 获赞 0 · 访问量 1232 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: