参考 seata事务 · dynamic-datasource · 看云 (kancloud.cn)
分布式事务Seata集成 · JeecgBoot 开发文档 · 看云
Seata
服务部署 Windows
解压 seata-server-1.4.2.zip
打开 conf/file.conf
, store mode 改为 db, db 配置
1 2 3 url = "jdbc:mysql://tcboot-mysql:3306/seata?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai" user = "seata" password = "seata"
redis 配置
1 2 3 4 5 single { host = "tcboot-redis" port = "6379" } password = "bjtcrj"
MySQL 创建数据库、用户;初始化 3 张表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 create database `seata` default character set utf8mb4 collate utf8mb4_general_ci;CREATE USER 'seata' @'%' IDENTIFIED BY 'seata' ;GRANT ALL ON seata.* TO 'seata' @'%' ;create table branch_table( branch_id bigint not null primary key , xid varchar (128 ) not null , transaction_id bigint null , resource_group_id varchar (32 ) null , resource_id varchar (256 ) null , branch_type varchar (8 ) null , status tinyint null , client_id varchar (64 ) null , application_data varchar (2000 ) null , gmt_create datetime(6 ) null , gmt_modified datetime(6 ) null ) charset = utf8; create index idx_xid on branch_table (xid); create table global_table( xid varchar (128 ) not null primary key , transaction_id bigint null , status tinyint not null , application_id varchar (32 ) null , transaction_service_group varchar (32 ) null , transaction_name varchar (128 ) null , timeout int null , begin_time bigint null , application_data varchar (2000 ) null , gmt_create datetime null , gmt_modified datetime null ) charset = utf8; create index idx_gmt_modified_status on global_table (gmt_modified, status); create index idx_transaction_id on global_table (transaction_id); create table lock_table( row_key varchar (128 ) not null primary key , xid varchar (96 ) null , transaction_id bigint null , branch_id bigint not null , resource_id varchar (256 ) null , table_name varchar (32 ) null , pk varchar (36 ) null , gmt_create datetime null , gmt_modified datetime null ) charset = utf8; create index idx_branch_id on lock_table (branch_id);
配置 hosts
1 2 127.0.0.1 tcboot-mysql 127.0.0.1 tcboot-redis
启动 bin/seata-server.bat
Docker 部署【本地快速搭建环境】 使用 Docker 部署 Seata Server
1 docker run --name seata-server -p 8091:8091 seataio/seata-server:1.4.2
容器命令行及查看日志 1 2 $ docker exec -it seata-server sh $ docker logs -f seata-server
准备 修改
每个业务库中都要有undo_log
表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL , `context` varchar (128 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime(0 ) NOT NULL , `log_modified` datetime(0 ) NOT NULL , PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic ;
application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 spring: datasource: dynamic: seata: true seata-mode: AT datasource: order: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-order.sql test: username: sa password: "" url: jdbc:h2:mem:test driver-class-name: org.h2.Driver seata: false seata: enable-auto-data-source-proxy: false service: grouplist: default: tcboot-seata:8091 vgroup-mapping: springboot-seata-group: default tx-service-group: springboot-seata-group
pom.xml
1 2 3 4 5 <dependency > <groupId > org.jeecgframework.boot</groupId > <artifactId > jeecg-boot-starter-seata</artifactId > </dependency >
使用
多数据源:第一个事务开始的service方法添加注解 @GlobalTransactional
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @GlobalTransactional public Result<?> test() { Event event = new Event (); event.setAddress("ceshi" ); this .save(event); ActZprocess actZprocess = new ActZprocess (); actZprocess.setBusinessTable("ceshi" ); actZprocessService.save(actZprocess); taskService.setVariable("577867" , "ceshi" , "ceshi" ); if (true ) { throw new RuntimeException ("抛出异常1111" ); } return Result.OK("test" ); }
单数据源使用 @Transactional 或 @GlobalTransactional 都可以
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Result<?> test() { Event event = new Event (); event.setAddress("ceshi" ); this .save(event); ActZprocess actZprocess = new ActZprocess (); actZprocess.setBusinessTable("ceshi" ); actZprocessService.save(actZprocess); if (true ) { throw new RuntimeException ("抛出异常1111" ); } return Result.OK("test" ); }
代理 Activiti 工作流数据源 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package org.jeecg.config;import com.alibaba.druid.pool.DruidDataSource;import io.seata.rm.datasource.DataSourceProxy;import org.activiti.engine.ProcessEngineConfiguration;import org.activiti.spring.SpringProcessEngineConfiguration;import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.transaction.PlatformTransactionManager;import javax.annotation.Resource;import javax.sql.DataSource;@Configuration public class ActivitiDataSourceConfig extends AbstractProcessEngineAutoConfiguration { @Resource private ActivitiDataSourceProperties activitiDataSourceProperties; public DataSource activitiDataSource () { DruidDataSource DruiddataSource = new DruidDataSource (); DruiddataSource.setUrl(activitiDataSourceProperties.getUrl()); DruiddataSource.setDriverClassName(activitiDataSourceProperties.getDriverClassName()); DruiddataSource.setPassword(activitiDataSourceProperties.getPassword()); DruiddataSource.setUsername(activitiDataSourceProperties.getUsername()); return DruiddataSource; } public PlatformTransactionManager transactionManager () { return new DataSourceTransactionManager (new DataSourceProxy (activitiDataSource())); } @Bean public SpringProcessEngineConfiguration springProcessEngineConfiguration () { SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration (); configuration.setDataSource(new DataSourceProxy (activitiDataSource())); configuration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); configuration.setJobExecutorActivate(false ); configuration.setTransactionManager(transactionManager()); configuration.setActivityFontName("宋体" ); configuration.setLabelFontName("宋体" ); configuration.setAnnotationFontName("宋体" ); configuration.setIdGenerator(new MyUUIDgenerator ()); return configuration; } }
问题 Cause: io.seata.common.exception.ShouldNeverHappenException 解决:排查涉及到的数据库表,是否都有ID主键字段
TransactionException[Could not register branch into global session xid 解决:全局事物方式内部是否有 @DS(“xxx”) 切换数据源的接口调用;是否有加 @Transaction 注解的方式调用
文档 基础介绍
PS:一般需要分布式事务的场景大多数都是微服务化,个人并不建议在单体项目引入多数据源+分布式事务,有能力尽早拆开,可为过度方案。
注意事项 dynamic-datasource-sring-boot-starter
组件内部开启seata后会自动使用DataSourceProxy来包装DataSource,所以需要以下方式来保持兼容。
1.如果你引入的是seata-all,请不要使用@EnableAutoDataSourceProxy注解。
2.如果你引入的是seata-spring-boot-starter 请关闭自动代理。
1 2 seata: enable-auto-data-source-proxy: false
示例项目 https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-seata-sample
此工程为 多数据源+druid+seata+mybatisPlus的版本。
模拟用户下单,扣商品库存,扣用户余额,初步可分为订单服务+商品服务+用户服务。
环境准备 为了快速演示相关环境都采用docker部署,生产上线请参考seata官方文档使用。
准备seata-server。
1 docker run --name seata-server -p 8091:8091 -d seataio/seata-server
准备mysql数据库,账户root密码123456。
1 docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
创建相关数据库。
创建 seata-order``seata-product``seata-account
模拟连接不同的数据库。
1 2 3 CREATE DATABASE IF NOT EXIST seata-order; CREATE DATABASE IF NOT EXIST seata-product; CREATE DATABASE IF NOT EXIST seata-account;
准备相关数据库脚本。
每个数据库下脚本相关的表,seata需要undo_log来监测和回滚。
相关的脚本不用自行准备,本工程已在resources/db下面准备好,另外配合多数据源的自动执行脚本功能,应用启动后会自动执行。
工程准备
引入相关依赖,seata+druid+mybatisPlus+dynamic-datasource+mysql+lombok。
1 2 3 4 5 6 7 8 9 10 11 <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.4.2</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId> <version>3.4.0</version> </dependency> # 省略,查看示例项目
编写相关yaml配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 spring: application: name: dynamic datasource: dynamic: primary: order strict: true seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭 seata-mode: AT #支持XA及AT模式,默认AT datasource: order: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-order.sql account: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_account?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-account.sql product: username: root password: 123456 url: jdbc:mysql://39.108.158.138:3306/seata_product?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver schema: classpath:db/schema-product.sql test: username: sa password: "" url: jdbc:h2:mem:test driver-class-name: org.h2.Driver seata: false #这个数据源不需要seata seata: enabled: true application-id: applicationName tx-service-group: my_test_tx_group enable-auto-data-source-proxy: false #一定要是false service: vgroup-mapping: my_test_tx_group: default #key与上面的tx-service-group的值对应 grouplist: default: 39.108.158.138:8091 #seata-server地址仅file注册中心需要 config: type: file registry: type: file
代码编写 参考工程下面的代码完成controller,service,maaper,entity,dto等。
订单服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j @Service public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Autowired private AccountService accountService; @Autowired private ProductService productService; @DS("order")//每一层都需要使用多数据源注解切换所选择的数据库 @Override @Transactional @GlobalTransactional //重点 第一个开启事务的需要添加seata全局事务注解 public void placeOrder(PlaceOrderRequest request) { log.info("=============ORDER START================="); Long userId = request.getUserId(); Long productId = request.getProductId(); Integer amount = request.getAmount(); log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount); log.info("当前 XID: {}", RootContext.getXID()); Order order = Order.builder() .userId(userId) .productId(productId) .status(OrderStatus.INIT) .amount(amount) .build(); orderDao.insert(order); log.info("订单一阶段生成,等待扣库存付款中"); // 扣减库存并计算总价 Double totalPrice = productService.reduceStock(productId, amount); // 扣减余额 accountService.reduceBalance(userId, totalPrice); order.setStatus(OrderStatus.SUCCESS); order.setTotalPrice(totalPrice); orderDao.updateById(order); log.info("订单已成功下单"); log.info("=============ORDER END================="); } }
商品服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j @Service public class ProductServiceImpl implements ProductService { @Resource private ProductDao productDao; /** * 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW */ @DS("product") @Transactional(propagation = Propagation.REQUIRES_NEW) @Override public Double reduceStock(Long productId, Integer amount) { log.info("=============PRODUCT START================="); log.info("当前 XID: {}", RootContext.getXID()); // 检查库存 Product product = productDao.selectById(productId); Integer stock = product.getStock(); log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount); if (stock < amount) { log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock); throw new RuntimeException("库存不足"); } log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice()); // 扣减库存 int currentStock = stock - amount; product.setStock(currentStock); productDao.updateById(product); double totalPrice = product.getPrice() * amount; log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice); log.info("=============PRODUCT END================="); return totalPrice; } }
用户服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf4j @Service public class AccountServiceImpl implements AccountService { @Resource private AccountDao accountDao; /** * 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW */ @DS("account") @Override @Transactional(propagation = Propagation.REQUIRES_NEW) public void reduceBalance(Long userId, Double price) { log.info("=============ACCOUNT START================="); log.info("当前 XID: {}", RootContext.getXID()); Account account = accountDao.selectById(userId); Double balance = account.getBalance(); log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price); if (balance < price) { log.warn("用户 {} 余额不足,当前余额:{}", userId, balance); throw new RuntimeException("余额不足"); } log.info("开始扣减用户 {} 余额", userId); double currentBalance = account.getBalance() - price; account.setBalance(currentBalance); accountDao.updateById(account); log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance); log.info("=============ACCOUNT END================="); } }
测试 在schema自动执行的脚本里,默认设置了商品价格为10,商品总数量为20,用户余额为50。
启动项目后通过命令行执行。
模拟正常下单,买一个商品。
1 2 3 4 5 6 7 8 curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 1 }'
模拟库存不足,事务回滚。
1 2 3 4 5 6 7 8 curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 22 }'
模拟用户余额不足,事务回滚。
1 2 3 4 5 6 7 8 curl -X POST \ http://localhost:8080/order/placeOrder \ -H 'Content-Type: application/json' \ -d '{ "userId": 1, "productId": 1, "amount": 6 }'
注意观察运行日志,至此分布式事务集成案例全流程完毕。