.Net Core微服务入门全纪录六——EventBus-事件总线

Tips:本篇已加入系列文章阅读目录,可点击查看更多相关文章。

前言

上一篇【.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)】中已经完成了Ocelot + Consul的搭建,这一篇简单说一下EventBus。

EventBus-事件总线

  • 首先,什么是事件总线呢?

贴一段引用:

事件总线是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

如果没有接触过EventBus,可能不太好理解。其实EventBus在客户端开发中应用非常广泛(android,ios,web前端等),用于多个组件(或者界面)之间的相互通信,懂的人都懂。。。

  • 那么,我们为什么要用EventBus呢?

就拿当前的项目举例,我们有一个订单服务,一个产品服务。客户端有一个下单功能,当用户下单时,调用订单服务的下单接口,那么下单接口需要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。那么服务之间又怎么调用呢?直接RESTAPI?或者效率更高的gRPC?可能这两者各有各的使用场景,但是他们都存在一个服务之间的耦合问题,或者难以做到异步调用。

试想一下:假设我们下单时调用订单服务,订单服务需要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务 等等。。。如果每个服务处理时间需要2s,不使用异步的话,那这种体验可想而知。

如果使用EventBus的话,那么订单服务只需要向EventBus发一个“下单事件”就可以了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,自己去减库存就好了。这样就避免了两个服务之间直接调用的耦合性,并且真正做到了异步调用。

既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并不是微服务独有的问题,而是所有的分布式系统都会存在的问题。
关于分布式事务,可以查一下“CAP原则”和“BASE理论”了解更多。当今的分布式系统更多的会追求事务的最终一致性。

下面使用国人开发的优秀项目“CAP”,来演示一下EventBus的基本使用。之所以使用“CAP”是因为它既能解决分布式系统的最终一致性,同时又是一个EventBus,它具备EventBus的所有功能!
作者介绍:https://www.cnblogs.com/savorboard/p/cap.html

CAP使用

  • 环境准备

在Docker中准备一下需要的环境,首先是数据库,数据库我使用PostgreSQL,用别的也行。CAP支持:SqlServer,MySql,PostgreSql,MongoDB。
关于在Docker中运行PostgreSQL可以看我的另一篇博客:https://www.cnblogs.com/xhznl/p/13155054.html

然后是MQ,这里我使用RabbitMQ,Kafka也可以。
Docker运行RabbitMQ:

docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management

默认用户:guest,密码:guest

环境准备就完成了,Docker就是这么方便。。。

  • 代码修改:

为了模拟以上业务,需要修改大量代码,下面代码如有遗漏的直接去github找。

NuGet安装:

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

CAP相关:

DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

Order.API/Controllers/OrdersController.cs增加下单接口:

[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{
    private readonly ILogger<OrdersController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly OrderContext _context;

    public OrdersController(ILogger<OrdersController> logger,IConfiguration configuration,ICapPublisher capPublisher,OrderContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【订单服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 下单 发布下单事件
    /// </summary>
    /// <param name="order"></param>
    /// <returns></returns>
    [Route("Create")]
    [HttpPost]
    public async Task<IActionResult> CreateOrder(Models.Order order)
    {
        using (var trans = _context.Database.BeginTransaction(_capBus,autoCommit: true))
        {
            //业务代码
            order.CreateTime = DateTime.Now;
            _context.Orders.Add(order);

            var r = await _context.SaveChangesAsync() > 0;

            if (r)
            {
                //发布下单事件
                await _capBus.PublishAsync("order.services.createorder",new CreateOrderMessageDto() { Count = order.Count,ProductID = order.ProductID });
                return Ok();
            }
            return BadRequest();
        }

    }

}

Order.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 产品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 购买数量
    /// </summary>
    public int Count { get; set; }
}

Order.API/Models/Order.cs订单实体类:

public class Order
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 下单时间
    /// </summary>
    [Required]
    public DateTime CreateTime { get; set; }

    /// <summary>
    /// 产品ID
    /// </summary>
    [Required]
    public int ProductID { get; set; }

    /// <summary>
    /// 购买数量
    /// </summary>
    [Required]
    public int Count { get; set; }
}

Order.API/Models/OrderContext.cs数据库Context:

public class OrderContext : DbContext
{
    public OrderContext(DbContextOptions<OrderContext> options)
       : base(options)
    {

    }

    public DbSet<Order> Orders { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {

    }
}

Order.API/appsettings.json增加数据库连接字符串:

"ConnectionStrings": {
  "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}

Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<OrderContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是订单服务的修改。

Product.API/Controllers/ProductsController.cs增加减库存接口:

[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
    private readonly ILogger<ProductsController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly ProductContext _context;

    public ProductsController(ILogger<ProductsController> logger,ProductContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【产品服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 减库存 订阅下单事件
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [NonAction]
    [CapSubscribe("order.services.createorder")]
    public async Task ReduceStock(CreateOrderMessageDto message)
    {
        //业务代码
        var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
        product.Stock -= message.Count;

        await _context.SaveChangesAsync();
    }

}

Product.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 产品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 购买数量
    /// </summary>
    public int Count { get; set; }
}

Product.API/Models/Product.cs产品实体类:

public class Product
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 产品名称
    /// </summary>
    [Required]
    [Column(TypeName = "VARCHAR(16)")]
    public string Name { get; set; }

    /// <summary>
    /// 库存
    /// </summary>
    [Required]
    public int Stock { get; set; }
}

Product.API/Models/ProductContext.cs数据库Context:

public class ProductContext : DbContext
{
    public ProductContext(DbContextOptions<ProductContext> options)
       : base(options)
    {

    }

    public DbSet<Product> Products { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        //初始化种子数据
        modelBuilder.Entity<Product>().HasData(new Product
        {
            ID = 1,Name = "产品1",Stock = 100
        },new Product
        {
            ID = 2,Name = "产品2",Stock = 100
        });
    }
}

Product.API/appsettings.json增加数据库连接字符串:

"ConnectionStrings": {
  "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}

Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<ProductContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是产品服务的修改。

订单服务和产品服务的修改到此就完成了,看着修改很多,其实功能很简单。就是各自增加了自己的数据库表,然后订单服务增加了下单接口,下单接口会发出“下单事件”。产品服务增加了减库存接口,减库存接口会订阅“下单事件”。然后客户端调用下单接口下单时,产品服务会减去相应的库存,功能就这么简单。

关于EF数据库迁移之类的基本使用就不介绍了。使用Docker重新构建镜像,运行订单服务,产品服务:

docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"

docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"

最后 Ocelot.APIGateway/ocelot.json 增加一条路由配置:

好了,进行到这里,整个环境就有点复杂了。确保我们的PostgreSQL,RabbitMQ,Consul,Gateway,服务实例都正常运行。

服务实例运行成功后,数据库应该是这样的:





产品表种子数据:

cap.published表和cap.received表是由CAP自动生成的,它内部是使用本地消息表+MQ来实现异步确保。

运行测试

这次使用Postman作为客户端调用下单接口(9070是之前的Ocelot网关端口):

订单库published表:


订单库order表:

产品库received表:


产品库product表:

再试一下:


OK,完成。虽然功能很简单,但是我们实现了服务的解耦,异步调用,和最终一致性。

总结

注意,上面的例子纯粹是为了说明EventBus的使用,实际中的下单流程绝对不会这么做的!希望大家不要较真。。。

可能有人会说如果下单成功,但是库存不足导致减库存失败了怎么办,是不是要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。这里的事务是指:订单保存到数据库,和下单事件保存到cap.published表(保存到cap.published表理论上就能够发送到MQ)这两件事情,要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的,至于产品服务的减库存是否成功那就是产品服务的事情了(理论上也应该是成功的,因为消息已经确保发到了MQ,产品服务必然会收到消息),CAP也提供了失败重试,和失败回调机制。

如果非要数据回滚也是能实现的,CAP的ICapPublisher.Publish方法提供一个callbackName参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,这是不推荐的做法,就不详细说了,有兴趣自己研究一下。
另外,CAP无法保证消息不重复,实际使用中需要自己考虑一下消息的重复过滤和幂等性。

这一篇内容有点多,不知道有没有表达清楚,有问题欢迎评论交流,如有不对之处还望大家指出。

下一篇计划写一下授权认证相关的内容。

代码放在:https://github.com/xiajingren/NetCoreMicroserviceDemo

未完待续...

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。事实上,不仅仅是在事件处理器
上文已经介绍了Identity Service的实现过程。今天我们继续,实现一个简单的Weather API和一个基于Ocelot的API网关。 回顾 《Angular SPA基于Ocelot API网关与IdentityServer4的身份认证与授权(一)》 Weather API Weather
最近我为我自己的应用开发框架Apworks设计了一套案例应用程序,并以Apache 2.0开源,开源地址是:https://github.com/daxnet/apworks-examples,目的是为了让大家更为方便地学习和使用.NET Core、最新的前端开发框架Angular,以及Apwork
HAL(Hypertext Application Language,超文本应用语言)是一种RESTful API的数据格式风格,为RESTful API的设计提供了接口规范,同时也降低了客户端与服务端接口的耦合度。很多当今流行的RESTful API开发框架,包括Spring REST,也都默认支
在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,
HAL,全称为Hypertext Application Language,它是一种简单的数据格式,它能以一种简单、统一的形式,在API中引入超链接特性,使得API的可发现性(discoverable)更强,并具有自描述的特点。使用了HAL的API会更容易地被第三方开源库所调用,并且使用起来也很方便
何时使用领域驱动设计?其实当你的应用程序架构设计是面向业务的时候,你已经开始使用领域驱动设计了。领域驱动设计既不是架构风格(Architecture Style),也不是架构模式(Architecture Pattern),它也不是一种软件开发方法论,所以,是否应该使用领域驱动设计,以及什么时候使用
《在ASP.NET Core中使用Apworks快速开发数据服务》一文中,我介绍了如何使用Apworks框架的数据服务来快速构建用于查询和管理数据模型的RESTful API,通过该文的介绍,你会看到,使用Apworks框架开发数据服务是何等简单快捷,提供的功能也非常多,比如对Hypermedia的
在上一讲中,我们已经完成了一个完整的案例,在这个案例中,我们可以通过Angular单页面应用(SPA)进行登录,然后通过后端的Ocelot API网关整合IdentityServer4完成身份认证。在本讲中,我们会讨论在当前这种架构的应用程序中,如何完成用户授权。 回顾 《Angular SPA基于
Keycloak是一个功能强大的开源身份和访问管理系统,提供了一整套解决方案,包括用户认证、单点登录(SSO)、身份联合、用户注册、用户管理、角色映射、多因素认证和访问控制等。它广泛应用于企业和云服务,可以简化和统一不同应用程序和服务的安全管理,支持自托管或云部署,适用于需要安全、灵活且易于扩展的用
3月7日,微软发布了Visual Studio 2017 RTM,与之一起发布的还有.NET Core Runtime 1.1.0以及.NET Core SDK 1.0.0,尽管这些并不是最新版,但也已经从preview版本升级到了正式版。所以,在安装Visual Studio 2017时如果启用了
在上文中,我介绍了如何在Ocelot中使用自定义的中间件来修改下游服务的response body。今天,我们再扩展一下设计,让我们自己设计的中间件变得更为通用,使其能够应用在不同的Route上。比如,我们可以设计一个通用的替换response body的中间件,然后将其应用在多个Route上。 O
不少关注我博客的朋友都知道我在2009年左右开发过一个名为Apworks的企业级应用程序开发框架,旨在为分布式企业系统软件开发提供面向领域驱动(DDD)的框架级别的解决方案,并对多种系统架构风格提供支持。这个框架的开发和维护我坚持了很久,一直到2015年,我都一直在不停地重构这个项目。目前这个项目在
好吧,这个题目我也想了很久,不知道如何用最简单的几个字来概括这篇文章,原本打算取名《Angular单页面应用基于Ocelot API网关与IdentityServer4ʺSP.NET Identity实现身份认证与授权》,然而如你所见,这样的名字实在是太长了。所以,我不得不缩写“单页面应用”几个字
在前面两篇文章中,我介绍了基于IdentityServer4的一个Identity Service的实现,并且实现了一个Weather API和基于Ocelot的API网关,然后实现了通过Ocelot API网关整合Identity Service做身份认证的API请求。今天,我们进入前端开发,设计
Ocelot是ASP.NET Core下的API网关的一种实现,在微服务架构领域发挥了非常重要的作用。本文不会从整个微服务架构的角度来介绍Ocelot,而是介绍一下最近在学习过程中遇到的一个问题,以及如何使用中间件(Middleware)来解决这样的问题。 问题描述 在上文中,我介绍了一种在Angu
在大数据处理和人工智能时代,数据工厂(Data Factory)无疑是一个非常重要的大数据处理平台。市面上也有成熟的相关产品,比如Azure Data Factory,不仅功能强大,而且依托微软的云计算平台Azure,为大数据处理提供了强大的计算能力,让大数据处理变得更为稳定高效。由于工作中我的项目
在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接
在之前《在ASP.NET Core中使用Apworks快速开发数据服务》一文的评论部分,.NET大神张善友为我提了个建议,可以使用Compile As a Service的Roslyn为语法解析提供支持。在此非常感激友哥给我的建议,也让我了解了一些Roslyn的知识。使用Roslyn的一个很大的好处
很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便