银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。
市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。
这里推荐一下叶东富大佬的分布式事务框架 dtm,一款跨语言的开源分布式事务管理器,优雅的解决了幂等、空补偿、悬挂等分布式事务难题。提供了简单易用、高性能、易水平扩展的分布式事务解决方案。
老黄在搜索相关分布式事务资料的时候,他写的文章都是相对比较好理解的,也就是这样关注到了 dtm 这个项目。
下面就基于这个框架来实践一下银行转账的例子。
前置工作
dotnet add package Dtmcli --version 0.3.0
先来看一下一个成功完成的 SAGA 时序图。
上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。
微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。
下面是两个服务的正向操作和补偿操作的处理。
OutApi
app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => { // 进行 数据库操作 Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); }); app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) => { // 进行 数据库操作 Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
InApi
app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); }); app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
注:示例为了简单,没有进行实际的数据库操作。
到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用
var userOutReq = new TransRequest() { UserId = "1", Amount = -30 }; var userInReq = new TransRequest() { UserId = "2", Amount = 30 }; var ct = new CancellationToken(); var gid = await dtmClient.GenGid(ct); var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq) ; var flag = await saga.Submit(ct); Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");
到这里,一个完整的 SAGA 分布式事务就编写完成了。
搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。
当然,上面的情况太理想了,转出转入都是一次性就成功了。
但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。
下面来看一个异常的 SAGA 示例
做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。
由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。
这个时候用户2 这边会出现什么样的情况呢?
无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。
先看一下事务失败交互的时序图
再通过调整上面成功的例子,来比较直观的看看出现的情况。
在 InApi 加多一个转入失败的处理接口
app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}"); //return Results.BadRequest(); return Results.Ok(TransResponse.BuildFailureResponse()); });
失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种
调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);
运行结果如下:
在这个例子中,只考虑补偿/重试成功的情况下。
用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。
用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。
这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。
如果出现了上述的情况1,会发生什么呢?
用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。
同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。
前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。
再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。
子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。
这4个内容 dtm 在回调时会放在 querysting 上面。
客户端里面提供了 IBranchBarrierFactory 来供我们使用。
针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。
app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) => { var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op); using var db = Db.GeConn(); await barrier.Call(db, async (tx) => { // 转入失败的情况下,不应该输出下面这个 Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}"); // tx 参数是事务,可和本地事务一起提交回滚 await Task.CompletedTask; }); Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。
同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq) ;
再来运行这个例子。
会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了
Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False
这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。
再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。
针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。
app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) => { Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}"); var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op); using var db = Db.GeConn(); await barrier.Call(db, async (tx) => { var c = Interlocked.Increment(ref _errCount); // 模拟一个超时执行 if (c > 0 && c < 2) await Task.Delay(10000); Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); await Task.CompletedTask; }); return Results.Ok(TransResponse.BuildSucceedResponse()); });
这里通过一个超时执行来让 dtm 进行转入正向操作的重试。
同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq) ;
再来运行这个例子。
可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了
Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True
这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。
到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。
在这篇文章里,也通过几个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。希望对研究分布式事务的您有所帮助。
本文示例代码: DtmSagaSample
参考资料