近期写的一个项目,整个项目采用的DDD(领域驱动)设计,所以刚开始设计的时候就将各个业务以聚合根的方式进行划分,以该业务场景为例,整体的业务简述为,当客户进行付款以后,创建一个付款单,然后由财务手动将付款单与发货单进行账务冲抵和关联,同时还需要针对付款的客户及企业的余额进行相应的变动,所以,当付款单和发货单进行冲抵业务的时候,客户及其企业的待付款金额将会根据冲抵的金额,进行变动,所以该业务的主要操作是首先针对发货单的待付款金额进行冲抵扣减,此时操作的聚合根为发货单的聚合根,而因为还需要同时针对用户的账户金额进行变动,所以在操作发货单的聚合根的时候,触发一个领域事件,而用户的聚合根订阅该事件,当该事件被触发的时候,用户的聚合根接收到事件,并随之进行相应的操作。
一般情况下,领域事件可以看作一个一对多的多播事件即一方触发多方进行响应,一个聚合根发生改变并且触发领域事件的时候,其他与之关联的聚合根都将订阅该事件,在被触发的时候进行响应并对自身进行对应的操作,而且事件一般不会存在返回值的情况,所以订阅方的业务是否执行成功,失败后需要进行什么样的操作,可以根据业务的不同进行不同的操作,如果是需要强一致性的业务,就需要考虑操作异常的处理。如果是一致性不强的业务,则可以考虑自身重试等机制。而目前该项目所遇到的就是强一致性的业务需求,那么只能一荣俱荣,一损俱损。
在领域事件的是先方面,我采用的是NetCore项目中比较流行的MediatR组件(一种简单的实现进程内的消息传递机制的类库),采用MediatR的消息通知机制,在进行数据操作的时候,添加并触发领域事件,从而实现领域事件的触发以及订阅处理,同时采用EF的事务来确保数据库在操作数据时候的一致性。
//业务代码 /// <summary> /// 业务开始 /// </summary> /// <returns></returns> public async Task Task(CancellationToken token=default) { // 下面所用的未声明对象均有DI生成。 // DbCotext 继承 IUnitOfWork,并且通过IOC将其生命周期设为Scope(请求域) //_repository 为聚合根的仓储类,在实例化时注入IUnitOfWork进行相应的数据库操作。 //具体略 var db = _repository.UnitOfWork as TestDbContext; using (var transaction=await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted,token)) { A a = new A(); a.AddDomainEvent(new TestEvent()); await _repository.AddAsync(a); await db.CommitTransactionAsync(transaction, token); } }
//数据库上下文部分方法 /// <summary> /// 异步提交事务 /// </summary> /// <param name="transaction"></param> /// <param name="cancellationToken"></param> /// <returns></returns> public async Task CommitTransactionAsync(IDbContextTransaction transaction,CancellationToken cancellationToken=default) { try { await EventTrigger(cancellationToken); await SaveChangesAsync(cancellationToken); await transaction.CommitAsync(cancellationToken); } catch (Exception ex) { await RollbackTransactionAsync(cancellationToken); throw; } finally { transaction.Dispose(); transaction = null; } } /// <summary> /// 事件触发器 /// </summary> /// <param name="cancellationToken"></param> /// <returns></returns> private async Task EventTrigger(CancellationToken cancellationToken = default(CancellationToken)) { var mediator =_serviceProvider.GetService<IMediator>()!; await DispatchDomainEventAsync(mediator,cancellationToken); } /// <summary> /// 调度领域事件 /// </summary> /// <param name="mediator"></param> /// <param name="cancellationToken"></param> /// <typeparam name="T"></typeparam> /// <returns></returns> private async Task DispatchDomainEventAsync(IMediator mediator,CancellationToken cancellationToken = default(CancellationToken)) { //当前上下文的所有添加了领域事件的聚合根 List<EntityEntry<IAggregateRoot>> domainEntries = this.ChangeTracker .Entries<IAggregateRoot>() .Where(x => x.Entity.DomainEvents.Any()) .ToList(); //获取领域事件 IEnumerable<INotification> domainEvents = domainEntries.SelectMany(x => x.Entity.DomainEvents).ToList(); foreach (var domainEntry in domainEntries) { domainEntry.Entity.ClearDomainEvent(); } //发送事件 var tasks = domainEvents.Select(async domainEvent => { await mediator.Publish(domainEvent, cancellationToken); }); //同时执行 await Task.WhenAll(tasks); }
//订阅方 public class TestEventHandler: INotificationHandler<TestEvent> { #region fields private readonly IBRepository _repository; #endregion #region ctor /// <summary> /// 事件处理方 /// </summary> /// <param name="repository"></param> public TestEventHandler(IBRepository repository) { _repository = repository; } #endregion #region 处理程序 /// <summary> /// 处理程序 /// </summary> /// <param name="notification"></param> /// <param name="cancellationToken"></param> public async Task Handle(TestEvent eventData, CancellationToken cancellationToken) { // 参数 eventData 是可以传递数据的,此示例省略 B b = new B(); b.Num = 1; await _repository.AddAsync(b); } #endregion }
通过上面代码可以推断出,这次业务首先在操作之前开启ef事务,确保数据,一致性,然后在聚合根A进行保存之前触发领域事件,然后通过MediatR对事件进行调度,通知订阅方,而订阅方则根据自身的情况,实现自身的仓储,对操作进行处理。最后通过统一保存,提交事务,确保数据的一致性。
在业务代码实现以后,就针对该项业务进行测试,为了保险起见,专门针对数据一致性进行了测试,而结果大失所望,在数据进行保存的时候,故意调整了表结构的表A在保存的时候报错了没有将数据添加成功,而未调整的表B,则正常添加进了数据,数据的一致性并没有确保成功。这整个事情就变得很邪门了。而后就开始我的爬坑之旅。
起初,我以为是因为在进行调度的时候,采用了异步+Task的方式对领域事件进行了调度操作,所以导致事件在进行处理的时候和主方法的数据库上下不是一个导致的,所以针对数据库上下文的注入方式进行了排查,最后结果是 事件订阅处理方的数据库上下文和主方法的数据库上下文为同一个实例,所以不存在生命周期或不是实例不同的问题。
因为整个项目的架构都是我自己搭建的,出于对自身能力的怀疑,于是就有上面Demo的诞生,上面的Demo是我根据思路又重新调整后写的,结果神奇的一幕出现了,上面的框架事务生效了!!!!(但是这又是另外一个坑,不过不知道是不是负负得正把,反正促使我找到了真正的问题。)
在这个阶段,我进行了疯狂的调试,在调试的时候,特意输出了EF的Debug日志。从事务开启,到事务保存前创建事务保存点,再到保存,报错,回滚,删除事务保存点,这些日志我全都看到EF输出出来了并且排查了一遍,各种操作层出不穷,不再赘述,反正没有解决。
不得不说我飘了,我真真切切的开始怀疑过EFCore,甚至把这部分源码以及文档看了一遍,结果还是没看出什么所以然来。还是无果。
最后!!!!我要感恩的是马桶,在我一次次一天天的失败后,与 昨晚(今天凌晨)12:30在我心灰意冷关了电脑以后,坐在马桶上思考解决方案,随手用手机搜了一下Mysql事务打开了一篇博客,具体博客内容我忘了,是一篇JAVA的,但是核心内容是在JAVA中开启事务不管用的情况,一下我就来劲了,仔细一看,卧槽,我怀疑这个,怀疑那个,为毛就是没有怀疑过是Mysql的问题呢?Mysql的存储引擎,我所使用的版本是5.7.26,它默认的存储引擎是MyISAM的,这玩意它不支持事务啊!!!!!它不支持!!!
在我恶补了Mysql数据库引擎区别以后,我将数据库的存储引擎改为InnoDB后,完美!!!解决了!!(具体区别可以去搜一下,网上到处都是,烂大街了都,我就不复制别人的了)两周,整整两周,只要有时间,就在电脑前摸索研究这个问题,最后却发现是这么一个不起眼的问题导致的。却也说明了我个人对数据库知识的薄弱,后期需要恶补数据库。