Skip to content

Seata 分布式事务

字数: 0 字 时长: 0 分钟

Seata(Simple Extensible Autonomous Transaction Architecture)是阿里开源的一款分布式事务解决方案,提供了多种事务模式,包括 AT、TCC、Saga及 XA 模式

原理

Seata 主要通过三大核心组件来协调全局事务的执行。

  1. 事务协调器(Transaction Coordinator) TC

TC 负责管理全局事务的生命周期,记录全局事务和分支事务的状态,并协调全局事务的提交和回滚。TC是 Seata 的中心控制器,所有的分布式事务请求都会通过 TC 进行管理。

  1. 事务管理器(Transaction Manager)TM

TM 负责定义全局事务的边界,即启动、提交、回滚全局事务。TM 通常嵌入在业务服务中,用于向 TC 发起全局事务的创建和提交请求。

  1. 资源管理器(Resource Manager)RM

RM 负责管理本地资源(如数据库),以及分支事务的注册、提交、回滚。RM 的核心作用是对本地数据库进行事务操作,并将分支事务的状态通知给 TC。

Seata原理.webp

以 AT 模式为例子,大致流程如下:

阶段1:分支事务注册与执行

  1. TM 开启全局事务, TC 生成 XID ,通过 RPC 上下文传播到所有参与微服务
  2. RM 在执行本地 SQL 前,向 TC 注册分支事务,关联 XID
  3. SQL 执行时, Seata 的数据源代理拦截操作,生成前置镜像(保存修改前的数据)和后置镜像(保存修改后的数据);将前后镜像生成 undo_log,存入本地数据库
  4. 业务 SQL + undo_log 作为原子操作提交本地事务,释放资源锁,RM 向 TC 报告分支状态(成功/失败)

阶段2:全局事务提交/回滚

  • 若所有分支成功, TC 通知所有 RM 异步删除 undo_log
  • 若有分支失败, TC 发起全局回滚,通知 RM 查询 undo_log 使用前置镜像恢复数据,然后删除 undo_log

Seata 事务模式

  • AT模式:自动事务, Seata 底层默认采用 AT 模式。它通过自动代理数据库操作,在事务提交前生成回滚日志,在本地事务提交时,通过 undo_log 表记录数据快照,实现数据的原子性回滚
  • XA模式:基于两阶段提交协议(2PC),第一阶段并不会真正对数据库提交,而是阻塞住,只有确认要提交时再提交;因此 XA 模式性能相对较低,适合对一致性要求严格的场景
  • TCC模式:TCC 将业务操作拆分为三个步骤: Try (预留资源)、Confirm(确认操作)、Cancel(回滚操作)。每个步骤都由业务开发者手动实现,适合对资源控制要求高的场景。
  • Saga模式: 长事务模式,将全局事务拆解为多个有序的小事务,每个小事务都有相应的补偿操作

采购案例

  1. 下载 seata 安装包,解压并启动 seata-server.bat

  2. 数据库准备

seata 是通过 undo_log 来进行回滚的,因此需要保证 undo_log 表存在

Details
sql
CREATE DATABASE IF NOT EXISTS `storage_db`;
USE  `storage_db`;
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
                               `id` int(11) NOT NULL AUTO_INCREMENT,
                               `commodity_code` varchar(255) DEFAULT NULL,
                               `count` int(11) DEFAULT 0,
                               PRIMARY KEY (`id`),
                               UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO storage_tbl (commodity_code, count) VALUES ('P0001', 100);
INSERT INTO storage_tbl (commodity_code, count) VALUES ('B1234', 10);

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
                            `id` bigint(20) NOT NULL AUTO_INCREMENT,
                            `branch_id` bigint(20) NOT NULL,
                            `xid` varchar(100) NOT NULL,
                            `context` varchar(128) NOT NULL,
                            `rollback_info` longblob NOT NULL,
                            `log_status` int(11) NOT NULL,
                            `log_created` datetime NOT NULL,
                            `log_modified` datetime NOT NULL,
                            `ext` varchar(100) DEFAULT NULL,
                            PRIMARY KEY (`id`),
                            UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE DATABASE IF NOT EXISTS `order_db`;
USE  `order_db`;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
                             `id` int(11) NOT NULL AUTO_INCREMENT,
                             `user_id` varchar(255) DEFAULT NULL,
                             `commodity_code` varchar(255) DEFAULT NULL,
                             `count` int(11) DEFAULT 0,
                             `money` int(11) DEFAULT 0,
                             PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
                            `id` bigint(20) NOT NULL AUTO_INCREMENT,
                            `branch_id` bigint(20) NOT NULL,
                            `xid` varchar(100) NOT NULL,
                            `context` varchar(128) NOT NULL,
                            `rollback_info` longblob NOT NULL,
                            `log_status` int(11) NOT NULL,
                            `log_created` datetime NOT NULL,
                            `log_modified` datetime NOT NULL,
                            `ext` varchar(100) DEFAULT NULL,
                            PRIMARY KEY (`id`),
                            UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE DATABASE IF NOT EXISTS `account_db`;
USE  `account_db`;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
                               `id` int(11) NOT NULL AUTO_INCREMENT,
                               `user_id` varchar(255) DEFAULT NULL,
                               `money` int(11) DEFAULT 0,
                               PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO account_tbl (user_id, money) VALUES ('1', 10000);
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
                            `id` bigint(20) NOT NULL AUTO_INCREMENT,
                            `branch_id` bigint(20) NOT NULL,
                            `xid` varchar(100) NOT NULL,
                            `context` varchar(128) NOT NULL,
                            `rollback_info` longblob NOT NULL,
                            `log_status` int(11) NOT NULL,
                            `log_created` datetime NOT NULL,
                            `log_modified` datetime NOT NULL,
                            `ext` varchar(100) DEFAULT NULL,
                            PRIMARY KEY (`id`),
                            UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  1. 导入依赖
xml
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
  1. 每个微服务创建 file.conf 文件
shell
service {
  #transaction service group mapping
  vgroupMapping.default_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}
file.conf 完整版
shell
transport {
  # tcp, unix-domain-socket
  type = "TCP"
  #NIO, NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the tm client batch send request enable
  enableTmClientBatchSendRequest = false
  # the rm client batch send request enable
  enableRmClientBatchSendRequest = true
   # the rm client rpc request timeout
  rpcRmRequestTimeout = 2000
  # the tm client rpc request timeout
  rpcTmRequestTimeout = 30000
  # the rm client rpc request timeout
  rpcRmRequestTimeout = 15000
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.default_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    tableMetaCheckerInterval = 60000
    reportSuccessEnable = false
    sagaBranchRegisterEnable = false
    sagaJsonParser = "fastjson"
    sagaRetryPersistModeUpdate = false
    sagaCompensatePersistModeUpdate = false
    tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
    sqlParserType = "druid"
    branchExecutionTimeoutXA = 60000
    connectionTwoPhaseHoldTimeoutXA = 10000
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
    defaultGlobalTransactionTimeout = 60000
    degradeCheck = false
    degradeCheckPeriod = 2000
    degradeCheckAllowTimes = 10
    interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
  }
  undo {
    dataValidation = true
    onlyCareUpdateColumns = true
    logSerialization = "jackson"
    logTable = "undo_log"
    compress {
      enable = true
      # allow zip, gzip, deflater, lz4, bzip2, zstd default is zip
      type = zip
      # if rollback info size > threshold, then will be compress
      # allow k m g t
      threshold = 64k
    }
  }
  loadBalance {
      type = "XID"
      virtualNodes = 10
  }
}
log {
  exceptionRate = 100
}
tcc {
  fence {
    # tcc fence log table name
    logTableName = tcc_fence_log
    # tcc fence log clean period
    cleanPeriod = 1h
  }
}
  1. 梳理采购链路流程

采购链路.webp

  • 采购业务,需要调用库存服务,库存库会扣库存
  • 调用订单服务,订单库增加订单
  • 订单服务再调用余额服务,余额库进行扣减余额
  1. 业务代码实现

引入了 seata 后,只需要在分布式事务入口,加上 @GlobalTransactional 即可保证整个链路的事务一致性。

java
@Service
public class BusinessServiceImpl implements BusinessService {

    @Resource
    StorageFeignClient storageFeignClient;

    @Resource
    OrderFeignClient orderFeignClient;

    @GlobalTransactional
    @Override
    public void purchase(String userId, String commodityCode, int orderCount) {
        //TODO 1. 扣减库存
        storageFeignClient.deduct(commodityCode, orderCount);

        //TODO 2. 创建订单
        orderFeignClient.create(userId,commodityCode,orderCount);
    }
}

seata 二阶提交原理.webp