[Notes] Seata Distributed Transaction Study Notes

Preface

A distributed transaction solution with high performance and ease of use for microservices architecture. (official GitHub)

Preparation

Configuring the TC (Transaction Coordinator)

Configuring the registry

  • Edit the conf/registry.conf file
    • The registry block selects the registry center; if type="eureka", go on to fill in the registry.eureka block
    • The config block selects where Seata reads its configuration from; if type="file", go on to edit the conf/file.conf file
conf/registry.conf
serviceUrl = "http://localhost:8761/eureka"
application = "seata-server"

Configuring file

  • Edit the conf/file.conf file
    • The store block sets how transaction data is stored; if mode="db", go on to fill in the store.db block
conf/file.conf
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root"
password = "123456"
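
For orientation, these keys sit inside the db sub-block of the store block in conf/file.conf; a sketch of the surrounding structure, showing only the keys listed above (other defaults omitted):

store {
  mode = "db"
  db {
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123456"
  }
}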

Configuring startup parameters

  • Edit the bin/seata-server.sh script
    • Around line 120, adjust the JVM startup parameters as needed (for a development environment the memory settings, e.g. -Xmx/-Xms, can be lowered)

Starting the TC coordinator

  • Start the registry center first, then start the Seata Server
sh bin/seata-server.sh
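
The script also accepts a few startup flags; for example, to pick the port, host, and store mode explicitly (flag names per the Seata server docs, values illustrative):

sh bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file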

AT Mode

  • Seata's AT (Automatic Transaction) mode is a non-intrusive distributed transaction solution

Adding transactions to the project

Adding the dependency (every module)

  • Add the dependency to every module participating in the transaction
pom.xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>

Configuring the transaction group name (every module)

  • Put every module in the same transaction group
application.yml
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

Configuring the coordinator (every module)

  • Add the coordinator configuration to every module in the transaction
registry
  • Create a registry.conf file under the resources directory
resources/registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
file
  • Create a file.conf file under the resources directory

  • In the service block below, change the vgroupMapping entry to match your transaction group name

resources/file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # "order_tx_group" must match the "tx-service-group: order_tx_group" setting in application.yml
  # "seata-server" must match the name the TC server registered under
  # The client looks up seata-server's address from eureka, then registers itself with seata-server under this group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

Configuring the data source proxy (every module)

  • Configure a data source proxy for every module in the transaction; Seata's DataSourceProxy wraps the real DataSource so that it can record undo logs and register branch transactions with the TC

  • Create a global configuration class

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class DSAutoConfiguration {

    // Create the raw DataSource, bound to the spring.datasource properties
    @Bean
    @ConfigurationProperties("spring.datasource")
    public DataSource dataSource() {
        return new HikariDataSource();
    }

    // Wrap it in Seata's DataSourceProxy and mark it @Primary so that
    // all repositories go through the proxied data source
    @Bean
    @Primary
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

}
  • Because the default connection pool is HikariCP and @ConfigurationProperties("spring.datasource") binds the properties directly onto HikariDataSource, whose property is named jdbcUrl rather than url, change spring.datasource.url to spring.datasource.jdbcUrl in application.yml
spring:
  datasource:
    jdbcUrl: jdbc:mysql://localhost:3306/seata_order?serverTimezone=GMT%2B8
  • Exclude Spring Boot's own data source auto-configuration by adjusting the @SpringBootApplication annotation on the main application class
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {
...
}

Starting the global transaction (main module)

  • Add the @GlobalTransactional annotation to the main module's business-layer method to start the global transaction. The annotation creates a TM (Transaction Manager), and the TM asks the coordinator to open a global transaction

  • Only the main module's business method gets the global transaction annotation; the modules it calls do not need it (see the sketch below)
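
A minimal sketch of what the main module's method can look like in AT mode; OrderMapper, StorageClient, and AccountClient are hypothetical stand-ins for this module's DAO and the Feign clients of the downstream modules:

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;      // hypothetical local DAO
    @Autowired
    private StorageClient storageClient;  // hypothetical Feign client
    @Autowired
    private AccountClient accountClient;  // hypothetical Feign client

    // @GlobalTransactional creates the TM, which asks the TC to open a global
    // transaction; if any call below fails, every branch is rolled back
    @Override
    @GlobalTransactional
    public void create(Order order) {
        orderMapper.create(order);                                        // local branch
        storageClient.decrease(order.getProductId(), order.getCount());  // remote branch
        accountClient.decrease(order.getUserId(), order.getMoney());     // remote branch
    }
}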

Starting only a local transaction (non-main modules)

  • In every non-main module, add the @Transactional annotation to the business method to start a local transaction (a sketch follows)
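
A minimal sketch of a downstream module's method; StorageMapper is a hypothetical DAO:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageMapper storageMapper;  // hypothetical DAO

    // A plain local transaction is enough here: the Seata DataSourceProxy
    // registers this update as a branch of the caller's global transaction
    @Override
    @Transactional
    public void decrease(Long productId, Integer count) {
        storageMapper.decrease(productId, count);
    }
}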

AT mode complete

  • AT mode requires no changes to business code; adding the configuration above is enough to make the business a distributed transaction

TCC Mode

  • TCC (Try, Confirm, Cancel), like Seata AT, is a two-phase transaction; its main differences from AT:
    • TCC is highly intrusive to business code: the data operations of each phase must be coded by hand; the transaction framework cannot handle them automatically.
    • TCC is more efficient: it does not take a global lock on the data, so multiple transactions can operate on the same data concurrently.

Adding transactions to the project

Adding the dependency (every module)

  • Add the dependency to every module participating in the transaction

  • Edit the pom.xml file

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>

Configuring the transaction group name (every module)

  • Put every module in the same transaction group

  • Edit each module's application.yml file

application.yml
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

Configuring the coordinator (every module)

  • Add the coordinator configuration to every module in the transaction
registry
  • Create a registry.conf file under the resources directory
resources/registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
file
  • Create a file.conf file under the resources directory

  • In the service block below, change the vgroupMapping entry to match your transaction group name

resources/file.conf
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # "order_tx_group" must match the "tx-service-group: order_tx_group" setting in application.yml
  # "seata-server" must match the name the TC server registered under
  # The client looks up seata-server's address from eureka, then registers itself with seata-server under this group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

Adding the TCC interface (for the specific business)

  • Add a TCC interface for the business in question: annotate the interface with @LocalTCC and declare the three phases as methods, T (Try: reserve the resource), C (Confirm), and C (Cancel); the method names are up to you

@TwoPhaseBusinessAction: placed on the Try method

  • name = "": a custom name for the action
  • commitMethod = "": the commit method name; the default is commit (the attribute can be omitted when using the default)
  • rollbackMethod = "": the rollback method name; the default is rollback (the attribute can be omitted when using the default)

BusinessActionContext businessActionContext: the context object used to carry saved data between phases
The remaining parameters are the arguments of the database operation (in the current version a wrapper object cannot be used as a parameter)

@BusinessActionContextParameter(paramName = ""): placed before each database-operation parameter; parameters marked with this annotation are stored in the context object

  • paramName = "": the name under which the parameter is stored

import java.math.BigDecimal;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface OrderTccAction {

    // Try: reserve the resource
    @TwoPhaseBusinessAction(name = "OrderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                               @BusinessActionContextParameter(paramName = "orderId") Long orderId,
                               @BusinessActionContextParameter(paramName = "userId") Long userId,
                               @BusinessActionContextParameter(paramName = "productId") Long productId,
                               @BusinessActionContextParameter(paramName = "count") Integer count,
                               @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    // Confirm
    boolean commit(BusinessActionContext businessActionContext);

    // Cancel
    boolean rollback(BusinessActionContext businessActionContext);

}

Adding the TCC interface implementation class (per interface)

  • Annotate it with @Component so that Spring manages it
  • Implement the TCC code according to the business
import java.math.BigDecimal;

import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderTccActionImpl implements OrderTccAction {

    @Autowired
    private OrderMapper orderMapper;

    // Try: insert the order with status 0 (frozen)
    @Override
    public boolean prepareCreateOrder(BusinessActionContext businessActionContext, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
        orderMapper.create(new Order(orderId, userId, productId, count, money, 0));
        return true;
    }

    // Confirm: flip the order status to 1 (valid)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
        orderMapper.updateStatus(orderId, 1);
        return true;
    }

    // Cancel: remove the frozen order
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
        orderMapper.deleteById(orderId);
        return true;
    }
}

Adjusting the Service (for the specific business)

  • Add the @GlobalTransactional annotation to the method that needs the transaction, which starts the global transaction
  • Call the first-phase (Try) TCC method by hand

Pass null for the BusinessActionContext parameter; Seata's proxy supplies the real context at runtime

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderTccAction orderTccAction;

    @Override
    @GlobalTransactional
    public void create(Order order) {
        // Only the Try phase is called here; the TC drives commit/rollback
        orderTccAction.prepareCreateOrder(null, order.getId(), order.getUserId(), order.getProductId(), order.getCount(), order.getMoney());
    }

}

Idempotency control

  • To keep instructions sent by the TC from being executed more than once, idempotency is implemented by setting a marker in Try and removing it in Confirm/Cancel; if the marker is absent, the second-phase call is skipped

Utility class

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        // Create the per-action map atomically if it does not exist yet,
        // then record the marker for this transaction (keyed by XID)
        map.computeIfAbsent(actionClass, k -> new ConcurrentHashMap<>()).put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

Modifying the implementation class

At the end of the Try method, set the marker:
ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
At the end of Confirm/Cancel, remove the marker:
  • commit and rollback receive the same additions: the marker removal here plus the guard below
ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
Before Confirm/Cancel does its work, check the marker so the operation is not executed twice (a consolidated sketch follows):
if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
    return true;
}
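
Putting the three snippets together, the second-phase methods of OrderTccActionImpl from above might look like this sketch:

// Confirm
@Override
public boolean commit(BusinessActionContext businessActionContext) {
    // No marker: Try never ran here, or this phase already executed
    if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
        return true;
    }

    Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
    orderMapper.updateStatus(orderId, 1);

    // Drop the marker set by Try, so a repeated commit becomes a no-op
    ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
    return true;
}

// Cancel
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
    if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
        return true;
    }

    Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
    orderMapper.deleteById(orderId);

    ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
    return true;
}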

TCC mode complete

Done