您的位置:首页 > 编程语言 > Java开发

Spring Cloud笔记(8)使用Seata管理分布式事务

2020-06-23 04:36 218 查看

分布式事务介绍

所谓事务,就是一系列业务操作构成的独立的执行单元。比如用户购买商品下单的行为,需要执行创建订单,扣减商品库存的两个不同的数据库操作,这就是一个事务。事务最重要的特性就是要支持原子性,要么所有操作全部成功,要么全部失败。为什么要这样设计呢?如果一切顺利,当然什么问题都不会有。但天有不测风云,没有谁能保证系统一直不会出错,如果哪一天订单已经创建成功了,但在扣减对应商品库存时突然失败了,那就麻烦大了,订单数据和商品的库存数据可能就对不上了,有可能商品早都没货了,客户还能继续下单购买。

所以,事务在保持数据一致性的方面是非常重要的。在单服务系统中我们一般不需要为事务操心,数据库已经为我们考虑了一切,只要在操作开始前声明了事务,那么调用过程只要发生了错误事务会自动回滚到操作开始时的状态。

但在微服务系统中,不同的业务操作可能被分割到不同的模块,而不同业务模块都会配置一个独立的数据源,甚至订单服务和仓储服务可能都不在同一个数据库中,这样显然就不能只依靠本地数据库事务来解决问题。我们需要一种能够跨网络和应用的分布式事务机制,分布式事务的主要实现思路一般分为两种:

  • 刚性事务:类似于上面说的数据库的本地事务,遵循ACID原则,要求数据的强一致性;代表性的实现就是二阶段提交(XA),通过引入一个事务协调者,所有事务的参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。XA一度是分布式事务的事实标准,但实际实现上却存在很多问题,比如事务在等待提交过程中处于同步阻塞状态,导致资源长时间占用,整体性能很差。

  • 柔性事务:遵循BASE理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。比如刚才我们说到的客户下单的例子,如果扣减库存的时候出错了,我们先不回滚系统,而是将扣减库存的调用请求存起来(比如存放到一个消息队列中),定时去重试。这样虽然订单数据和库存数据在某个时间段内是不一致的,但一旦库存服务恢复了,库存扣减请求就能够正常调用了,这样数据在某个时间点之后就会同步成一致的状态,这就是最终一致性。最终一致性提高了系统的可用性(客户下单可以不受仓储服务故障的影响),但也可能在数据不一致期间发生问题(比如超卖),这个需要考虑业务上是否能够接受。柔性事务的实现主要有重试和补偿两种机制,重试就是刚才描述方式,系统将出错的请求存储下来然后不断进行重试,直到成功;而补偿就是在出错之后,执行类似的一个undo操作,消除已提交的操作对数据的影响,比如扣减库存失败以后就直接把之前创建成功的订单再删除掉,TCC(Try/Confirm/Cancel)型事务就是采用这样的方式。

Seata 的相关概念

由于类似XA的刚性事务实现在复杂的分布式环境中存在大量的问题,所以目前主流的分布式事务实现方案都是走柔性事务路线的。比较流行的解决方案有阿里开源的seata,还有独立开发者发布的LCN框架,但LCN目前的维护更新遇到了困难(独立开发者的悲哀啊)。seata提供了几种不同的事务模式实现,包括:AT、TCC、SAGA 和 XA。在这里我们重点介绍一下AT模式,其它模式的说明请查看 seata的官网。AT模式也是走的补偿路线,但是不需要应用程序去关心调用失败后如何恢复数据,而是由框架本身负责去恢复收当前事务影响的所有数据,这非常类似于我们已经习惯了的本地事务,对开发者的负担也比较小。

seata可分为三个角色:TC,TM和RM:

  • TC - 事务协调者: 维护全局和分支事务的状态,驱动全局事务提交或回滚。业务无关,需要在服务端独立进行部署。

  • TM - 事务管理器:发起全局事务的开始,提交或回滚(发起后由TC协调其它的RM执行相应的操作),该角色会集成到应用程序当中。

  • RM - 资源管理器:全局事务的参与者,管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,该角色也是集成到应用程序当中的。

部署TC server

首先在 官网 下载seata server的二进制文件,其实就是一个spring boot的应用程序。在服务器上解压之后,进入到conf文件夹,我们需要重点关注registry.conf这个文件,它的作用是配置seata server(TC)注册到哪个注册中心,目前支持几乎所有主流的注册中心,TC会从配置中心中读取相应的配置,TM和RM实例也可以通过同一个注册中心找到TC部署的位置,从而实现TC的集群化部署。我们还是延用之前就部署好的consul来作为注册中心,只需要将registry.conf中的type修改为“consul”即可,并调整配置文件中consul的部署地址:

# 注册中心配置,用于TC,TM,RM的相互服务发现
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "consul"
consul {
cluster = "seata"
serverAddr = "192.168.1.220:8500"
}
}
# 配置中心配置,用于读取TC的相关配置
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}

file.conf中是TC的一些持久化配置选项,seata支持文件和数据库两种持久化方式,文件模式比数据库更快,但不支持并发操作,如果要部署TC集群就必须要使用数据库进行持久化。这里我们为了简化,就直接使用默认的file模式。file.conf也可以初始化到配置中心,通过配置中心来统一读取,这对于集群部署是有帮助的,初始化的方式可参考 这里 。修改完配置后,直接运行seata-server.sh启动TC server:

$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1

启动成功后我们可以在consul的控制台看到TC注册的服务:

集成TM和RM

seata的AT模式会在服务调用失败后,自动恢复受影响的数据,其原理就是在启动事务之后,会自动分析当前事务中执行的SQL对数据的影响,把受影响的数据直接存储到本地数据库中,如果事务回滚了,就通过存储的数据备份对原始数据进行恢复( AT模式介绍 ),所以我们需要在每个RM所在的业务数据库中初始化seata的undo_log表。我们之前的spring-cloud-demo中也还没有建立相应的业务数据库,为了测试分布式事务,我们需要为order-service和storage-service创建两个业务库,并初始化相关的业务表格,整个初始化脚本如下:

-- 创建order-service数据库
CREATE DATABASE `cloud-demo-order`;

CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`t_order` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`customer_code` varchar(45) DEFAULT NULL COMMENT '客户编码',
`good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
`good_quantity` int(11) DEFAULT NULL COMMENT '购买数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`undo_log`
(
`id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
`xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
`log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
`log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

-- 创建storage-service数据库
CREATE DATABASE `cloud-demo-storage`;

CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`t_inventory` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
`good_quantity` int(11) DEFAULT NULL COMMENT '库存总量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`undo_log`
(
`id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
`xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
`log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
`log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

初始化脚本后,在order-serivce模块和storage-service模快引入seata的依赖包和相关配置:

<!--在spring cloud中自动配置seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>

需要注意的是maven库中还有一个包名称是spring-cloud-alibaba-seata,我看了一下两者的源码是一致的,任意引用哪一个包都可以,不知道为什么会有两个不同名称。在模块的application.yml中加入如下内容(以order-service模块的配置为例):

# Seata 配置项,对应 SeataProperties 类
seata:
application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
tx-service-group: ${spring.application.name}-group # 该应用所属事务组编号,用于寻找TC集群的映射
# Seata 服务配置项,对应 ServiceProperties 类
service:
#事务分组与TC集群(seata)集群的映射关系,order-service-group对应tx-service-group参数,值为一个虚拟的TC集群名称
#默认值就是default,如果无需分组可以不用设置
vgroup-mapping:
order-service-group: default
# Seata注册中心配置项
registry:
type: consul # 注册中心类型,默认为 file
consul:
cluster: seata #对应seata集群在consul中注册的服务名
server-addr: 192.168.1.220:8500

需要注意的是 registry.consul.cluster 中指定的名称需要与TC server的同名配置项中指定的名称一致(默认的值是default),否则会找不到TC server,这个在官方文档和很多教程中都没有说明。 更多配置内容可以查看 配置说明

另外为了方便对数据库进行操作,order-serivce模块和storage-service模快都引入了mybatis-plus框架的相关依赖和配置,这里就不做详细介绍了,具体可以去查看源码。一切准备好以后,将原有的orderService.createNewOrder方法做如下改造:

@GlobalTransactional
public Integer createNewOrder(OrderDTO orderDTO) {
Order newOrder = new Order();
newOrder.setCustomerCode(orderDTO.getCustomerCode());
newOrder.setGoodCode(orderDTO.getGoodCode());
newOrder.setGoodQuantity(orderDTO.getQuantity());
//向本地数据库插入订单信息
this.save(newOrder);
InventoryChangeDTO req = new InventoryChangeDTO();
req.setGoodCode(orderDTO.getGoodCode());
req.setQuantity(orderDTO.getQuantity());
//调用远程仓储服务变更库存
Integer remainQuantity = storageService.updateInventoryOfGood(req);
return remainQuantity;
}

其实除了写入数据库的相关代码,这里最重要的变化就是加入了**@GlobalTransactional注解,这个注解标记了当前方法会开启一个全局事务。在本例中order-service模块既是 TM (我的理解是声明@GlobalTransactional**的地方就算是一个TM)又是 RM,而storage-service模块则是另一个 RM

通过PostMan请求创建订单的接口地址:http://localhost:9001/api/order/create-order,一切顺利的话就能在数据库中看到新增的订单数据和库存数据。

从相关的日志中可以看出全局事务的开始和提交:

2020-05-06 15:28:21.311  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342569]
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342569] commit status: Committed
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] c.g.d.s.o.controller.Controller          : 剩余数量:-20
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] i.s.core.rpc.netty.RmMessageListener     : onMessage:xid=192.168.1.220:8091:2010342569,branchId=2010342570,branchType=AT,resourceId=jdbc:mysql://192.168.1.212:3306/cloud-demo-order,applicationData=null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch committing: 192.168.1.220:8091:2010342569 2010342570 jdbc:mysql://192.168.1.212:3306/cloud-demo-order null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

我们将 storageService.updateInventoryOfGood 方法稍稍修改一下,故意引发一个异常来测试全局事务的回滚:

public Integer changeInventory(InventoryChangeDTO req) {
Inventory inventory = this.getOne(Wrappers.<Inventory>lambdaQuery().eq(Inventory::getGoodCode, req.getGoodCode()));
if (inventory == null) {
inventory = new Inventory();
inventory.setGoodQuantity(0);
inventory.setGoodCode(req.getGoodCode());
}
inventory.setGoodQuantity(inventory.getGoodQuantity() - req.getQuantity());
this.saveOrUpdate(inventory);
//引发异常回滚
Object exceptionCause = null;
exceptionCause.toString();
return inventory.getGoodQuantity();
}

还需要注意一点,测试全局事务回滚时需要将我们直接配置的hystrix fallback关闭,否则应用程序调用远程接口失败后会触发fallback机制,从而让seata认为远程调用是成功的,就不会触发回滚:

//@FeignClient(name = "storage-service", fallback = StorageServiceFallback.class)
//测试全局事务回滚时需要注释掉fallback,否则接口会返回默认的值导致事务无法回滚
@FeignClient(name = "storage-service")
public interface StorageService {

@PostMapping("/api/storage/change-inventory")
Integer updateInventoryOfGood(InventoryChangeDTO inventoryChangeDTO);

}

然后我们再次请求创建订单的服务,从日志上可以看出,事务已成功触发回滚操作:

2020-05-06 15:50:57.809  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342574]
2020-05-06 15:50:59.157  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-storage-service
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.netflix.loadbalancer.BaseLoadBalancer  : Client: storage-service instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-05-06 15:50:59.290  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2020-05-06 15:50:59.325  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.329  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client storage-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[192.168.1.252:9002],Load balancer stats=Zone stats: {unknown=[Zone:unknown;	Instance count:1;	Active connections count: 0;	Circuit breaker tripped count: 0;	Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.252:9002;	Zone:UNKNOWN;	Total Requests:0;	Successive connection failure:0;	Total blackout seconds:0;	Last connection made:Thu Jan 01 08:00:00 CST 1970;	First connection made: Thu Jan 01 08:00:00 CST 1970;	Active Connections:0;	total failure count in last (1000) msecs:0;	average resp time:0.0;	90 percentile resp time:0.0;	95 percentile resp time:0.0;	min resp time:0.0;	max resp time:0.0;	stddev resp time:0.0]
]}ServerList:ConsulServerList{serviceId='storage-service', tag=null}
2020-05-06 15:51:00.119  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342574] rollback status: Rollbacked

本文的相关代码可以查看这里 spring-cloud-demo

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: