Seata 分布式事务
环境搭建
seata 客户端安装
下载 seata 安装包,解压并启动
seata-server.bat
数据库准备
seata 是通过
undo_log
来进行回滚的,因此需要保证undo_log
表存在
Details
sql
CREATE DATABASE IF NOT EXISTS `storage_db`;
USE `storage_db`;
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO storage_tbl (commodity_code, count) VALUES ('P0001', 100);
INSERT INTO storage_tbl (commodity_code, count) VALUES ('B1234', 10);
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `order_db`;
USE `order_db`;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `account_db`;
USE `account_db`;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO account_tbl (user_id, money) VALUES ('1', 10000);
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
导入依赖
xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
file.conf
配置文件
每个微服务创建
file.conf
文件
shell
service {
#transaction service group mapping
vgroupMapping.default_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
file.conf
完整版
shell
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
transport {
# tcp, unix-domain-socket
type = "TCP"
#NIO, NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the tm client batch send request enable
enableTmClientBatchSendRequest = false
# the rm client batch send request enable
enableRmClientBatchSendRequest = true
# the rm client rpc request timeout
rpcRmRequestTimeout = 2000
# the tm client rpc request timeout
rpcTmRequestTimeout = 30000
# the rm client rpc request timeout
rpcRmRequestTimeout = 15000
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.default_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
tableMetaCheckerInterval = 60000
reportSuccessEnable = false
sagaBranchRegisterEnable = false
sagaJsonParser = "fastjson"
sagaRetryPersistModeUpdate = false
sagaCompensatePersistModeUpdate = false
tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
sqlParserType = "druid"
branchExecutionTimeoutXA = 60000
connectionTwoPhaseHoldTimeoutXA = 10000
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
defaultGlobalTransactionTimeout = 60000
degradeCheck = false
degradeCheckPeriod = 2000
degradeCheckAllowTimes = 10
interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
compress {
enable = true
# allow zip, gzip, deflater, lz4, bzip2, zstd default is zip
type = zip
# if rollback info size > threshold, then will be compress
# allow k m g t
threshold = 64k
}
}
loadBalance {
type = "XID"
virtualNodes = 10
}
}
log {
exceptionRate = 100
}
tcc {
fence {
# tcc fence log table name
logTableName = tcc_fence_log
# tcc fence log clean period
cleanPeriod = 1h
}
}
采购案例
- 采购业务,需要调用库存服务,库存库会扣库存
- 调用订单服务,订单库增加订单
- 订单服务再调用余额服务,余额库进行扣减余额
采购操作涉及到了多个服务,以及多个数据库的增删改操作,所以需要使用分布式事务保证数据的一致性。
引入了 seata 后,只需要在分布式事务入口,加上
@GlobalTransactional
即可保证整个链路的事务一致性。
java
@Service
public class BusinessServiceImpl implements BusinessService {
@Resource
StorageFeignClient storageFeignClient;
@Resource
OrderFeignClient orderFeignClient;
@GlobalTransactional
@Override
public void purchase(String userId, String commodityCode, int orderCount) {
//TODO 1. 扣减库存
storageFeignClient.deduct(commodityCode, orderCount);
//TODO 2. 创建订单
orderFeignClient.create(userId,commodityCode,orderCount);
}
}
Seata 原理
Seata 事务模式
- AT模式:自动事务, Seata 底层默认采用 AT 模式
- XA模式:第一阶段并不会真正对数据库提交,而是阻塞住,只有确认要提交时再提交;因此 XA 模式性能相对较低
- TCC模式: 手动事务,需要手动提交和回滚,需要自己实现;适合夹杂了非数据库操作的事务代码。比如在业务链当中发短信,短信没办法撤回
- Saga模式: 长事务模式,比如请假审批流程,流程较长