参考
seata事务 · dynamic-datasource · 看云 (kancloud.cn)
分布式事务Seata集成 · JeecgBoot 开发文档 · 看云
Seata
服务部署
Windows
解压 seata-server-1.4.2.zip
打开 conf/file.conf,将 store mode 改为 db,并修改 db 配置
1 2 3
| url = "jdbc:mysql://tcboot-mysql:3306/seata?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai" user = "seata" password = "seata"
|
redis 配置
1 2 3 4 5
| single { host = "tcboot-redis" port = "6379" } password = "bjtcrj"
|
MySQL 创建数据库、用户;初始化 3 张表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| create database `seata` default character set utf8mb4 collate utf8mb4_general_ci; CREATE USER 'seata'@'%' IDENTIFIED BY 'seata'; GRANT ALL ON seata.* TO 'seata'@'%';
create table branch_table ( branch_id bigint not null primary key, xid varchar(128) not null, transaction_id bigint null, resource_group_id varchar(32) null, resource_id varchar(256) null, branch_type varchar(8) null, status tinyint null, client_id varchar(64) null, application_data varchar(2000) null, gmt_create datetime(6) null, gmt_modified datetime(6) null ) charset = utf8;
create index idx_xid on branch_table (xid);
create table global_table ( xid varchar(128) not null primary key, transaction_id bigint null, status tinyint not null, application_id varchar(32) null, transaction_service_group varchar(32) null, transaction_name varchar(128) null, timeout int null, begin_time bigint null, application_data varchar(2000) null, gmt_create datetime null, gmt_modified datetime null ) charset = utf8;
create index idx_gmt_modified_status on global_table (gmt_modified, status);
create index idx_transaction_id on global_table (transaction_id);
create table lock_table ( row_key varchar(128) not null primary key, xid varchar(96) null, transaction_id bigint null, branch_id bigint not null, resource_id varchar(256) null, table_name varchar(32) null, pk varchar(36) null, gmt_create datetime null, gmt_modified datetime null ) charset = utf8;
create index idx_branch_id on lock_table (branch_id);
|
配置 hosts
1 2
| 127.0.0.1 tcboot-mysql 127.0.0.1 tcboot-redis
|
启动 bin/seata-server.bat
Docker 部署【本地快速搭建环境】
使用 Docker 部署 Seata Server
1
| docker run --name seata-server -p 8091:8091 seataio/seata-server:1.4.2
|
容器命令行及查看日志
1 2
| $ docker exec -it seata-server sh $ docker logs -f seata-server
|
准备
修改
- 每个业务库中都要有 undo_log 表
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime(0) NOT NULL, `log_modified` datetime(0) NOT NULL, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
|
application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| spring: datasource: dynamic: seata: true seata-mode: AT datasource: order: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-order.sql test: username: sa password: "" url: jdbc:h2:mem:test driver-class-name: org.h2.Driver seata: false seata: enable-auto-data-source-proxy: false service: grouplist: default: tcboot-seata:8091 vgroup-mapping: springboot-seata-group: default tx-service-group: springboot-seata-group
|
pom.xml
1 2 3 4 5
| <dependency> <groupId>org.jeecgframework.boot</groupId> <artifactId>jeecg-boot-starter-seata</artifactId> </dependency>
|
使用
多数据源:第一个事务开始的service方法添加注解 @GlobalTransactional
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| /**
 * Demo entry point for the multi-datasource case, annotated as a Seata global transaction.
 *
 * <p>Saves an {@code Event} through this service, saves an {@code ActZprocess} through
 * {@code actZprocessService}, sets a variable through {@code taskService}, and then always
 * throws, so the final {@code return} is never reached at runtime.
 *
 * @return {@code Result.OK("test")}; unreachable in practice because of the unconditional throw
 * @throws RuntimeException always, after the three writes (presumably to demonstrate rollback)
 */
@GlobalTransactional
public Result<?> test() {
    final String marker = "ceshi";

    Event demoEvent = new Event();
    demoEvent.setAddress(marker);
    this.save(demoEvent);

    ActZprocess processRecord = new ActZprocess();
    processRecord.setBusinessTable(marker);
    actZprocessService.save(processRecord);

    taskService.setVariable("577867", marker, marker);

    // "if (true)" keeps the return statement below reachable for the compiler.
    if (true) {
        throw new RuntimeException("抛出异常1111");
    }
    return Result.OK("test");
}
|
单数据源使用 @Transactional 或 @GlobalTransactional 都可以(下面的示例省略了注解,实际使用时需在方法上添加其中之一,否则抛出异常后不会回滚)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
/**
 * Demo entry point for the single-datasource case.
 *
 * <p>Saves an {@code Event} through this service and an {@code ActZprocess} through
 * {@code actZprocessService}, then always throws, so the final {@code return} is never
 * reached at runtime. No transaction annotation is present on this snippet as shown.
 *
 * @return {@code Result.OK("test")}; unreachable in practice because of the unconditional throw
 * @throws RuntimeException always, after the two writes
 */
public Result<?> test() {
    final String marker = "ceshi";

    Event demoEvent = new Event();
    demoEvent.setAddress(marker);
    this.save(demoEvent);

    ActZprocess processRecord = new ActZprocess();
    processRecord.setBusinessTable(marker);
    actZprocessService.save(processRecord);

    // "if (true)" keeps the return statement below reachable for the compiler.
    if (true) {
        throw new RuntimeException("抛出异常1111");
    }
    return Result.OK("test");
}
|
代理 Activiti 工作流数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package org.jeecg.config;
import com.alibaba.druid.pool.DruidDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.activiti.engine.ProcessEngineConfiguration; import org.activiti.spring.SpringProcessEngineConfiguration; import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Resource; import javax.sql.DataSource;
/**
 * Configures the Activiti process engine to run on a Seata-proxied data source.
 *
 * <p>The transaction manager and the process engine configuration are both given the same
 * {@link DataSourceProxy} instance. Wrapping a fresh {@code activitiDataSource()} for each of
 * them would create two independent connection pools and two distinct proxy objects, so the
 * engine would not use the connection the transaction manager bound for the current transaction.
 */
@Configuration
public class ActivitiDataSourceConfig extends AbstractProcessEngineAutoConfiguration {

    @Resource
    private ActivitiDataSourceProperties activitiDataSourceProperties;

    /**
     * The single Seata proxy shared by everything in this configuration. Created lazily because
     * {@code activitiDataSourceProperties} is injected only after construction.
     */
    private DataSource seataDataSource;

    /**
     * Builds a new Druid pool from the Activiti data source properties.
     *
     * @return a freshly created, un-proxied {@link DruidDataSource}; every call creates a new pool
     */
    public DataSource activitiDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(activitiDataSourceProperties.getUrl());
        druidDataSource.setDriverClassName(activitiDataSourceProperties.getDriverClassName());
        druidDataSource.setPassword(activitiDataSourceProperties.getPassword());
        druidDataSource.setUsername(activitiDataSourceProperties.getUsername());
        return druidDataSource;
    }

    /**
     * Creates a transaction manager bound to the shared Seata-proxied data source.
     *
     * @return a new {@link DataSourceTransactionManager} over the shared proxy
     */
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(seataProxiedDataSource());
    }

    /**
     * Builds the process engine configuration on the shared Seata-proxied data source.
     *
     * @return the configured {@link SpringProcessEngineConfiguration}
     */
    @Bean
    public SpringProcessEngineConfiguration springProcessEngineConfiguration() {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        // Same proxy instance as the transaction manager below, so both see the same connection.
        configuration.setDataSource(seataProxiedDataSource());
        configuration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
        configuration.setJobExecutorActivate(false);
        configuration.setTransactionManager(transactionManager());
        configuration.setActivityFontName("宋体");
        configuration.setLabelFontName("宋体");
        configuration.setAnnotationFontName("宋体");
        configuration.setIdGenerator(new MyUUIDgenerator());
        return configuration;
    }

    /** Returns the shared Seata proxy, creating it (and its underlying pool) on first use. */
    private synchronized DataSource seataProxiedDataSource() {
        if (seataDataSource == null) {
            seataDataSource = new DataSourceProxy(activitiDataSource());
        }
        return seataDataSource;
    }
}
|
问题
Cause: io.seata.common.exception.ShouldNeverHappenException
解决:排查涉及到的数据库表,是否都有ID主键字段
TransactionException[Could not register branch into global session xid
解决:检查全局事务方法内部是否有 @DS("xxx") 切换数据源的接口调用;是否调用了加 @Transactional 注解的方法
文档
基础介绍
PS:一般需要分布式事务的场景大多数都是微服务化,个人并不建议在单体项目引入多数据源+分布式事务,有能力尽早拆开,可作为过渡方案。
注意事项
dynamic-datasource-spring-boot-starter
组件内部开启seata后会自动使用DataSourceProxy来包装DataSource,所以需要以下方式来保持兼容。
1.如果你引入的是seata-all,请不要使用@EnableAutoDataSourceProxy注解。
2.如果你引入的是seata-spring-boot-starter 请关闭自动代理。
1 2
| seata: enable-auto-data-source-proxy: false
|
示例项目
https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-seata-sample
此工程为 多数据源+druid+seata+mybatisPlus的版本。
模拟用户下单,扣商品库存,扣用户余额,初步可分为订单服务+商品服务+用户服务。
环境准备
为了快速演示相关环境都采用docker部署,生产上线请参考seata官方文档使用。
- 准备seata-server。
1
| docker run --name seata-server -p 8091:8091 -d seataio/seata-server
|
- 准备mysql数据库,账户root密码123456。
1
| docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
|
- 创建相关数据库。
创建 `seata_order`、`seata_product`、`seata_account`(与 yaml 配置中 JDBC URL 的库名一致)
模拟连接不同的数据库。
1 2 3
| -- Database names match the JDBC URLs used in the application YAML.
CREATE DATABASE IF NOT EXISTS seata_order;
CREATE DATABASE IF NOT EXISTS seata_product;
CREATE DATABASE IF NOT EXISTS seata_account;
|
- 准备相关数据库脚本。
每个数据库下都要创建脚本中相关的表,其中 seata 需要 undo_log 表来监测和回滚。
相关的脚本不用自行准备,本工程已在resources/db下面准备好,另外配合多数据源的自动执行脚本功能,应用启动后会自动执行。
工程准备
- 引入相关依赖,seata+druid+mybatisPlus+dynamic-datasource+mysql+lombok。
1 2 3 4 5 6 7 8 9 10 11
| <dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.2</version>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
    <version>3.4.0</version>
</dependency>
<!-- Remaining dependencies omitted; see the sample project. -->
|
- 编写相关yaml配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| spring: application: name: dynamic datasource: dynamic: primary: order strict: true seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭 seata-mode: AT #支持XA及AT模式,默认AT datasource: order: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-order.sql account: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_account?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-account.sql product: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_product?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-product.sql test: username: sa password: "" url: jdbc:h2:mem:test driver-class-name: org.h2.Driver seata: false #这个数据源不需要seata seata: enabled: true application-id: applicationName tx-service-group: my_test_tx_group enable-auto-data-source-proxy: false #一定要是false service: vgroup-mapping: my_test_tx_group: default #key与上面的tx-service-group的值对应 grouplist: default: 39.108.158.138:8091 #seata-server地址仅file注册中心需要 config: type: file registry: type: file
|
代码编写
参考工程下面的代码完成controller,service,mapper,entity,dto等。
订单服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @Slf4j @Service public class OrderServiceImpl implements OrderService {
@Resource private OrderDao orderDao; @Autowired private AccountService accountService; @Autowired private ProductService productService; @DS("order")//每一层都需要使用多数据源注解切换所选择的数据库 @Override @Transactional @GlobalTransactional //重点 第一个开启事务的需要添加seata全局事务注解 public void placeOrder(PlaceOrderRequest request) { log.info("=============ORDER START================="); Long userId = request.getUserId(); Long productId = request.getProductId(); Integer amount = request.getAmount(); log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = Order.builder() .userId(userId) .productId(productId) .status(OrderStatus.INIT) .amount(amount) .build();
orderDao.insert(order); log.info("订单一阶段生成,等待扣库存付款中"); // 扣减库存并计算总价 Double totalPrice = productService.reduceStock(productId, amount); // 扣减余额 accountService.reduceBalance(userId, totalPrice);
order.setStatus(OrderStatus.SUCCESS); order.setTotalPrice(totalPrice); orderDao.updateById(order); log.info("订单已成功下单"); log.info("=============ORDER END================="); } }
|
商品服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Slf4j @Service public class ProductServiceImpl implements ProductService {
@Resource private ProductDao productDao;
/** * 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW */ @DS("product") @Transactional(propagation = Propagation.REQUIRES_NEW) @Override public Double reduceStock(Long productId, Integer amount) { log.info("=============PRODUCT START================="); log.info("当前 XID: {}", RootContext.getXID());
// 检查库存 Product product = productDao.selectById(productId); Integer stock = product.getStock(); log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount) { log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock); throw new RuntimeException("库存不足"); } log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice()); // 扣减库存 int currentStock = stock - amount; product.setStock(currentStock); productDao.updateById(product); double totalPrice = product.getPrice() * amount; log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice); log.info("=============PRODUCT END================="); return totalPrice; } }
|
用户服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Resource
    private AccountDao accountDao;

    /**
     * Deducts {@code price} from the user's account balance.
     *
     * <p>Transaction propagation is set to REQUIRES_NEW so that a new transaction is opened.
     * Important: REQUIRES_NEW must be used here.
     *
     * @param userId id passed to {@code accountDao.selectById}
     * @param price amount to deduct from the balance
     * @throws RuntimeException if no account is found for {@code userId}, or if the current
     *     balance is smaller than {@code price}
     */
    @DS("account")
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void reduceBalance(Long userId, Double price) {
        log.info("=============ACCOUNT START=================");
        log.info("当前 XID: {}", RootContext.getXID());

        Account account = accountDao.selectById(userId);
        // Presumably selectById yields null for an unknown id (MyBatis-Plus mapper behaviour);
        // fail with a descriptive error instead of a NullPointerException on the next line.
        if (account == null) {
            log.warn("用户 {} 账户不存在", userId);
            throw new RuntimeException("账户不存在");
        }
        Double balance = account.getBalance();
        log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);

        if (balance < price) {
            log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
            throw new RuntimeException("余额不足");
        }
        log.info("开始扣减用户 {} 余额", userId);
        double currentBalance = account.getBalance() - price;
        account.setBalance(currentBalance);
        accountDao.updateById(account);
        log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
        log.info("=============ACCOUNT END=================");
    }

}
|
测试
在schema自动执行的脚本里,默认设置了商品价格为10,商品总数量为20,用户余额为50。
启动项目后通过命令行执行。
- 模拟正常下单,买一个商品。
1 2 3 4 5 6 7 8
| curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 1 }'
|
- 模拟库存不足,事务回滚。
1 2 3 4 5 6 7 8
| curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 22 }'
|
- 模拟用户余额不足,事务回滚。
1 2 3 4 5 6 7 8
| curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 6 }'
|
注意观察运行日志,至此分布式事务集成案例全流程完毕。