ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线

在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。

class_diagram_chapter2

其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接口,以及一个基于ASP.NET Core依赖注入框架的执行上下文的实现。接下来,我们打算淘汰PassThroughEventBus,然后基于RabbitMQ实现一套新的事件总线。

事件总线的重构

根据前面的结论,事件总线的执行需要依赖于事件处理器执行上下文,也就是上面类图中PassThroughEventBus对于IEventHandlerExecutionContext的引用。更具体些,是在事件总线订阅某种类型的事件时,需要将事件处理器注册到IEventHandlerExecutionContext中。那么在实现RabbitMQ时,也会有着类似的设计需求,即RabbitMQEventBus也需要依赖IEventHandlerExecutionContext接口,以保证事件处理器生命周期的合理性。

为此,我们新建一个基类:BaseEventBus,并将这部分公共的代码提取出来,需要注意以下几点:

  1. 通过BaseEventBus的构造函数传入IEventHandlerExecutionContext实例,也就限定了所有子类的实现中,必须在构造函数中传入IEventHandlerExecutionContext实例,这对于框架的设计非常有利:在实现新的事件总线时,框架的使用者无需查看API文档,即可知道事件总线与IEventHandlerExecutionContext之间的关系,这符合SOLID原则中的Open/Closed Principle
  2. BaseEventBus的实现应该放在EdaSample.Common程序集中,更确切地说,它应该放在EdaSample.Common.Events命名空间下,因为它是属于框架级别的组件,并且不会依赖任何基础结构层的组件

BaseEventBus的代码如下:

public abstract class BaseEventBus : IEventBus
{
    protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;

    protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
    {
        this.eventHandlerExecutionContext = eventHandlerExecutionContext;
    }

    public abstract Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent;

    public abstract void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>;
	
	// Disposable接口实现代码省略
}

在上面的代码中,PublishAsync和Subscribe方法是抽象方法,以便子类根据不同的需要来实现。

接下来就是调整PassThroughEventBus,使其继承于BaseEventBus:

public sealed class PassThroughEventBus : BaseEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly ILogger logger;

    public PassThroughEventBus(IEventHandlerExecutionContext context,
        ILogger<PassThroughEventBus> logger)
        : base(context)
    {
        this.logger = logger;
        logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");

        eventQueue.EventPushed += EventQueue_EventPushed;
    }

    private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => await this.eventHandlerExecutionContext.HandleEventAsync(e.Event);

    public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
    {
        return Task.Factory.StartNew(() => eventQueue.Push(@event));
    }

    public override void Subscribe<TEvent, TEventHandler>()
    {
        if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
        }
    }
	
    // Disposable接口实现代码省略
}

代码都很简单,也就不多做说明了,接下来,我们开始实现RabbitMQEventBus。

RabbitMQEventBus的实现

首先需要新建一个.NET Standard 2.0的项目,使用.NET Standard 2.0的项目模板所创建的项目,可以同时被.NET Framework 4.6.1或者.NET Core 2.0的应用程序所引用。创建新的类库项目的目的,是因为RabbitMQEventBus的实现需要依赖RabbitMQ C#开发库这个外部引用。因此,为了保证框架核心的纯净和稳定,需要在新的类库项目中实现RabbitMQEventBus。

Note:对于RabbitMQ及其C#库的介绍,本文就不再涉及了,网上有很多资料和文档,博客园有很多朋友在这方面都有使用经验分享,RabbitMQ官方文档也写得非常详细,当然是英文版的,如果英语比较好的话,建议参考官方文档。

以下就是在EdaSample案例中,RabbitMQEventBus的实现,我们先读一读代码,再对这部分代码做些分析。

public class RabbitMQEventBus : BaseEventBus
{
    private readonly IConnectionFactory connectionFactory;
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string exchangeName;
    private readonly string exchangeType;
    private readonly string queueName;
    private readonly bool autoAck;
    private readonly ILogger logger;
    private bool disposed;

    public RabbitMQEventBus(IConnectionFactory connectionFactory,
        ILogger<RabbitMQEventBus> logger,
        IEventHandlerExecutionContext context,
        string exchangeName,
        string exchangeType = ExchangeType.Fanout,
        string queueName = null,
        bool autoAck = false)
        : base(context)
    {
        this.connectionFactory = connectionFactory;
        this.logger = logger;
        this.connection = this.connectionFactory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.exchangeType = exchangeType;
        this.exchangeName = exchangeName;
        this.autoAck = autoAck;

        this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);

        this.queueName = this.InitializeEventConsumer(queueName);

        logger.LogInformation($"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }

    public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
        var eventBody = Encoding.UTF8.GetBytes(json);
        channel.BasicPublish(this.exchangeName,
            @event.GetType().FullName,
            null,
            eventBody);
        return Task.CompletedTask;
    }

    public override void Subscribe<TEvent, TEventHandler>()
    {
        if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
            this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
        }
    }

    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                this.channel.Dispose();
                this.connection.Dispose();

                logger.LogInformation($"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
            }

            disposed = true;
            base.Dispose(disposing);
        }
    }

    private string InitializeEventConsumer(string queue)
    {
        var localQueueName = queue;
        if (string.IsNullOrEmpty(localQueueName))
        {
            localQueueName = this.channel.QueueDeclare().QueueName;
        }
        else
        {
            this.channel.QueueDeclare(localQueueName, true, false, false, null);
        }

        var consumer = new EventingBasicConsumer(this.channel);
        consumer.Received += async (model, eventArgument) =>
        {
            var eventBody = eventArgument.Body;
            var json = Encoding.UTF8.GetString(eventBody);
            var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
            await this.eventHandlerExecutionContext.HandleEventAsync(@event);
            if (!autoAck)
            {
                channel.BasicAck(eventArgument.DeliveryTag, false);
            }
        };

        this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);

        return localQueueName;
    }
}

阅读上面的代码,需要注意以下几点:

  1. 正如上面所述,构造函数需要接受IEventHandlerExecutionContext对象,并通过构造函数的base调用,将该对象传递给基类
  2. 构造函数中,queueName参数是可选参数,也就是说:
    1. 如果通过RabbitMQEventBus发送事件消息,则无需指定queueName参数,仅需指定exchangeName即可,因为在RabbitMQ中,消息的发布方无需知道消息是发送到哪个队列中
    2. 如果通过RabbitMQEventBus接收事件消息,那么也分两种情况:
      1. 如果两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,并且queueName的值相同,那么这两个进程将会轮流处理路由至queueName队列的消息
      2. 如果两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,但queueName的值不相同,或者都没有指定queueName参数,那么这两个进程将会同时处理路由至queueName队列的消息
    3. 有关Exchange和Queue的概念,请参考RabbitMQ的官方文档
  3. 在Subscribe方法中,除了将事件处理器注册到事件处理器执行上下文之外,还通过QueueBind方法,将指定的队列绑定到Exchange上
  4. 事件数据都通过Newtonsoft.Json进行序列化和反序列化,使用TypeNameHandling.All这一设定,使得序列化的JSON字符串中带有类型名称信息。在此处这样做既是合理的,又是必须的,因为如果没有带上类型名称的信息,JsonConvert.DeserializeObject反序列化时,将无法判定得到的对象是否可以转换为IEvent对象,这样就会出现异常。但如果是实现一个更为通用的消息系统,应用程序派发出去的事件消息可能还会被由Python或者Java所实现的应用程序所使用,那么对于这些应用,它们并不知道Newtonsoft.Json是什么,也无法通过Newtonsoft.Json加入的类型名称来获知事件消息的初衷(Intent),Newtonsoft.Json所带的类型信息又会显得冗余。因此,简单地使用Newtonsoft.Json作为事件消息的序列化、反序列化工具,其实是欠妥的。更好的做法是,实现自定义的消息序列化、反序列化器,在进行序列化的时候,将.NET相关的诸如类型信息等,作为Metadata(元数据)附着在序列化的内容上。理论上说,在序列化的数据中加上一些元数据信息是合理的,只不过我们对这些元数据做一些标注,表明它是由.NET框架产生的,第三方系统如果不关心这些信息,可以对元数据不做任何处理
  5. 在Dispose方法中,注意将RabbitMQ所使用的资源dispose掉

使用RabbitMQEventBus

在Customer服务中,使用RabbitMQEventBus就非常简单了,只需要引用RabbitMQEventBus的程序集,然后在Startup.cs文件的ConfigureServices方法中,替换PassThroughEventBus的使用即可:

public void ConfigureServices(IServiceCollection services)
{
    this.logger.LogInformation("正在对服务进行配置...");

    services.AddMvc();

    services.AddTransient<IEventStore>(serviceProvider => 
        new DapperEventStore(Configuration["mssql:connectionString"], 
            serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));

    var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, 
        sc => sc.BuildServiceProvider());
    services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
    // services.AddSingleton<IEventBus, PassThroughEventBus>();

    var connectionFactory = new ConnectionFactory { HostName = "localhost" };
    services.AddSingleton<IEventBus>(sp => new RabbitMQEventBus(connectionFactory,
        sp.GetRequiredService<ILogger<RabbitMQEventBus>>(),
        sp.GetRequiredService<IEventHandlerExecutionContext>(),
        RMQ_EXCHANGE,
        queueName: RMQ_QUEUE));

    this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
}

Note:一种更好的做法是通过配置文件来配置IoC容器,在曾经的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。这样只需要Customer服务能够通过配置文件来配置IoC容器,同时只需要让Customer服务依赖(注意,不是程序集引用)于不同的事件总线的实现即可,无需对Customer服务重新编译。

下面来验证一下效果。首先确保RabbitMQ已经配置并启动妥当,我是安装在本地机器上,使用默认安装。首先启动ASP.NET Core Web API,然后通过Powershell发起两次创建Customer的请求:

image

查看一下数据库是否更新正常:

image

并检查一下日志信息:

image

RabbitMQ中Exchange的信息:

image

总结

本文提供了一种RabbitMQEventBus的实现,目前来说是够用的,而且这种实现是可以使用在实际项目当中的。在实际使用中,或许也会碰到一些与RabbitMQ本身有关的问题,这就需要具体问题具体分析了。此外,本文没有涉及事件消息丢失、重发然后保证最终一致性的问题,这些内容会在后面讨论。从下文开始,我们着手逐步实现CQRS架构的领域事件和事件存储部分。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_3这个tag,如下:

image

欢迎访问我的博客新站:http://sunnycoding.net


原文地址:https://www.cnblogs.com/daxnet/p/aspnetcore-eda-part3.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。事实上,不仅仅是在事件处理器
上文已经介绍了Identity Service的实现过程。今天我们继续,实现一个简单的Weather API和一个基于Ocelot的API网关。 回顾 《Angular SPA基于Ocelot API网关与IdentityServer4的身份认证与授权(一)》 Weather API Weather
最近我为我自己的应用开发框架Apworks设计了一套案例应用程序,并以Apache 2.0开源,开源地址是:https://github.com/daxnet/apworks-examples,目的是为了让大家更为方便地学习和使用.NET Core、最新的前端开发框架Angular,以及Apwork
HAL(Hypertext Application Language,超文本应用语言)是一种RESTful API的数据格式风格,为RESTful API的设计提供了接口规范,同时也降低了客户端与服务端接口的耦合度。很多当今流行的RESTful API开发框架,包括Spring REST,也都默认支
在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,
HAL,全称为Hypertext Application Language,它是一种简单的数据格式,它能以一种简单、统一的形式,在API中引入超链接特性,使得API的可发现性(discoverable)更强,并具有自描述的特点。使用了HAL的API会更容易地被第三方开源库所调用,并且使用起来也很方便
何时使用领域驱动设计?其实当你的应用程序架构设计是面向业务的时候,你已经开始使用领域驱动设计了。领域驱动设计既不是架构风格(Architecture Style),也不是架构模式(Architecture Pattern),它也不是一种软件开发方法论,所以,是否应该使用领域驱动设计,以及什么时候使用
《在ASP.NET Core中使用Apworks快速开发数据服务》一文中,我介绍了如何使用Apworks框架的数据服务来快速构建用于查询和管理数据模型的RESTful API,通过该文的介绍,你会看到,使用Apworks框架开发数据服务是何等简单快捷,提供的功能也非常多,比如对Hypermedia的
在上一讲中,我们已经完成了一个完整的案例,在这个案例中,我们可以通过Angular单页面应用(SPA)进行登录,然后通过后端的Ocelot API网关整合IdentityServer4完成身份认证。在本讲中,我们会讨论在当前这种架构的应用程序中,如何完成用户授权。 回顾 《Angular SPA基于
Keycloak是一个功能强大的开源身份和访问管理系统,提供了一整套解决方案,包括用户认证、单点登录(SSO)、身份联合、用户注册、用户管理、角色映射、多因素认证和访问控制等。它广泛应用于企业和云服务,可以简化和统一不同应用程序和服务的安全管理,支持自托管或云部署,适用于需要安全、灵活且易于扩展的用
3月7日,微软发布了Visual Studio 2017 RTM,与之一起发布的还有.NET Core Runtime 1.1.0以及.NET Core SDK 1.0.0,尽管这些并不是最新版,但也已经从preview版本升级到了正式版。所以,在安装Visual Studio 2017时如果启用了
在上文中,我介绍了如何在Ocelot中使用自定义的中间件来修改下游服务的response body。今天,我们再扩展一下设计,让我们自己设计的中间件变得更为通用,使其能够应用在不同的Route上。比如,我们可以设计一个通用的替换response body的中间件,然后将其应用在多个Route上。 O
不少关注我博客的朋友都知道我在2009年左右开发过一个名为Apworks的企业级应用程序开发框架,旨在为分布式企业系统软件开发提供面向领域驱动(DDD)的框架级别的解决方案,并对多种系统架构风格提供支持。这个框架的开发和维护我坚持了很久,一直到2015年,我都一直在不停地重构这个项目。目前这个项目在
好吧,这个题目我也想了很久,不知道如何用最简单的几个字来概括这篇文章,原本打算取名《Angular单页面应用基于Ocelot API网关与IdentityServer4ʺSP.NET Identity实现身份认证与授权》,然而如你所见,这样的名字实在是太长了。所以,我不得不缩写“单页面应用”几个字
在前面两篇文章中,我介绍了基于IdentityServer4的一个Identity Service的实现,并且实现了一个Weather API和基于Ocelot的API网关,然后实现了通过Ocelot API网关整合Identity Service做身份认证的API请求。今天,我们进入前端开发,设计
Ocelot是ASP.NET Core下的API网关的一种实现,在微服务架构领域发挥了非常重要的作用。本文不会从整个微服务架构的角度来介绍Ocelot,而是介绍一下最近在学习过程中遇到的一个问题,以及如何使用中间件(Middleware)来解决这样的问题。 问题描述 在上文中,我介绍了一种在Angu
在大数据处理和人工智能时代,数据工厂(Data Factory)无疑是一个非常重要的大数据处理平台。市面上也有成熟的相关产品,比如Azure Data Factory,不仅功能强大,而且依托微软的云计算平台Azure,为大数据处理提供了强大的计算能力,让大数据处理变得更为稳定高效。由于工作中我的项目
在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接
在之前《在ASP.NET Core中使用Apworks快速开发数据服务》一文的评论部分,.NET大神张善友为我提了个建议,可以使用Compile As a Service的Roslyn为语法解析提供支持。在此非常感激友哥给我的建议,也让我了解了一些Roslyn的知识。使用Roslyn的一个很大的好处
很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便