前言
A distributed transaction solution with high performance and ease of use for microservices architecture.(官方GitHub)
准备工作
配置TC协调器
配置registry
修改conf/registry.conf
配置文件
- 在
registry
配置注册中心,如果为type="eureka"
,则继续修改registry.eureka
配置
1 2
| serviceUrl = "http://localhost:8761/eureka" application = "seate-server"
|
- 在`config`配置日志记录的方式,如果为`type="file"`,则继续修改`conf/file.conf`配置文件
配置file
修改conf/file.conf
配置文件
- 在
store
配置数据存储方式,如果为mode="db"
,则继续修改store.db
配置
1 2 3 4 5
| dbType = "mysql" driverClassName = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata" user = "root" password = "123456"
|
配置启动参数
启动TC协调器
AT模式
- Seata的AT模式(Automatic Transaction)是一种无侵入的分布式事务解决方案
在项目中添加事务
添加依赖(每个模块)
为事务中的每个模块添加依赖
修改pom.xml
配置文件
1 2 3 4
| <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> </dependency>
|
配置事务组组名(每个模块)
1 2 3 4 5
| spring: cloud: alibaba: seata: tx-service-group: order_tx_group
|
配置协调器(每个模块)
registry
- 在
resources
目录下创建registry.conf
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka"
nacos { serverAddr = "localhost" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" # application = "default" # weight = "1" } redis { serverAddr = "localhost:6379" db = "0" password = "" cluster = "default" timeout = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } }
config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig type = "file"
nacos { serverAddr = "localhost" namespace = "" group = "SEATA_GROUP" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
|
file
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true # the client batch send request enable enableClientBatchSendRequest = true #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #transaction service group mapping # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致 # “seata-server” 与 TC 服务器的注册名一致 # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group vgroupMapping.order_tx_group = "seata-server" #only support when registry.type=file, please don't set multiple addresses order_tx_group.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false }
client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" logTable = "undo_log" } log { exceptionRate = 100 } }
|
配置数据源代理(每个模块)
为事务中的每个模块配置数据源代理
创一个全局配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class DSAutoConfiguration {
@Bean @ConfigurationProperties("spring.datasource") public DataSource dataSource() { return new HikariDataSource(); }
@Bean @Primary public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); }
}
|
- 由于使用的默认连接池为HikariCP连接池,需要修改
application.yml
的spring.datasource
配置,把spring.datasource.url
改为spring.datasource.jdbcUrl
1 2 3
| spring: datasource: jdbcUrl: jdbc:mysql://localhost:3306/seata_order?serverTimezone=GMT%2B8
|
- 排除SpringBoot的数据源配置,在SpringBoot主启动类上修改
@SpringBootApplication
注解
1 2 3 4
| @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class Application { ... }
|
开启全局事务(主模块)
只开启本地事务(非主模块)
- 在非主模块的每个模块业务方法添加
@Transactional
注解开启本地事务
AT模式完成
- AT模式不需要修改业务代码,只需要添加配置,即可实现分布式业务
TCC模式
- TCC(Try、Confirm、Cancel) 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:
- TCC 对业务代码侵入严重:每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
- TCC 效率更高:不必对数据加全局锁,允许多个事务同时操作数据。
在项目中添加事务
添加依赖(每个模块)
为事务中的每个模块添加依赖
修改pom.xml
配置文件
1 2 3 4
| <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> </dependency>
|
配置事务组组名(每个模块)
1 2 3 4 5
| spring: cloud: alibaba: seata: tx-service-group: order_tx_group
|
配置协调器(每个模块)
registry
- 在
resources
目录下创建registry.conf
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka"
nacos { serverAddr = "localhost" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" # application = "default" # weight = "1" } redis { serverAddr = "localhost:6379" db = "0" password = "" cluster = "default" timeout = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } }
config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig type = "file"
nacos { serverAddr = "localhost" namespace = "" group = "SEATA_GROUP" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
|
file
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true # the client batch send request enable enableClientBatchSendRequest = true #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #transaction service group mapping # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致 # “seata-server” 与 TC 服务器的注册名一致 # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group vgroupMapping.order_tx_group = "seata-server" #only support when registry.type=file, please don't set multiple addresses order_tx_group.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false }
client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" logTable = "undo_log" } log { exceptionRate = 100 } }
|
添加TCC接口(指定业务)
- 添加对应业务的TCC接口,为接口添加注解@LocalTCC,在接口中添加T(预留资源)C(确认)C(回滚)三个阶段的方法,方法名自定
@TwoPhaseBusinessAction
:标注在Try方法上
name = ""
:指定自定义的操作名称
commitMethod = ""
:指定提交方法名,默认值为commit(如果为默认值,这个属性可以省略)
rollbackMethod = ""
:指定回滚方法名,默认值为rollback(如果为默认值,这个属性可以省略)
BusinessActionContext businessActionContext
:上下文对象,用于传递保存的数据
其他对象为对数据库操作的参数(目前版本不能使用封装的对象最为参数)
@BusinessActionContextParameter(paramName = “”):标注在其他对数据库操作的参数前,被这个注解标注的参数会放到上下文对象中
paramName = ""
:指定参数名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @LocalTCC public interface OrderTccAction {
@TwoPhaseBusinessAction(name = "OrderTccAction", commitMethod = "commit", rollbackMethod = "rollback") boolean prepareCreateOrder(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "orderId") Long orderId, @BusinessActionContextParameter(paramName = "userId") Long userId, @BusinessActionContextParameter(paramName = "productId") Long productId, @BusinessActionContextParameter(paramName = "count") Integer count, @BusinessActionContextParameter(paramName = "money") BigDecimal money);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
|
添加TCC接口的实现类(根据接口)
- 用@Component注解标注,交给Spring管理
- 根据业务完成TCC代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class OrderTccActionImpl implements OrderTccAction {
@Autowired private OrderMapper orderMapper;
@Override public boolean prepareCreateOrder(BusinessActionContext businessActionContext, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) { orderMapper.create(new Order(orderId, userId, productId, count, money, 0)); return true; }
@Override public boolean commit(BusinessActionContext businessActionContext) { Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString()); orderMapper.updateStatus(orderId, 1); return true; }
@Override public boolean rollback(BusinessActionContext businessActionContext) { Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString()); orderMapper.deleteById(orderId); return true; } }
|
调整Service(指定业务)
- 在需要添加事务的方法上添加@GlobalTransactional注解启动全局事务
- 手动调用TCC第一阶段代码
BusinessActionContext参数应传递null值
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Service public class OrderServiceImpl implements OrderService {
@Autowired private OrderTccAction orderTccAction;
@Override @GlobalTransactional public void create(Order order) { orderTccAction.prepareCreateOrder(null, order.getId(), order.getUserId(), order.getProductId(), order.getCount(), order.getMoney()); }
}
|
幂等性控制
- 为了防止执行多次由TC发来的指令,通过
执行T添加标记,执行C删除标记
,来实现幂等性控制
工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class ResultHolder { private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) { Map<String, String> results = map.get(actionClass);
if (results == null) { synchronized (map) { if (results == null) { results = new ConcurrentHashMap<>(); map.put(actionClass, results); } } }
results.put(xid, v); }
public static String getResult(Class<?> actionClass, String xid) { Map<String, String> results = map.get(actionClass); if (results != null) { return results.get(xid); }
return null; }
public static void removeResult(Class<?> actionClass, String xid) { Map<String, String> results = map.get(actionClass); if (results != null) { results.remove(xid); } } }
|
修改实现类
在T执行结束,添加标记
1
| ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
|
在C执行结束,删除标记
1
| ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
|
在C执行之前,添加判断,不重复执行
1 2 3
| if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid())==null) { return true; }
|
TCC模式完成
完成