@[toc]
还是那句老话,网上关于分布式事务讲解理论比较多,案例比较少,最近松哥想通过几个案例,来和大家把常见的分布式事务解决方案过一遍,前面我和大家分享了 Seata 中的 AT 模式,今天我们来看 TCC 模式。
TCC 模式和松哥前面跟大家演示的 AT 模式有很多相似的地方,也有很多不同的地方,之前读者麻瓜大佬投稿过一篇文章讲 TCC 模式:
感兴趣的小伙伴也可以先看看。
今天我们还是先来整一个案例,把案例分析完了,大家基本上就明白 TCC 是咋回事了,同时也就明白 TCC 和 AT 之间的差异了。
还是 Seata 官方的那个仓库,它里边有 TCC 的案例,不过由于它这个仓库案例较多,需要下载的依赖也较多,所以全部导入会容易导入失败,下面是松哥整理好的案例(去除了不必要的工程),可以直接导入,大家可以在公号后台回复 seata-demo
下载这个案例。
官方给的 TCC 案例是一个经典的转账案例,很多小伙伴第一次接触事务的时候,学的案例就是转账,所以这个业务对于大家来说很好理解。
我先来说一下这个案例的业务逻辑,然后我们再来看代码,他的流程是这样的:
有人可能会说,都是 provider 提供的接口,也算分布式事务?算!当然算!虽然上面提到的两个接口都是 provider 提供的,但是由于这里存在两个数据库,不同接口操作不同的数据库,所以依然是分布式事务。
这是这个项目大致上要做的事情。
官方的案例用的是 H2 数据库,这个大家不方便看效果,因此,我们这里稍微做一点配置,将数据库换为 MySQL,这样我们方便看转账效果。
具体配置步骤如下:
创建两个空的库就行了,不用创建表,项目启动的时候会自动初始化表。
官方给的案例有点小问题,直接启动会报错,原因在于案例中使用的 DBCP 和 MyBatis 版本冲突,需要大家先在 pom.xml 中把 DBCP 的版本号改为 1.4,如下:
<properties> <curator.version>4.2.0</curator.version> <commons-dbcp.version>1.4</commons-dbcp.version> <h2.version>1.4.181</h2.version> <mybatis.version>3.5.6</mybatis.version> <mybatis.spring.version>1.3.1</mybatis.spring.version> </properties>
然后我们再加入 MySQL 驱动,如下:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.25</version> </dependency>
虽然案例中有的东西有点像老古董了,但是本着能简则简的原则,我就不去修改了,咱们只要项目跑起来,能够帮助我们理解 TCC 就行了。
另外,这个项目引用的 Dubbo 版本也有问题,我们手动给其加上版本号(默认的 3.0.1 这个版本有问题,松哥亲测 2.7.3 可用):
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <exclusions> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring</artifactId> </exclusion> </exclusions> <version>2.7.3</version> </dependency>
数据库配置有两个,一个是转账转出数据源,另一个是转账转入数据源,相关配置在 src/main/resources/db-bean
目录下。
先来修改 from-datasource-bean.xml,主要修改数据源,如下:
<bean id="fromAccountDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName"> <value>com.mysql.cj.jdbc.Driver</value> </property> <property name="url"> <value>jdbc:mysql:///transfer_from_db?serverTimezone=Asia/Shanghai</value> </property> <property name="username"> <value>root</value> </property> <property name="password"> <value>123</value> </property> </bean>
改四个东西:数据库驱动、数据库连接地址、数据库用户名、数据库密码。
再来修改 to-datasource-bean.xml:
<bean id="toAccountDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName"> <value>com.mysql.cj.jdbc.Driver</value> </property> <property name="url"> <value>jdbc:mysql:///transfer_to_db?serverTimezone=Asia/Shanghai</value> </property> <property name="username"> <value>root</value> </property> <property name="password"> <value>123</value> </property> </bean>
这两个配置主要是连接的数据库不同。
OK,如此之后,我们的配置就算完成了。
案例运行分为两部分。
找到 src/main/java/io/seata/samples/tcc/transfer/starter/TransferProviderStarter.java
,执行 main 方法,直接执行即可,执行之后,控制台看到如下信息就表示项目启动成功并且表结构以及表数据初始化成功:
启动过程中,可能会有一个空指针异常,不过并不影响使用,所以可以忽略之。
项目启动成功之后,我们可以查看一下刚刚创建好的两个数据库,每个数据库里边都有三张表:
先来看转出的库:
account 表中有两条记录:
这张表中有 A、B 两个账户,各有 100 块钱,各自被冻结的资金(freezed_amount)都为 0。
business_action 和 business_activity 都是空表。
再来看转入的库:
可以看到,和 transfer_from_db 一模一样的三张表,就是 account 中的用户是 C,也有 100 块钱。
找到 src/main/java/io/seata/samples/tcc/transfer/starter/TransferApplication.java
,这个里边的 main 方法中有两个测试方法,doTransferSuccess
会转账成功,doTransferFailed
则会转账失败。
这两个方法我们首先注释掉 doTransferFailed
,运行 doTransferSuccess
方法,控制台输出日志如下:
这表示转账成功。
此时查看数据库,A 账户少了 10 块钱,C 账户多了 10 块钱:
然后我们注释掉 doTransferSuccess
,运行 doTransferFailed
方法,结果如下:
可以看到,转账失败,此时查看数据库,发现两个库中的数据均未发生改变,说明数据已经回滚了。
好啦,这就是官方给我们提供的一个典型的转账案例。那么这个转账案例是怎么实现的?接下来我们来分析一下代码,代码分析完了,大家就明白什么是 TCC 了!
这里关于 Dubbo 的调用逻辑,松哥就不多说了,相信大家都会,咱们主要来说说跟分布式事务相关的代码。
首先,这个项目中提供了两个接口:
这两个接口分别代表了转账时候的两个步骤:
这两个接口的定义其实非常类似,只要我们看懂其中一个,另外一个就很容易懂了。
这是把钱转出去的接口,我们先来看接口的定义:
public interface FirstTccAction { /** * 一阶段方法 * * @param businessActionContext * @param accountNo * @param amount */ @TwoPhaseBusinessAction(name = "firstTccAction", commitMethod = "commit", rollbackMethod = "rollback") public boolean prepareMinus(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "accountNo") String accountNo, @BusinessActionContextParameter(paramName = "amount") double amount); /** * 二阶段提交 * @param businessActionContext * @return */ public boolean commit(BusinessActionContext businessActionContext); /** * 二阶段回滚 * @param businessActionContext * @return */ public boolean rollback(BusinessActionContext businessActionContext); }
可以看到,接口中有三个方法:
这三个方法的名字并不是固定的,可以自己定义,我们来看这三个方法是干嘛的(实现类是 FirstTccActionImpl):
FirstTccActionImpl#prepareMinus
方法中:@Override public boolean prepareMinus(BusinessActionContext businessActionContext, final String accountNo, final double amount) { //分布式事务ID final String xid = businessActionContext.getXid(); return fromDsTransactionTemplate.execute(new TransactionCallback<Boolean>(){ @Override public Boolean doInTransaction(TransactionStatus status) { try { //校验账户余额 Account account = fromAccountDAO.getAccountForUpdate(accountNo); if(account == null){ throw new RuntimeException("账户不存在"); } if (account.getAmount() - amount < 0) { throw new RuntimeException("余额不足"); } //冻结转账金额 double freezedAmount = account.getFreezedAmount() + amount; account.setFreezedAmount(freezedAmount); fromAccountDAO.updateFreezedAmount(account); System.out.println(String.format("prepareMinus account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; } catch (Throwable t) { t.printStackTrace(); status.setRollbackOnly(); return false; } } }); }
这个方法就干了三件事:1.检查 A 账户是否存在,不存在就抛异常;2.检查 A 账户余额是否小于 10 块钱,如果是,抛异常(钱不够,没法转账);3.修改 A 账户的数据库记录,将冻结资金标记出来(A 账户的 freezed_amount 字段将被修改为 10)。
FirstTccActionImpl#commit
方法中:@Override public boolean commit(BusinessActionContext businessActionContext) { //分布式事务ID final String xid = businessActionContext.getXid(); //账户ID final String accountNo = String.valueOf(businessActionContext.getActionContext("accountNo")); //转出金额 final double amount = Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount"))); return fromDsTransactionTemplate.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try{ Account account = fromAccountDAO.getAccountForUpdate(accountNo); //扣除账户余额 double newAmount = account.getAmount() - amount; if (newAmount < 0) { throw new RuntimeException("余额不足"); } account.setAmount(newAmount); //释放账户 冻结金额 account.setFreezedAmount(account.getFreezedAmount() - amount); fromAccountDAO.updateAmount(account); System.out.println(String.format("minus account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; }catch (Throwable t){ t.printStackTrace(); status.setRollbackOnly(); return false; } } }); }
看看这个方法的执行逻辑:
这就是 commit 方法所做的事情。
FirstTccActionImpl#rollback
方法中:@Override public boolean rollback(BusinessActionContext businessActionContext) { //分布式事务ID final String xid = businessActionContext.getXid(); //账户ID final String accountNo = String.valueOf(businessActionContext.getActionContext("accountNo")); //转出金额 final double amount = Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount"))); return fromDsTransactionTemplate.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try{ Account account = fromAccountDAO.getAccountForUpdate(accountNo); if(account == null){ //账户不存在,回滚什么都不做 return true; } //释放冻结金额 account.setFreezedAmount(account.getFreezedAmount() - amount); fromAccountDAO.updateFreezedAmount(account); System.out.println(String.format("Undo prepareMinus account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; }catch (Throwable t){ t.printStackTrace(); status.setRollbackOnly(); return false; } } }); }
可以看到,回滚的反向补偿其实很简单,先看下账户是否存在,账户存在的话,把冻结的资金取消冻结就行了。
这就是把钱转出去的整个过程。
这是把钱转进来的接口。
public interface SecondTccAction { /** * 一阶段方法 * * @param businessActionContext * @param accountNo * @param amount */ @TwoPhaseBusinessAction(name = "secondTccAction", commitMethod = "commit", rollbackMethod = "rollback") public boolean prepareAdd(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "accountNo") String accountNo, @BusinessActionContextParameter(paramName = "amount") double amount); /** * 二阶段提交 * @param businessActionContext * @return */ public boolean commit(BusinessActionContext businessActionContext); /** * 二阶段回滚 * @param businessActionContext * @return */ public boolean rollback(BusinessActionContext businessActionContext); }
接口的实现类:
public class SecondTccActionImpl implements SecondTccAction { /** * 加钱账户 DAP */ private AccountDAO toAccountDAO; private TransactionTemplate toDsTransactionTemplate; /** * 一阶段准备,转入资金 准备 * @param businessActionContext * @param accountNo * @param amount * @return */ @Override public boolean prepareAdd(final BusinessActionContext businessActionContext, final String accountNo, final double amount) { //分布式事务ID final String xid = businessActionContext.getXid(); return toDsTransactionTemplate.execute(new TransactionCallback<Boolean>(){ @Override public Boolean doInTransaction(TransactionStatus status) { try { //校验账户 Account account = toAccountDAO.getAccountForUpdate(accountNo); if(account == null){ System.out.println("prepareAdd: 账户["+accountNo+"]不存在, txId:" + businessActionContext.getXid()); return false; } //待转入资金作为 不可用金额 double freezedAmount = account.getFreezedAmount() + amount; account.setFreezedAmount(freezedAmount); toAccountDAO.updateFreezedAmount(account); System.out.println(String.format("prepareAdd account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; } catch (Throwable t) { t.printStackTrace(); status.setRollbackOnly(); return false; } } }); } /** * 二阶段提交 * @param businessActionContext * @return */ @Override public boolean commit(BusinessActionContext businessActionContext) { //分布式事务ID final String xid = businessActionContext.getXid(); //账户ID final String accountNo = String.valueOf(businessActionContext.getActionContext("accountNo")); //转出金额 final double amount = Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount"))); return toDsTransactionTemplate.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try{ Account account = toAccountDAO.getAccountForUpdate(accountNo); //加钱 double newAmount = account.getAmount() + amount; account.setAmount(newAmount); //冻结金额 清除 account.setFreezedAmount(account.getFreezedAmount() - amount); toAccountDAO.updateAmount(account); System.out.println(String.format("add account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; }catch (Throwable t){ t.printStackTrace(); status.setRollbackOnly(); return false; } } }); } /** * 二阶段回滚 * @param businessActionContext * @return */ @Override public boolean rollback(BusinessActionContext businessActionContext) { //分布式事务ID final String xid = businessActionContext.getXid(); //账户ID final String accountNo = String.valueOf(businessActionContext.getActionContext("accountNo")); //转出金额 final double amount = Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount"))); return toDsTransactionTemplate.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try{ Account account = toAccountDAO.getAccountForUpdate(accountNo); if(account == null){ //账户不存在, 无需回滚动作 return true; } //冻结金额 清除 account.setFreezedAmount(account.getFreezedAmount() - amount); toAccountDAO.updateFreezedAmount(account); System.out.println(String.format("Undo prepareAdd account[%s] amount[%f], dtx transaction id: %s.", accountNo, amount, xid)); return true; }catch (Throwable t){ t.printStackTrace(); status.setRollbackOnly(); return false; } } }); } }
看懂了上面的 FirstTccActionImpl,SecondTccActionImpl 这个接口松哥就不啰嗦了,简单说一下:
这就是把钱收进来的大致过程。
具体转账是在 TransferServiceImpl 类中,在它的 transfer 方法中,去调用 FirstTccAction 和 SecondTccAction,一起来看下:
public class TransferServiceImpl implements TransferService { private FirstTccAction firstTccAction; private SecondTccAction secondTccAction; /** * 转账操作 * @param from 扣钱账户 * @param to 加钱账户 * @param amount 转账金额 * @return */ @Override @GlobalTransactional public boolean transfer(final String from, final String to, final double amount) { //扣钱参与者,一阶段执行 boolean ret = firstTccAction.prepareMinus(null, from, amount); if(!ret){ //扣钱参与者,一阶段失败; 回滚本地事务和分布式事务 throw new RuntimeException("账号:["+from+"] 预扣款失败"); } //加钱参与者,一阶段执行 ret = secondTccAction.prepareAdd(null, to, amount); if(!ret){ throw new RuntimeException("账号:["+to+"] 预收款失败"); } System.out.println(String.format("transfer amount[%s] from [%s] to [%s] finish.", String.valueOf(amount), from, to)); return true; } public void setFirstTccAction(FirstTccAction firstTccAction) { this.firstTccAction = firstTccAction; } public void setSecondTccAction(SecondTccAction secondTccAction) { this.secondTccAction = secondTccAction; } }
来看一下具体的转账逻辑:
这就是大致的转账逻辑。
经过上面的分析,相信小伙伴们对 TCC 已经有一些感觉了。
那么什么是 TCC?
TCC 是 Try-Confirm-Cancel 英文单词的简写。
在 TCC 模式中,一个事物是通过 Do-Commit/Rollback 来实现的,开发者需要给每一个服务间调用的操作接口,都提供一套 Try-Confirm/Cancel 接口,这套接口就类似于我们上面的 prepareXXX/commit/rollback 接口。
再举一个简化的电商案例,用户支付完成的时候由先订单服务处理,然后调用商品服务去减库存,这两个操作同时成功或者同时失败,这就涉及到分布式事务了:在 TCC 模式下,我们需要 3 个接口。首先是减库存的 Try 接口,在这里,我们要检查业务数据的状态、检查商品库存够不够,然后做资源的预留,也就是在某个字段上设置预留的状态,然后在 Confirm 接口里,完成库存减 1 的操作,在 Cancel 接口里,把之前预留的字段重置(预留的状态其实就类似于前面案例的冻结资金字段 freezed_amount
)。
为什么搞得这么麻烦呢?分成三个步骤来做有一个好处,就是在出错的时候,能够顺利的完成数据库重置(反向补偿),并且,只要我们 prepare 中的逻辑是正确的,那么即使 confirm 执行出错了,我们也可以进行重试。
我们再来看下面一张图:
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode。
AT 模式基于支持本地 ACID 事务的关系型数据库:
关于 AT 这块,如果小伙伴们不熟悉,可以参考松哥前面的文章:
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
所谓 TCC 模式,是指支持把自定义的分支事务纳入到全局事务的管理中。
回顾前面的案例,小伙伴们发现,分布式事务两阶段提交,在 TCC 中,prepare、commit 以及 rollback 中的逻辑都是我们自己写的,因此说 TCC 不依赖于底层数据资源的事务支持。
相比于 AT 模式,TCC 需要我们自己实现 prepare、commit 以及 rollback 逻辑,而在 AT 模式中,commit 和 rollback 都不用我们去管,Seata 会自动帮我们完成。
好啦,今天这篇文章松哥就和大家简单分享一下 Seata 中的 TCC 模式,建议小伙伴们一定先跑一下文章中的案例,然后再去看分析,就很容易懂了~
分布式事务的其他解决方案,我们后面再继续聊~
公众号江南一点雨后台回复 seata-demo
,可以下载本文案例。