参考

seata事务 · dynamic-datasource · 看云 (kancloud.cn)

分布式事务Seata集成 · JeecgBoot 开发文档 · 看云

Seata

服务部署

Windows

  1. 解压 seata-server-1.4.2.zip

  2. 打开 conf/file.conf, store mode 改为 db, db 配置

    1
    2
    3
    url = "jdbc:mysql://tcboot-mysql:3306/seata?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai"
    user = "seata"
    password = "seata"
  3. redis 配置

    1
    2
    3
    4
    5
    single {
    host = "tcboot-redis"
    port = "6379"
    }
    password = "bjtcrj"
  4. MySQL 创建数据库、用户;初始化 3 张表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    create database `seata` default character set utf8mb4 collate utf8mb4_general_ci;
    CREATE USER 'seata'@'%' IDENTIFIED BY 'seata';
    GRANT ALL ON seata.* TO 'seata'@'%';

    create table branch_table
    (
    branch_id bigint not null
    primary key,
    xid varchar(128) not null,
    transaction_id bigint null,
    resource_group_id varchar(32) null,
    resource_id varchar(256) null,
    branch_type varchar(8) null,
    status tinyint null,
    client_id varchar(64) null,
    application_data varchar(2000) null,
    gmt_create datetime(6) null,
    gmt_modified datetime(6) null
    )
    charset = utf8;

    create index idx_xid
    on branch_table (xid);

    create table global_table
    (
    xid varchar(128) not null
    primary key,
    transaction_id bigint null,
    status tinyint not null,
    application_id varchar(32) null,
    transaction_service_group varchar(32) null,
    transaction_name varchar(128) null,
    timeout int null,
    begin_time bigint null,
    application_data varchar(2000) null,
    gmt_create datetime null,
    gmt_modified datetime null
    )
    charset = utf8;

    create index idx_gmt_modified_status
    on global_table (gmt_modified, status);

    create index idx_transaction_id
    on global_table (transaction_id);

    create table lock_table
    (
    row_key varchar(128) not null
    primary key,
    xid varchar(96) null,
    transaction_id bigint null,
    branch_id bigint not null,
    resource_id varchar(256) null,
    table_name varchar(32) null,
    pk varchar(36) null,
    gmt_create datetime null,
    gmt_modified datetime null
    )
    charset = utf8;

    create index idx_branch_id
    on lock_table (branch_id);
  5. 配置 hosts

    1
    2
    127.0.0.1 tcboot-mysql
    127.0.0.1 tcboot-redis
  6. 启动 bin/seata-server.bat

Docker 部署【本地快速搭建环境】

使用 Docker 部署 Seata Server

1
docker run --name seata-server -p 8091:8091 seataio/seata-server:1.4.2

容器命令行及查看日志

1
2
$ docker exec -it seata-server sh
$ docker logs -f seata-server

准备

修改

  1. 每个业务库中都要有undo_log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime(0) NOT NULL,
`log_modified` datetime(0) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

  1. application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    spring:
    datasource:
    dynamic:
    seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
    seata-mode: AT #支持XA及AT模式,默认AT
    datasource:
    order:
    username: root
    password: 123456
    url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
    driver-class-name: com.mysql.cj.jdbc.Driver
    schema: classpath:db/schema-order.sql
    test:
    username: sa
    password: ""
    url: jdbc:h2:mem:test
    driver-class-name: org.h2.Driver
    seata: false #这个数据源不需要seata

    seata:
    enable-auto-data-source-proxy: false
    service:
    grouplist:
    default: tcboot-seata:8091 # seata-server 服务地址
    vgroup-mapping:
    springboot-seata-group: default
    # seata 事务组编号 用于TC集群名
    tx-service-group: springboot-seata-group
  2. pom.xml

    1
    2
    3
    4
    5
    <!--分布式事务 seata-->
    <dependency>
    <groupId>org.jeecgframework.boot</groupId>
    <artifactId>jeecg-boot-starter-seata</artifactId>
    </dependency>

使用

  1. 多数据源:第一个事务开始的service方法添加注解 @GlobalTransactional

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @GlobalTransactional
    public Result<?> test() {
    Event event = new Event();
    event.setAddress("ceshi");
    // 主库
    this.save(event);

    // 主库
    ActZprocess actZprocess = new ActZprocess();
    actZprocess.setBusinessTable("ceshi");
    actZprocessService.save(actZprocess);

    // act 库
    taskService.setVariable("577867", "ceshi", "ceshi");

    if(true) {
    throw new RuntimeException("抛出异常1111");
    }
    return Result.OK("test");
    }
  2. 单数据源使用 @Transactional 或 @GlobalTransactional 都可以

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // @DS("main")
    // @GlobalTransactional
    // @Transactional
    public Result<?> test() {
    Event event = new Event();
    event.setAddress("ceshi");
    // 主库
    this.save(event);

    // 主库
    ActZprocess actZprocess = new ActZprocess();
    actZprocess.setBusinessTable("ceshi");
    actZprocessService.save(actZprocess);

    if(true) {
    throw new RuntimeException("抛出异常1111");
    }
    return Result.OK("test");
    }

代理 Activiti 工作流数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package org.jeecg.config;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.annotation.Resource;
import javax.sql.DataSource;

@Configuration
public class ActivitiDataSourceConfig extends AbstractProcessEngineAutoConfiguration {
@Resource
private ActivitiDataSourceProperties activitiDataSourceProperties;

public DataSource activitiDataSource() {
DruidDataSource DruiddataSource = new DruidDataSource();
DruiddataSource.setUrl(activitiDataSourceProperties.getUrl());
DruiddataSource.setDriverClassName(activitiDataSourceProperties.getDriverClassName());
DruiddataSource.setPassword(activitiDataSourceProperties.getPassword());
DruiddataSource.setUsername(activitiDataSourceProperties.getUsername());
return DruiddataSource;
}

public PlatformTransactionManager transactionManager() {
// act 数据源由 seata 代理
return new DataSourceTransactionManager(new DataSourceProxy(activitiDataSource()));
}

@Bean
public SpringProcessEngineConfiguration springProcessEngineConfiguration() {
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
// act 数据源由 seata 代理
configuration.setDataSource(new DataSourceProxy(activitiDataSource()));
configuration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
// 关闭自带定时任务
configuration.setJobExecutorActivate(false);
configuration.setTransactionManager(transactionManager());
configuration.setActivityFontName("宋体");
configuration.setLabelFontName("宋体");
configuration.setAnnotationFontName("宋体");
//id生成器- act_* 表ID字段值
configuration.setIdGenerator(new MyUUIDgenerator());
return configuration;
}
}

问题

Cause: io.seata.common.exception.ShouldNeverHappenException

解决:排查涉及到的数据库表,是否都有ID主键字段

TransactionException[Could not register branch into global session xid

解决:全局事物方式内部是否有 @DS(“xxx”) 切换数据源的接口调用;是否有加 @Transaction 注解的方式调用

文档

基础介绍

PS:一般需要分布式事务的场景大多数都是微服务化,个人并不建议在单体项目引入多数据源+分布式事务,有能力尽早拆开,可为过度方案。

注意事项

dynamic-datasource-sring-boot-starter 组件内部开启seata后会自动使用DataSourceProxy来包装DataSource,所以需要以下方式来保持兼容。

1.如果你引入的是seata-all,请不要使用@EnableAutoDataSourceProxy注解。

2.如果你引入的是seata-spring-boot-starter 请关闭自动代理。

1
2
seata:
enable-auto-data-source-proxy: false

示例项目

https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-seata-sample

此工程为 多数据源+druid+seata+mybatisPlus的版本。

模拟用户下单,扣商品库存,扣用户余额,初步可分为订单服务+商品服务+用户服务。

环境准备

为了快速演示相关环境都采用docker部署,生产上线请参考seata官方文档使用。

  1. 准备seata-server。
1
docker run --name seata-server -p 8091:8091 -d seataio/seata-server
  1. 准备mysql数据库,账户root密码123456。
1
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
  1. 创建相关数据库。

创建 seata-order``seata-product``seata-account 模拟连接不同的数据库。

1
2
3
CREATE DATABASE IF NOT EXIST seata-order;
CREATE DATABASE IF NOT EXIST seata-product;
CREATE DATABASE IF NOT EXIST seata-account;
  1. 准备相关数据库脚本。

每个数据库下脚本相关的表,seata需要undo_log来监测和回滚。

相关的脚本不用自行准备,本工程已在resources/db下面准备好,另外配合多数据源的自动执行脚本功能,应用启动后会自动执行。

工程准备

  1. 引入相关依赖,seata+druid+mybatisPlus+dynamic-datasource+mysql+lombok。
1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
# 省略,查看示例项目
  1. 编写相关yaml配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
spring:
application:
name: dynamic
datasource:
dynamic:
primary: order
strict: true
seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
seata-mode: AT #支持XA及AT模式,默认AT
datasource:
order:
username: root
password: 123456
url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
schema: classpath:db/schema-order.sql
account:
username: root
password: 123456
url: jdbc:mysql://39.108.158.138:3306/seata_account?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
schema: classpath:db/schema-account.sql
product:
username: root
password: 123456
url: jdbc:mysql://39.108.158.138:3306/seata_product?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
schema: classpath:db/schema-product.sql
test:
username: sa
password: ""
url: jdbc:h2:mem:test
driver-class-name: org.h2.Driver
seata: false #这个数据源不需要seata
seata:
enabled: true
application-id: applicationName
tx-service-group: my_test_tx_group
enable-auto-data-source-proxy: false #一定要是false
service:
vgroup-mapping:
my_test_tx_group: default #key与上面的tx-service-group的值对应
grouplist:
default: 39.108.158.138:8091 #seata-server地址仅file注册中心需要
config:
type: file
registry:
type: file

代码编写

参考工程下面的代码完成controller,service,maaper,entity,dto等。

订单服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

@Resource
private OrderDao orderDao;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;

@DS("order")//每一层都需要使用多数据源注解切换所选择的数据库
@Override
@Transactional
@GlobalTransactional //重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request) {
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);

log.info("当前 XID: {}", RootContext.getXID());

Order order = Order.builder()
.userId(userId)
.productId(productId)
.status(OrderStatus.INIT)
.amount(amount)
.build();

orderDao.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);

order.setStatus(OrderStatus.SUCCESS);
order.setTotalPrice(totalPrice);
orderDao.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}

商品服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
@Service
public class ProductServiceImpl implements ProductService {

@Resource
private ProductDao productDao;

/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("product")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Double reduceStock(Long productId, Integer amount) {
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", RootContext.getXID());

// 检查库存
Product product = productDao.selectById(productId);
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);

if (stock < amount) {
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productDao.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
}

用户服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

@Resource
private AccountDao accountDao;

/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("account")
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void reduceBalance(Long userId, Double price) {
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", RootContext.getXID());

Account account = accountDao.selectById(userId);
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);

if (balance < price) {
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountDao.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}

}

测试

在schema自动执行的脚本里,默认设置了商品价格为10,商品总数量为20,用户余额为50。

启动项目后通过命令行执行。

  1. 模拟正常下单,买一个商品。
1
2
3
4
5
6
7
8
curl -X POST \
http://localhost:8080/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"amount": 1
}'
  1. 模拟库存不足,事务回滚。
1
2
3
4
5
6
7
8
curl -X POST \
http://localhost:8080/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"amount": 22
}'
  1. 模拟用户余额不足,事务回滚。
1
2
3
4
5
6
7
8
curl -X POST \
http://localhost:8080/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"amount": 6
}'

注意观察运行日志,至此分布式事务集成案例全流程完毕。