上一节咱们讲了LocalEventBus,本节来讲本地事件总线(DistributedEventBus),采用的RabbitMQ进行实现。
Volo.Abp.EventBus.RabbitMQ
模块内部代码并不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ
这个包中。
我们从模块定义开始看,项目启动的时候分别读取了appsetting.json
的配置参数和调用了RabbitMqDistributedEventBus
的Initialize
函数。
public class AbpEventBusRabbitMqModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { var configuration = context.Services.GetConfiguration(); Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus")); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { context .ServiceProvider .GetRequiredService<RabbitMqDistributedEventBus>() .Initialize(); } }
在Initialize
函数中我们根据 MessageConsumerFactory.Create
向内部进行查阅可以看到最终调用方法为RabbitMqMessageConsumer.TryCreateChannelAsync
并且在其内部我们可以看到下面代码,这里定义了消费的回调函数。反推Initialize
方法其实是在启动一个消费者。
public void Initialize() { Consumer = MessageConsumerFactory.Create( new ExchangeDeclareConfiguration( AbpRabbitMqEventBusOptions.ExchangeName, type: "direct", durable: true ), new QueueDeclareConfiguration( AbpRabbitMqEventBusOptions.ClientName, durable: true, exclusive: false, autoDelete: false ), AbpRabbitMqEventBusOptions.ConnectionName ); Consumer.OnMessageReceived(ProcessEventAsync); SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); }
var consumer = new AsyncEventingBasicConsumer(Channel); consumer.Received += HandleIncomingMessageAsync;
继续向下看Consumer.OnMessageReceived(ProcessEventAsync);
该方法向一个并发安全集合输入一个委托事件,并该事件会在上面的HandleIncomingMessageAsync
会调中触发故确定为消费者的执行逻辑,而ProcessEventAsync
其实还是走了我们在讲LocalEventBus哪一套,寻找Handler执行函数。
SubscribeHandlers
还是上节讲的基类的函数,这里要注意内部调用的Subscribe
该方法中的 Consumer.BindAsync
会根据为消费者Bind路由,这样才能触发事件处理函数。
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } handlerFactories.Add(factory); if (handlerFactories.Count == 1) //TODO: Multi-threading! { Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); } return new EventHandlerFactoryUnregistrar(this, eventType, factory); }
看完了事件消费者我们来看看事件发布,直接看PublishAsync
函数就完事了,整个函数非常简单,都是RabbitMQ的操作语法,这里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);
函数中通过读取ETO上指定注解Name来指定的。
protected Task PublishAsync( string eventName, byte[] body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null) { using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) { channel.ExchangeDeclare( AbpRabbitMqEventBusOptions.ExchangeName, "direct", durable: true ); if (properties == null) { properties = channel.CreateBasicProperties(); properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; } if (properties.MessageId.IsNullOrEmpty()) { properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish( exchange: AbpRabbitMqEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body ); } return Task.CompletedTask; }
整个分布式事件的实现其实非常简单,在事件发生时发布者只需要定义好路由名称和消息内容发送RabbitMQ中,而消费者则是在项目运行的时候的通过调用Initialize
就启动起来了。
这里我们也同样根据整个原理自己实现一下这个流程。
在Dppt.EventBus
分别定义IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler
分别用于采用分布式事件总线调用、配置选项用于存储处理程序Handler、定义分布式处理程序抽象。
新建Dppt.EventBus.RabbitMQ
类库先简单对RabbitMQ进行一个简单的封装
public class RabbitMqConnections : IRabbitMqConnections { private readonly IConnectionFactory _connectionFactory; private readonly ILogger<RabbitMqConnections> _logger; IConnection _connection; bool _disposed; public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger) { _connectionFactory = connectionFactory; _logger = logger; } public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } public void TryConnect() { _connection = _connectionFactory.CreateConnection(); } public IModel CreateModel() { if (!IsConnected) { throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); } return _connection.CreateModel(); } public void Dispose() { if (_disposed) return; _disposed = true; try { _connection.Dispose(); } catch (IOException ex) { _logger.LogCritical(ex.ToString()); } } }
然后我们分别定义ExchangeDeclareConfiguration、QueueDeclareConfiguration
用于记录配置信息。
开始处理RabbitMqEventBus处理程序首先是发布事件,大体代码如下就是往RabbitMQ里面丢消息。
/// <summary> /// rabbmitmq 连接服务 /// </summary> public readonly IRabbitMqConnections _rabbitMqConnections; public Task PublishAsync<TEvent>(TEvent eventData) { var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent)); var body = JsonSerializer.Serialize(eventData); return PublishAsync(eventName, body, null, null); } public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null) { if (!_rabbitMqConnections.IsConnected) { _rabbitMqConnections.TryConnect(); } using (var channel = _rabbitMqConnections.CreateModel()) { // durable 设置队列持久化 channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true); if (properties == null) { properties = channel.CreateBasicProperties(); // 设置消息持久化 properties.DeliveryMode = 2; } if (properties.MessageId.IsNullOrEmpty()) { // 消息的唯一性标识 properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N"); } SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish( exchange: RabbitMqEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: Encoding.UTF8.GetBytes(body) ); } return Task.CompletedTask; } private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments) { if (headersArguments == null) { return; } properties.Headers ??= new Dictionary<string, object>(); foreach (var header in headersArguments) { properties.Headers[header.Key] = header.Value; } }
然后就是消费者的处理,我们同样定义Initialize
函数,并简化部分封装代码,完成消费者启动。
public void Initialize() { Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true); Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false); // 启动一个消费者 if (!_rabbitMqConnections.IsConnected) { _rabbitMqConnections.TryConnect(); } try { Channel = _rabbitMqConnections.CreateModel(); Channel.ExchangeDeclare( exchange: Exchange.ExchangeName, type: Exchange.Type, durable: Exchange.Durable, autoDelete: Exchange.AutoDelete, arguments: Exchange.Arguments ); Channel.QueueDeclare( queue: Queue.QueueName, durable: Queue.Durable, exclusive: Queue.Exclusive, autoDelete: Queue.AutoDelete, arguments: Queue.Arguments ); var consumer = new AsyncEventingBasicConsumer(Channel); consumer.Received += HandleIncomingMessageAsync; Channel.BasicConsume( queue: Queue.QueueName, autoAck: false, consumer: consumer ); SubscribeHandlers(DistributedEventBusOptions.Handlers); } catch (Exception ex) { Console.WriteLine("Error:" + ex.Message); } }
参数配置这边主要是读取AppSetting信息和索要Handler
public static class DpptEventBusRabbitMqRegistrar { public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types) { services.AddSingleton<IRabbitMqConnections>(sp => { var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>(); var factory = new ConnectionFactory() { HostName = configuration["RabbitMQ:EventBusConnection"], VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"], DispatchConsumersAsync = true, AutomaticRecoveryEnabled = true }; if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"])) { factory.UserName = configuration["RabbitMQ:EventBusUserName"]; } if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"])) { factory.Password = configuration["RabbitMQ:EventBusPassword"]; } return new RabbitMqConnections(factory, logger); }); var distributedHandlers = types; foreach (var item in distributedHandlers) { services.AddSingleton(item); } services.Configure<DistributedEventBusOptions>(options => { options.Handlers.AddIfNotContains(distributedHandlers); }); services.Configure<DpptRabbitMqEventBusOptions>(options => { options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"]; options.ClientName = configuration["RabbitMQ:EventBus:ClientName"]; }); services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>(); } }
新建一个空项目,进行插件注册,然后创建ETO和Handler进行测试。
测试结果放在下面了。
本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式事务。
最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star
另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长