Seata 分布式事务
字数: 0 字 时长: 0 分钟
Seata(Simple Extensible Autonomous Transaction Architecture)是阿里开源的一款分布式事务解决方案,提供了多种事务模式,包括 AT、TCC、Saga及 XA 模式
原理
Seata 主要通过三大核心组件来协调全局事务的执行。
- 事务协调器(Transaction Coordinator) TC
TC 负责管理全局事务的生命周期,记录全局事务和分支事务的状态,并协调全局事务的提交和回滚。TC是 Seata 的中心控制器,所有的分布式事务请求都会通过 TC 进行管理。
- 事务管理器(Transaction Manager)TM
TM 负责定义全局事务的边界,即启动、提交、回滚全局事务。TM 通常嵌入在业务服务中,用于向 TC 发起全局事务的创建和提交请求。
- 资源管理器(Resource Manager)RM
RM 负责管理本地资源(如数据库),以及分支事务的注册、提交、回滚。RM 的核心作用是对本地数据库进行事务操作,并将分支事务的状态通知给 TC。
以 AT 模式为例子,大致流程如下:
阶段1:分支事务注册与执行
- TM 开启全局事务, TC 生成 XID ,通过 RPC 上下文传播到所有参与微服务
- RM 在执行本地 SQL 前,向 TC 注册分支事务,关联 XID
- SQL 执行时, Seata 的数据源代理拦截操作,生成前置镜像(保存修改前的数据)和后置镜像(保存修改后的数据);将前后镜像生成
undo_log
,存入本地数据库 - 业务 SQL +
undo_log
作为原子操作提交本地事务,释放资源锁,RM 向 TC 报告分支状态(成功/失败)
阶段2:全局事务提交/回滚
- 若所有分支成功, TC 通知所有 RM 异步删除
undo_log
- 若有分支失败, TC 发起全局回滚,通知 RM 查询
undo_log
使用前置镜像恢复数据,然后删除undo_log
Seata 事务模式
- AT模式:自动事务, Seata 底层默认采用 AT 模式。它通过自动代理数据库操作,在事务提交前生成回滚日志,在本地事务提交时,通过
undo_log
表记录数据快照,实现数据的原子性回滚 - XA模式:基于两阶段提交协议(2PC),第一阶段并不会真正对数据库提交,而是阻塞住,只有确认要提交时再提交;因此 XA 模式性能相对较低,适合对一致性要求严格的场景
- TCC模式:TCC 将业务操作拆分为三个步骤: Try (预留资源)、Confirm(确认操作)、Cancel(回滚操作)。每个步骤都由业务开发者手动实现,适合对资源控制要求高的场景。
- Saga模式: 长事务模式,将全局事务拆解为多个有序的小事务,每个小事务都有相应的补偿操作
采购案例
下载 seata 安装包,解压并启动
seata-server.bat
数据库准备
seata 是通过 undo_log
来进行回滚的,因此需要保证 undo_log
表存在
Details
sql
CREATE DATABASE IF NOT EXISTS `storage_db`;
USE `storage_db`;
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO storage_tbl (commodity_code, count) VALUES ('P0001', 100);
INSERT INTO storage_tbl (commodity_code, count) VALUES ('B1234', 10);
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `order_db`;
USE `order_db`;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `account_db`;
USE `account_db`;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO account_tbl (user_id, money) VALUES ('1', 10000);
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
- 导入依赖
xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
- 每个微服务创建
file.conf
文件
shell
service {
#transaction service group mapping
vgroupMapping.default_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
file.conf
完整版
shell
transport {
# tcp, unix-domain-socket
type = "TCP"
#NIO, NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the tm client batch send request enable
enableTmClientBatchSendRequest = false
# the rm client batch send request enable
enableRmClientBatchSendRequest = true
# the rm client rpc request timeout
rpcRmRequestTimeout = 2000
# the tm client rpc request timeout
rpcTmRequestTimeout = 30000
# the rm client rpc request timeout
rpcRmRequestTimeout = 15000
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.default_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
tableMetaCheckerInterval = 60000
reportSuccessEnable = false
sagaBranchRegisterEnable = false
sagaJsonParser = "fastjson"
sagaRetryPersistModeUpdate = false
sagaCompensatePersistModeUpdate = false
tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
sqlParserType = "druid"
branchExecutionTimeoutXA = 60000
connectionTwoPhaseHoldTimeoutXA = 10000
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
defaultGlobalTransactionTimeout = 60000
degradeCheck = false
degradeCheckPeriod = 2000
degradeCheckAllowTimes = 10
interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
compress {
enable = true
# allow zip, gzip, deflater, lz4, bzip2, zstd default is zip
type = zip
# if rollback info size > threshold, then will be compress
# allow k m g t
threshold = 64k
}
}
loadBalance {
type = "XID"
virtualNodes = 10
}
}
log {
exceptionRate = 100
}
tcc {
fence {
# tcc fence log table name
logTableName = tcc_fence_log
# tcc fence log clean period
cleanPeriod = 1h
}
}
- 梳理采购链路流程
- 采购业务,需要调用库存服务,库存库会扣库存
- 调用订单服务,订单库增加订单
- 订单服务再调用余额服务,余额库进行扣减余额
- 业务代码实现
引入了 seata 后,只需要在分布式事务入口,加上
@GlobalTransactional
即可保证整个链路的事务一致性。
java
@Service
public class BusinessServiceImpl implements BusinessService {
@Resource
StorageFeignClient storageFeignClient;
@Resource
OrderFeignClient orderFeignClient;
@GlobalTransactional
@Override
public void purchase(String userId, String commodityCode, int orderCount) {
//TODO 1. 扣减库存
storageFeignClient.deduct(commodityCode, orderCount);
//TODO 2. 创建订单
orderFeignClient.create(userId,commodityCode,orderCount);
}
}