Spring Cloud Alibaba Series Part 4: Integrating Seata for Distributed Transactions
Spring Cloud Alibaba has strict version compatibility requirements, so make sure to pick matching Spring Cloud and Spring Boot versions.
The versions used throughout this series are:
- Spring Cloud: 2021.0.1
- Spring Boot: 2.6.3
- Spring Cloud Alibaba: 2021.0.1.0
Spring Cloud Alibaba Series Part 1: The Nacos Registry
Spring Cloud Alibaba Series Part 2: Inter-Service Communication with OpenFeign
Spring Cloud Alibaba Series Part 3: Route Management with Gateway
Spring Cloud Alibaba Series Part 4: Integrating Seata for Distributed Transactions
Preface
The previous articles in this series recorded how the project was built and how Nacos, OpenFeign, and Gateway were integrated. This article records the process of integrating Seata, along with the pitfalls encountered along the way.
What is Seata?
Quoting the description from the Seata documentation:
Seata is an open-source distributed transaction solution that delivers high-performance and easy-to-use distributed transaction services. Seata provides the AT, TCC, SAGA, and XA transaction modes, offering users a one-stop distributed transaction solution.
For example, an air-ticket sales system may involve several services:
- Ticket service: decrements the remaining ticket inventory
- Record service: inserts an order record for the sale
- Notification service: sends the purchase result to the user
The ticket system is thus split into several independent applications, each potentially backed by its own database. Within a single application a local transaction can keep the data consistent, but across the system as a whole consistency can no longer be guaranteed.
Once Seata is introduced and a global transaction is opened, however, a failure in the ticket service also rolls back the record service, so no order record is written to its database.
Seata Terminology
1. TC (Transaction Coordinator)
Maintains the state of global and branch transactions and drives the global commit or rollback.
2. TM (Transaction Manager)
Defines the scope of a global transaction: begins, commits, or rolls back the global transaction.
3. RM (Resource Manager)
Manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives branch commit or rollback.
Installing Seata
Item | Value |
---|---|
Version | 1.5.2 |
Installation method | docker |
Registry | nacos |
Config | nacos |
Store mode | db |
MySQL version | 8.0.25 |
Because the MySQL instance installed at the start of this series is 8.0, Seata 1.5.2 was chosen; earlier releases do not support MySQL 8.0. In addition, starting with Seata 1.5.0 the server configuration has been consolidated, so only Seata's own application.yml needs to be edited.
1. Create the seata database and add the corresponding tables
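The DDL below only creates the tables, so the seata database itself has to exist first. A minimal sketch (the charset simply matches the table definitions that follow):
-- assumption: a dedicated "seata" schema for the server-side tables
CREATE DATABASE IF NOT EXISTS seata DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci;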
-- ----------------------------
-- Table structure for branch_table
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(0) NOT NULL,
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`transaction_id` bigint(0) NULL DEFAULT NULL,
`resource_group_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`resource_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`branch_type` varchar(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`status` tinyint(0) NULL DEFAULT NULL,
`client_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`application_data` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`gmt_create` datetime(6) NULL DEFAULT NULL,
`gmt_modified` datetime(6) NULL DEFAULT NULL,
PRIMARY KEY (`branch_id`) USING BTREE,
INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of branch_table
-- ----------------------------
-- ----------------------------
-- Table structure for distributed_lock
-- ----------------------------
DROP TABLE IF EXISTS `distributed_lock`;
CREATE TABLE `distributed_lock` (
`lock_key` char(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`lock_value` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`expire` bigint(0) NULL DEFAULT NULL,
PRIMARY KEY (`lock_key`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of distributed_lock
-- ----------------------------
INSERT INTO `distributed_lock` VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('TxTimeoutCheck', ' ', 0);
-- ----------------------------
-- Table structure for global_table
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`transaction_id` bigint(0) NULL DEFAULT NULL,
`status` tinyint(0) NOT NULL,
`application_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`transaction_service_group` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`transaction_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`timeout` int(0) NULL DEFAULT NULL,
`begin_time` bigint(0) NULL DEFAULT NULL,
`application_data` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`gmt_create` datetime(0) NULL DEFAULT NULL,
`gmt_modified` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`xid`) USING BTREE,
INDEX `idx_status_gmt_modified`(`status`, `gmt_modified`) USING BTREE,
INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of global_table
-- ----------------------------
-- ----------------------------
-- Table structure for lock_table
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`transaction_id` bigint(0) NULL DEFAULT NULL,
`branch_id` bigint(0) NOT NULL,
`resource_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`table_name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`pk` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`status` tinyint(0) NOT NULL DEFAULT 0 COMMENT '0:locked ,1:rollbacking',
`gmt_create` datetime(0) NULL DEFAULT NULL,
`gmt_modified` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`row_key`) USING BTREE,
INDEX `idx_status`(`status`) USING BTREE,
INDEX `idx_branch_id`(`branch_id`) USING BTREE,
INDEX `idx_xid_and_branch_id`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of lock_table
-- ----------------------------
2. Pull the Docker image
docker pull seataio/seata-server:1.5.2
3. Start Seata
docker run --name seata-server -p 8091:8091 -p 7091:7091 seataio/seata-server:1.5.2
4. Copy Seata's configuration files to the host
docker cp seata-server:/seata-server/resources /mydata/seata
5. Edit the configuration file
vim /mydata/seata/resources/application.yml
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: 192.168.2.213:8848
namespace:
group: SEATA_GROUP
username: nacos
password: nacos
##if use MSE Nacos with auth, mutex with username/password attribute
# # access-key: ""
# # secret-key: ""
# data-id: seataServer.propertie
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
preferred-networks: 30.240.*
nacos:
application: seata-server
group: SEATA_GROUP
server-addr: 192.168.2.213:8848
namespace:
cluster: default
username:
password:
##if use MSE Nacos with auth, mutex with username/password attribute
# #access-key: ""
# #secret-key: ""
store:
# support: file 、 db 、 redis
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.2.213:3306/seata?rewriteBatchedStatements=true
user: root
password: root
min-conn: 5
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 100
max-wait: 5000
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
After editing the configuration file, remove the running Seata container and start it again:
docker ps -a
docker stop seata-server
docker rm cfccdab1bf46 # the container id returned by docker ps -a above
docker run -d --name seata-server \
-p 8091:8091 \
-p 7091:7091 \
-e SEATA_IP=192.168.2.213 \
-v /mydata/seata/resources:/seata-server/resources \
seataio/seata-server:1.5.2
- SEATA_IP: the IP of the host the Seata server runs on
- -v: mounts the locally edited configuration files so the container starts with them
docker logs -f seata-server # follow the startup logs
Once it has started successfully, the seata-server service shows up in the Nacos registry.
Integrating Seata
1. Create tables
Create three tables in the project's database:
-- auto-generated definition
create table air_ticket
(
    id          bigint auto_increment
        primary key,
    from_city   varchar(30)                        not null comment 'departure city',
    to_city     varchar(30)                        not null comment 'destination city',
    surplus_num int                                not null comment 'remaining inventory',
    start_time  datetime                           not null comment 'departure time',
    created_by  bigint                             not null comment 'creator id',
    create_time datetime default CURRENT_TIMESTAMP not null comment 'creation time',
    updated_by  bigint                             not null comment 'last modifier id',
    update_time datetime default CURRENT_TIMESTAMP not null comment 'last modified time'
)
    comment 'air ticket table';
-- auto-generated definition
create table user_tickets
(
    id        bigint auto_increment
        primary key,
    user_id   bigint not null comment 'user id',
    ticket_id bigint not null comment 'ticket id',
    total_num int    not null comment 'number of tickets'
)
    comment 'user tickets table';
-- auto-generated definition
create table undo_log
(
branch_id bigint not null comment 'branch transaction id',
xid varchar(128) not null comment 'global transaction id',
context varchar(128) not null comment 'undo_log context,such as serialization',
rollback_info longblob not null comment 'rollback info',
log_status int not null comment '0:normal status,1:defense status',
log_created datetime(6) not null comment 'create datetime',
log_modified datetime(6) not null comment 'modify datetime',
constraint ux_undo_log
unique (xid, branch_id)
)
comment 'AT transaction mode undo table' charset = utf8mb4;
The undo_log table is required by Seata's AT mode; without it the application fails to start. If several services use Seata and each has its own database, every one of those databases needs this table.
2. Create a new sub-module: business
Its pom and configuration files are set up the same way as the provider module.
3. Generate the entities and related files
Using a code generator or by hand, create the entity, dao, mapper, and controller for air_ticket in the provider sub-project,
and the entity, dao, mapper, and controller for user_tickets in the business sub-project (a rough sketch of the entity is shown below).
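For reference, this is roughly what the generated entity for air_ticket could look like. It assumes MyBatis-Plus and Lombok (the service layer below extends ServiceImpl and calls AirTicket.builder(), so both are in use); the field names simply mirror the table columns, and the exact layout of the generated class may differ.
# AirTicket.java (sketch)
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("air_ticket")
public class AirTicket {

    /** primary key */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** departure city */
    private String fromCity;

    /** destination city */
    private String toCity;

    /** remaining inventory */
    private Integer surplusNum;

    /** departure time */
    private LocalDateTime startTime;

    /** creator id */
    private Long createdBy;

    /** creation time */
    private LocalDateTime createTime;

    /** last modifier id */
    private Long updatedBy;

    /** last modified time */
    private LocalDateTime updateTime;
}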
4. Add the dependency
Add the Seata dependency to the common module's pom:
<!-- Seata dependency -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
</dependency>
I declared version 1.5.2 here, but (see the Spring Cloud Alibaba version mapping table) the 1.5.2 I declared did not actually take effect; the version resolved in the project was 1.4.2.
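The ${seata.version} placeholder above has to be defined somewhere in the build. A minimal sketch, assuming the version is managed in the parent pom's properties block:
<!-- assumption: the property lives in the parent pom; adjust to wherever your build keeps version properties -->
<properties>
    <seata.version>1.5.2</seata.version>
</properties>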
5. Edit the configuration files
Add the following to the configuration files of provider, business, and customer in turn:
# provider
# application.yml
seata:
tx-service-group: provider-tx-group
service:
vgroupMapping:
provider-tx-group: default
config:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.config.username}
password: ${spring.cloud.nacos.config.password}
registry:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.discovery.username}
password: ${spring.cloud.nacos.discovery.password}
# customer
# application.yml
seata:
tx-service-group: customer-tx-group
service:
vgroupMapping:
customer-tx-group: default
config:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.config.username}
password: ${spring.cloud.nacos.config.password}
registry:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.discovery.username}
password: ${spring.cloud.nacos.discovery.password}
# business
# application.yml
seata:
tx-service-group: business-tx-group
service:
vgroupMapping:
business-tx-group: default
config:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.config.username}
password: ${spring.cloud.nacos.config.password}
registry:
type: nacos
nacos:
group: SEATA_GROUP
username: ${spring.cloud.nacos.discovery.username}
password: ${spring.cloud.nacos.discovery.password}
# application-dev.yml
seata:
registry:
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
config:
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
1. tx-service-group: the Seata transaction group name, used to look up the TC cluster.
2. vgroupMapping: the mapping from the virtual transaction group to the TC cluster name.
3. Various posts say the key under vgroupMapping must match the value of tx-service-group. In my own tests I accidentally set them to different values and found that the global transaction still worked.
6. Add configuration in Nacos
In Nacos, add one configuration entry for each of the three tx-service-group values (an example follows this list):
- Data Id: fixed format, service.vgroupMapping.XXX, where XXX must match the tx-service-group value in the project's configuration file.
- Group: the same group the Seata server registers under in Nacos.
- Content: default
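With the tx-service-group values used in this article, the three entries look roughly like this:
Data Id: service.vgroupMapping.provider-tx-group   Group: SEATA_GROUP   Content: default
Data Id: service.vgroupMapping.customer-tx-group   Group: SEATA_GROUP   Content: default
Data Id: service.vgroupMapping.business-tx-group   Group: SEATA_GROUP   Content: default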
7. Add the business code
7.1. Add the ticket-selling logic to provider
# AirTicketService.java
/**
 * Sell tickets: decrement the remaining inventory for the given ticket
 * @param ticketId ticket id
 * @param count    number of tickets to sell
*/
void sellTicket(Long ticketId, int count);
# AirTicketServiceImpl.java
@Service
@Slf4j
public class AirTicketServiceImpl extends ServiceImpl<AirTicketDao, AirTicket> implements AirTicketService {
@Override
public void sellTicket(Long ticketId, int count) {
log.info("卖票开始---------------------->");
AirTicket airTicket = AirTicket.builder()
.id(ticketId)
.build();
AirTicket airTicket1 = getById(ticketId);
airTicket.setSurplusNum(airTicket1.getSurplusNum() - count);
updateById(airTicket);
        // simulate a failure so the global transaction has to roll back
System.out.println(1/0);
log.info("<--------------------------卖票结束");
}
}
# AirTicketController.java
@RestController
@RequestMapping("airTicket")
public class AirTicketController {
/**
     * service bean
*/
@Resource
private AirTicketService airTicketService;
@GetMapping("/sell")
public void sellTicket() {
airTicketService.sellTicket(1L, 2);
}
}
7.2. Add the user purchase-record logic to business
# UserTicketsService.java
public interface UserTicketsService extends IService<UserTickets> {
/**
     * Insert a purchase record into the user_tickets table
*/
void addRecord();
}
# UserTicketsServiceImpl.java
@Service
@Slf4j
public class UserTicketsServiceImpl extends ServiceImpl<UserTicketsDao, UserTickets> implements UserTicketsService {
@Resource
private UserTicketsDao userTicketsDao;
@Override
public void addRecord() {
log.info("开始添加购票记录-----------------------<");
UserTickets userTickets = UserTickets.builder()
.id(1L)
.ticketId(1L)
.userId(1L)
.totalNum(2)
.build();
save(userTickets);
log.info("-----------------------<添加购票记录结束");
}
}
# UserTicketsRecordController.java
@RestController
@RequestMapping("/userTickets")
public class UserTicketsRecordController {
@Resource
private UserTicketsService userTicketsService;
@GetMapping("/add")
public void addRecords() {
userTicketsService.addRecord();
}
}
7.3. Call the ticket-selling and purchase-record services from customer
a. Add AirTicketFeign
@Component
@FeignClient(contextId = "ticket", value = "provider", path = "/provider/airTicket")
public interface AirTicketFeign {
@GetMapping("/sell")
String sellTicket();
}
b. Add UserTicketsFeign
@Component
@FeignClient(contextId = "userTickets", value = "business", path = "/business/userTickets")
public interface UserTicketsFeign {
@GetMapping("/add")
String addRecords();
}
Note: when more than one OpenFeign client targets the same service, contextId must be set, otherwise startup fails with the error: FeignClientSpecification could not be registered. A bean with that name has already been defined.
c. Add the business code
# TicketService.java
public interface TicketService {
void buyTicket();
}
# TicketServiceImpl.java
@Service
public class TicketServiceImpl implements TicketService {
@Resource
private AirTicketFeign airTicketFeign;
@Resource
private UserTicketsFeign userTicketsFeign;
@Override
@GlobalTransactional
public void buyTicket() {
        // add the purchase record
userTicketsFeign.addRecords();
        // sell the ticket
airTicketFeign.sellTicket();
}
}
# CustomerDemoController.java
# add the following endpoint
@Resource
private TicketService ticketService;
@GetMapping("/buyTicket")
public void buyTicket() {
ticketService.buyTicket();
}
@GlobalTransactional is the annotation that opens the global transaction; it sits on the method that orchestrates the two remote calls.
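The annotation also accepts a few optional attributes. A hedged sketch (the attribute names come from io.seata's @GlobalTransactional; the values here are only illustrative, not used in this project):
# TicketServiceImpl.java (sketch)
@Override
@GlobalTransactional(name = "buy-ticket", rollbackFor = Exception.class, timeoutMills = 60000)
public void buyTicket() {
    // add the purchase record
    userTicketsFeign.addRecords();
    // sell the ticket
    airTicketFeign.sellTicket();
}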
7.4. Add a breakpoint
Because the rows in undo_log are deleted as soon as the global rollback completes, add a breakpoint (a sketch of where it can go follows) so the contents of undo_log can be inspected before they are cleared.
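A minimal sketch of where such a breakpoint could go; the exact location is my suggestion, not prescribed by the original screenshots. Pausing the provider just before the simulated failure means the business branch has already committed its user_tickets insert locally, so its undo_log row can be queried while the JVM is suspended.
# AirTicketServiceImpl.sellTicket (excerpt)
AirTicket airTicket1 = getById(ticketId);
airTicket.setSurplusNum(airTicket1.getSurplusNum() - count);
updateById(airTicket);
// set the breakpoint on the next line; while suspended, query the undo_log table
System.out.println(1 / 0);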
7.5. Initialize the air_ticket table and start the services
a. Insert a record
INSERT INTO `alibaba`.`air_ticket`(`id`, `from_city`, `to_city`, `surplus_num`, `start_time`, `created_by`, `create_time`, `updated_by`, `update_time`) VALUES (1, '上海', '广州', 30, '2023-03-03 10:42:23', 1, '2023-02-03 10:42:42', 1, '2023-02-03 10:42:45');
b. Start the provider, business, and customer services in order
Customer console:
Provider console:
Business console:
7.6. Call the endpoint
Open http://localhost:9083/customer/demo/buyTicket in a browser.
The breakpoint is hit; look at the undo_log table:
Content of rollback_info:
{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.2.213:8091:36384658307649560","branchId":36384658307649561,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"INSERT","tableName":"user_tickets","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords","tableName":"user_tickets","rows":["java.util.ArrayList",[]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"user_tickets","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"user_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"ticket_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"total_num","keyType":"NULL","type":4,"value":2}]]}]]}}]]}
Business console:
Provider console:
7.7. Release the breakpoint
Business console:
Provider console:
Customer console:
As you can see, the global transaction took effect: the data in the air_ticket and user_tickets tables is completely unchanged.
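To double-check from the database side, a couple of simple queries against the tables defined earlier confirm that nothing was written or changed:
-- inventory should still be 30 (the value inserted in step 7.5), and no purchase record should exist
SELECT surplus_num FROM air_ticket WHERE id = 1;
SELECT COUNT(*) FROM user_tickets WHERE ticket_id = 1;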
Wrap-up
That completes the Seata integration for this project. A few points worth remembering:
1. Starting with Seata 1.5, the server is configured through a single application.yml; before 1.5 the configuration was spread across multiple files.
2. Seata 1.5.2 supports MySQL 8.
3. With multiple Feign clients, contextId needs to be set.
4. Seata uses AT mode by default, and AT mode requires the undo_log table in every business database.
For more background, see the article SpringBoot分布式事务Seata入门.