Spring Cloud Alibaba系列四:集成 seata 实现分布式事务

spring cloud alibaba 版本对照

spring cloud alibaba 有严格的版本要求,所以一定要选对 spring cloud 版本,以及 spring boot 的版本

此系列文章的版本选取如下:

  • Spring Cloud 版本:Spring Cloud 2021.0.1
  • Spring Boot 版本:2.6.3
  • Spring Cloud Alibaba 版本:2021.0.1.0

Spring Cloud Alibaba系列一:nacos 注册中心

Spring Cloud Alibaba系列二:openFeign 实现服务间的通信

Spring Cloud Alibaba系列三:集成Gateway实现路由管理

Spring Cloud Alibaba系列四:集成 seata 实现分布式事务

前言

前面的几篇文章中,记录了整个 Spring Cloud Alibaba 系列的实现以及整合 nacosopenFeigngateway等功能。这篇文章主要用来记录自己整合 seata 的过程,以及遇到的坑。

Seata 是什么?

我们可以 引用 Seata文档 里的描述:

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

比如说,在一个机票售票系统中,可能需要多个服务支持:

  • 售票服务:修改库存机票数量进行修改
  • 流水服务:添加订单流水记录
  • 通知服务:给用户发送购买结果消息

一个售票系统被拆成了多个独立的应用,可能多个应用对应多个数据库。在各自的应用中,可以用事务确保数据的一致性,但是对于整个系统来说,数据的一致性就没法保证了。

但是,引入了 seata 之后,开启全局事务,那么当售票服务出错之后,那么流水服务也会回滚,就不会往数据库里写入数据。

Seata 术语

1. TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。

2.TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。

3.RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

安装 seata

描述说明
版本1.5.2
安装方式docker
register 方式nacos
config 方式nacos
store 方式db
mysql版本8.0.25

因为一开始安装的 MySQL8.0 版本的,所以就使用了 1.5.2 版本的 seata,在这之前的版本 不支持 8.0 的数据库,另外 1.5.0 之后的 seata 的配置做了整合,只需要修改 seataapplication.yml 即可。

1、创建 seata 数据库,并添加对应的表
-- ----------------------------
-- Table structure for branch_table
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table`  (
  `branch_id` bigint(0) NOT NULL,
  `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `transaction_id` bigint(0) NULL DEFAULT NULL,
  `resource_group_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `branch_type` varchar(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `status` tinyint(0) NULL DEFAULT NULL,
  `client_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `gmt_create` datetime(6) NULL DEFAULT NULL,
  `gmt_modified` datetime(6) NULL DEFAULT NULL,
  PRIMARY KEY (`branch_id`) USING BTREE,
  INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of branch_table
-- ----------------------------

-- ----------------------------
-- Table structure for distributed_lock
-- ----------------------------
DROP TABLE IF EXISTS `distributed_lock`;
CREATE TABLE `distributed_lock`  (
  `lock_key` char(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `lock_value` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `expire` bigint(0) NULL DEFAULT NULL,
  PRIMARY KEY (`lock_key`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of distributed_lock
-- ----------------------------
INSERT INTO `distributed_lock` VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` VALUES ('TxTimeoutCheck', ' ', 0);

-- ----------------------------
-- Table structure for global_table
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table`  (
  `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `transaction_id` bigint(0) NULL DEFAULT NULL,
  `status` tinyint(0) NOT NULL,
  `application_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `transaction_service_group` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `transaction_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `timeout` int(0) NULL DEFAULT NULL,
  `begin_time` bigint(0) NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `gmt_create` datetime(0) NULL DEFAULT NULL,
  `gmt_modified` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`xid`) USING BTREE,
  INDEX `idx_status_gmt_modified`(`status`, `gmt_modified`) USING BTREE,
  INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of global_table
-- ----------------------------

-- ----------------------------
-- Table structure for lock_table
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (
  `row_key` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `transaction_id` bigint(0) NULL DEFAULT NULL,
  `branch_id` bigint(0) NOT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `table_name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `pk` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `status` tinyint(0) NOT NULL DEFAULT 0 COMMENT '0:locked ,1:rollbacking',
  `gmt_create` datetime(0) NULL DEFAULT NULL,
  `gmt_modified` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`row_key`) USING BTREE,
  INDEX `idx_status`(`status`) USING BTREE,
  INDEX `idx_branch_id`(`branch_id`) USING BTREE,
  INDEX `idx_xid_and_branch_id`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of lock_table
-- ----------------------------
2、docker 拉取镜像
docker pull seataio/seata-server:1.5.2
3、启动 seata
docker run --name seata-server -p 8091:8091 -p 7091:7091 seataio/seata-server:1.5.2
4、复制 seata 的配置文件到本地
docker cp seata-server:/seata-server/resources /mydata/seata
5、修改配置文件

vim /mydata/seata/resources/application.yml

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata

seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      server-addr: 192.168.2.213:8848
      namespace:
      group: SEATA_GROUP
      username: nacos
      password: nacos
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #      # access-key: ""
      #            # secret-key: ""
      #                  data-id: seataServer.propertie
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    preferred-networks: 30.240.*
    nacos:
      application: seata-server
      group: SEATA_GROUP
      server-addr: 192.168.2.213:8848
      namespace:
      cluster: default
      username:
      password:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #      #access-key: ""
      #            #secret-key: "" 
  store:
    # support: file 、 db 、 redis
    mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.2.213:3306/seata?rewriteBatchedStatements=true
      user: root
      password: root
      min-conn: 5
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 100
      max-wait: 5000
#  server:
#    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

修改完 配置文件后,删除启动的 seata,重新启动

docker ps -a

在这里插入图片描述

docker stop seata-server
docker rm cfccdab1bf46 # 对应 第一步查出来的 container id

docker run -d  --name  seata-server \
-p 8091:8091  \
-p 7091:7091  \
-e SEATA_IP=192.168.2.213  \
-v /mydata/seata/resources:/seata-server/resources \
seataio/seata-server:1.5.2

SETA_IP seata 服务所在服务器的ip
-v 指定本地修改的配置文件启动

docker logs -f seata-server # 查看启动日志

启动成功之后,就可以在 nacos 注册中心找到对应的服务:
在这里插入图片描述

集成 seata

1、创建表

在项目的数据库里创建 3 张表:

-- auto-generated definition
create table air_ticket
(
    id          bigint auto_increment
        primary key,
    from_city   varchar(30)                        not null comment '起飞城市',
    to_city     varchar(30)                        not null comment '目的地城市',
    surplus_num int                                not null comment '库存数量',
    start_time  datetime                           not null comment '出发时间',
    created_by  bigint                             not null comment '创建人id',
    create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    updated_by  bigint                             not null comment '修改人',
    update_time datetime default CURRENT_TIMESTAMP not null comment '修改时间'
)
    comment '飞机票表';

-- auto-generated definition
create table user_tickets
(
    id        bigint auto_increment
        primary key,
    user_id   bigint not null comment '用户id',
    ticket_id bigint not null comment '机票id',
    total_num int    not null comment '机票数量'
)
    comment '用户机票表';

-- auto-generated definition
create table undo_log
(
    branch_id     bigint       not null comment 'branch transaction id',
    xid           varchar(128) not null comment 'global transaction id',
    context       varchar(128) not null comment 'undo_log context,such as serialization',
    rollback_info longblob     not null comment 'rollback info',
    log_status    int          not null comment '0:normal status,1:defense status',
    log_created   datetime(6)  not null comment 'create datetime',
    log_modified  datetime(6)  not null comment 'modify datetime',
    constraint ux_undo_log
        unique (xid, branch_id)
)
    comment 'AT transaction mode undo table' charset = utf8mb4;

undo_log 表是 seataAT 模式必须要使用的表,如果没有,项目启动会报错。如果多个服务使用 seata ,且对应多个数据库,则,多个数据库都要引入这张表。

2、新建一个子 module:business

pom 的设置 以及 配置文件 的设置同 provider

3、生成实例等文件

使用插件或者手动也好,在 provider 子项目里生 成 air_ticketentity、dao、mapper、controller
business 子项目里生成 user_ticketsentity、dao、mapper、controller

4、引入依赖

common 模块的配置文件中添加 seata 依赖

<!--添加seata依赖,-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata.version}</version>
</dependency>

这里我引入的是 1.5.2,但是 参考spring cloud alibaba 版本对照,实际我引入的 1.5.2 的版本并没有生效,项目里生效的是 1.4.2 的版本。

5、修改配置文件

依次在 provider、business、customer 的配置文件中添加如下内容:

# provider
# application.yml
seata:
  tx-service-group: provider-tx-group
  service:
    vgroupMapping:
      provider-tx-group: default
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.config.username}
      password: ${spring.cloud.nacos.config.password}
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.discovery.username}
      password: ${spring.cloud.nacos.discovery.password}

# customer
# application.yml
seata:
  tx-service-group: customer-tx-group
  service:
    vgroupMapping:
      customer-tx-group: default
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.config.username}
      password: ${spring.cloud.nacos.config.password}
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.discovery.username}
      password: ${spring.cloud.nacos.discovery.password}

# business
# application.yml
seata:
  tx-service-group: business-tx-group
  service:
    vgroupMapping:
      business-tx-group: default
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.config.username}
      password: ${spring.cloud.nacos.config.password}
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      username: ${spring.cloud.nacos.discovery.username}
      password: ${spring.cloud.nacos.discovery.password}

# application-dev.yml
seata:
  registry:
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
  config:
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}

1、tx-service-group:Seata 事务组编号,用于 TC 集群名。
2、vgroupMapping:虚拟组和分组的映射
3、网上说 vgroupMapping 下的名字要和 tx-service-group 后的值一致,但是我自己在测试中,由于失误,没有设置一致,发现并不会影响 全局事务的生效。

6、nacos 添加配置

nacos 上为 3 个 tx-service-group 添加配置
在这里插入图片描述

Data Id:值固定, service.vgroupMapping.XXX。XXX 要和 项目的配置文件里的 tx-service-group 的值一致。
Group:和 seata 服务端注册到 nacos 的分组一致。
配置内容:default
在这里插入图片描述

7、添加业务代码
7.1、provider 里添加售票业务
# AirTicketService.java
/**
* 售票
* @param ticketId 机票id
* @param count 数量
*/
void sellTicket(Long ticketId, int count);

# AirTicketServiceImpl.java
@Service
@Slf4j
public class AirTicketServiceImpl extends ServiceImpl<AirTicketDao, AirTicket> implements AirTicketService {

    @Override
    public void sellTicket(Long ticketId, int count) {
        log.info("卖票开始---------------------->");

        AirTicket airTicket = AirTicket.builder()
                .id(ticketId)
                .build();
        AirTicket airTicket1 = getById(ticketId);
        airTicket.setSurplusNum(airTicket1.getSurplusNum() - count);

        updateById(airTicket);

        // 这段代码就是模拟  发生异常
        System.out.println(1/0);
        log.info("<--------------------------卖票结束");
    }
}

# AirTicketController.java
@RestController
@RequestMapping("airTicket")
public class AirTicketController {
    /**
     * 服务对象
     */
    @Resource
    private AirTicketService airTicketService;

    @GetMapping("/sell")
    public void sellTicket() {
        airTicketService.sellTicket(1L, 2);
    }
}
7.2、business 里添加用户机票记录业务
# UserTicketsService.java
public interface UserTicketsService extends IService<UserTickets> {
    /**
     * 给用户机票表添加记录
     */
    void addRecord();
}

# UserTicketsServiceImpl.java
@Service
@Slf4j
public class UserTicketsServiceImpl extends ServiceImpl<UserTicketsDao, UserTickets> implements UserTicketsService {
    @Resource
    private UserTicketsDao userTicketsDao;

    @Override
    public void addRecord() {
        log.info("开始添加购票记录-----------------------<");
        UserTickets userTickets = UserTickets.builder()
                .id(1L)
                .ticketId(1L)
                .userId(1L)
                .totalNum(2)
                .build();
        save(userTickets);
        log.info("-----------------------<添加购票记录结束");
    }
}

# UserTicketsRecordController 
@RestController
@RequestMapping("/userTickets")
public class UserTicketsRecordController {

    @Resource
    private UserTicketsService userTicketsService;

    @GetMapping("/add")
    public void addRecords() {
        userTicketsService.addRecord();
    }
}
7.3、customer 服务里调用 售票,添加用户机票记录 服务

a、添加 AirTicketFeign

@Component
@FeignClient(contextId = "ticket", value = "provider", path = "/provider/airTicket")
public interface AirTicketFeign {
    @GetMapping("/sell")
    String sellTicket();
}

b、添加 UserTicketsFeign

@Component
@FeignClient(contextId = "userTickets", value = "business", path = "/business/userTickets")
public interface UserTicketsFeign {
    @GetMapping("/add")
    String addRecords();
}

注意:存在多个OpenFeign 的 客户端时,需要添加 contextId,否则会报下面的错 FeignClientSpecification‘ could not be registered. A bean with that name has already been defined

c、添加业务代码

# TicketService.java
public interface TicketService {
    void buyTicket();
}

# TicketServiceImpl.java
@Service
public class TicketServiceImpl implements TicketService {
    @Resource
    private AirTicketFeign airTicketFeign;

    @Resource
    private UserTicketsFeign userTicketsFeign;

    @Override
    @GlobalTransactional
    public void buyTicket() {
        //添加购票记录
        userTicketsFeign.addRecords();
        //售票
        airTicketFeign.sellTicket();
    }
}

# CustomerDemoController.java
# 添加下面接口
@Resource
private TicketService ticketService;

@GetMapping("/buyTicket")
public void buyTicket() {
    ticketService.buyTicket();
}

@GlobalTransactional 全局事务的注解

7.4、添加断点

由于 undo_log 表里的内容在 全局事务回滚完成之后,就会被清空,所以我们可以如下图添加断点,查看 undo_log 表里的内容
在这里插入图片描述

7.5 初始化 air_ticket 表 并启动服务

a、新增一条记录

INSERT INTO `alibaba`.`air_ticket`(`id`, `from_city`, `to_city`, `surplus_num`, `start_time`, `created_by`, `create_time`, `updated_by`, `update_time`) VALUES (1, '上海', '广州', 30, '2023-03-03 10:42:23', 1, '2023-02-03 10:42:42', 1, '2023-02-03 10:42:45');

b、依次启动 provider、business、customer 服务

customer 的控制台:
在这里插入图片描述
provider 的控制台:
在这里插入图片描述
business 的控制台:
在这里插入图片描述

7.6、访问

浏览器访问 http://localhost:9083/customer/demo/buyTicket
在这里插入图片描述
断点生效,查看 undo_log 表:
在这里插入图片描述
rollback_info 的内容:

{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.2.213:8091:36384658307649560","branchId":36384658307649561,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"INSERT","tableName":"user_tickets","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords","tableName":"user_tickets","rows":["java.util.ArrayList",[]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"user_tickets","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"user_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"ticket_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"total_num","keyType":"NULL","type":4,"value":2}]]}]]}}]]}

business 控制台:
在这里插入图片描述
provider 控制台:
在这里插入图片描述
在这里插入图片描述

7.7、放行断点

business 控制台:
在这里插入图片描述

provider 控制台:
在这里插入图片描述
customer 控制台:
在这里插入图片描述
可以看见,全局事务生效。
air_ticket、user_tickets 表数据没有任何变化。

结尾

至此,整个项目整合 seata 已经结束。需要注意的几个点如下:

1、seata 1.5 开始 使用yml配置文件,1.5 之前是多个 file 配置。
2、seata 1.5.2 开始支持 mysql8
3、多个 FeignClient,需要设置 contextId
4、seata 默认使用 AT 模式,然后,AT模式下 一定要 引入 undo_log 表

更多知识参考 SpringBoot分布式事务Seata入门

SpringCloudAlibaba系列demo (github.com)

Logo

鸿蒙生态一站式服务平台。

更多推荐