使用NServiceBus开发分布式应用

系列主题:基于消息的软件架构模型演变

NServiceBus 是一个.Net平台下开源的消息服务框架,这类产品有时也被称作ESB(Enterprise Service Bus)——企业服务总线。
NServicebus官方地址:http://particular.net/
git: https://github.com/Particular/NServiceBus
NServiceBus原作者Udi Dahan,该产品最早于2006年发行了第一个版本,这是一个企业级的开源产品,企业开发需要购买License,参照:http://particular.net/licensing

一、NServiceBus的特性

1、高性能和可扩展性

可以广泛应用于许多业务领域,可扩展性和性能都经过了实战检验。

2、具有自动重试的可靠性集成

通过配置机制提供基于消息通讯的的最佳实践方案,能够识别错误响应并自动重试。

3、工作流和后台任务调度

通过Saga来完成长时间运行的流程定义和管理功能,提供强大而灵活的工作流功能。

4、消息的集中审核流程

很容易将整个分布式系统聚集到一个中心位置配置消息审核。

5、通过发布/订阅来减少耦合

提供了发布/订阅机制。可扩展、可配置、易于理解和易于使用。

6、易于扩展和配置

多个灵活的扩展点和配置选项,NServieBus可以根据用户需求对各个特性进行自定义配置。

7、支持广泛的消息传输技术

提供了MSMQ,RabbitMQ,SQL Server,Windows Azure Queues,Windows AzureService Bus消息传输机制,当然你也可以自定义或者选择由社区开发的消息传输方案。

二、.NET平台下其他ESB介绍

1、Biztalk

在微软的世界里,BizTalk Server一直被用来解决异构平台上应用程序之间数据交换的复杂集成问题。BizTalk同样提供了发布/订阅模式实现松耦合的架构。有时候你需要将现有的代码和一个运行在不同技术和协议下的历史遗留程序集成,这是一个经典的企业应用程序集成(Enterprise Application Integration-EAI)的场景。在这种场景之下,可以在业务服务之间使用NServiceBus,在这些服务的边界之内,你可以使用BizTalk与现有的历史遗留应用进行集成。

如您所见,服务边界之后的BizTalk是一个对异构应用的整合。

2、MassTransit

MassTransit是一个.NET平台下用来创建分布式应用程序的轻量级开源消息总线。

官网:http://masstransit-project.com/

git: https://github.com/MassTransit/MassTransit

MassTransit的第一个版本开发于2007年,作者Chris Patterson 和Dru Sellers 在一个会议中偶然相识,他们觉得当时.Net平台下没有一个他们想要的服务总线框架,而那时NServiceBus也刚刚发布,很多功能都不完善,并且也没有很好的社区支持。所以他俩开发了自己的ESB产品——MassTransit,目前最新的MassTransit基于NET4.0中的异步支持重写了所有代码。MassTransit的目标并不是要在分布式领域面面俱到从而适应大型的企业级开发,而是能实现一个强壮的轻量级消息总线。

三、从Hello World开始

分布式应用开发是一个比较复杂的过程,无论从涉及的技术知识体系还是开发,调试,部署都会带来很多挑战,我希望通过这个简单的例子展示分布式开发中的基本思想。

1、准备工作

安装MSMQ服务,NServiceBus默认使用MSMQ服务,所以在开始这个例子之前确保已安装MSMQ服务。

2、 新建一个类型为Console Application的客户端:NBus.Practice.GreetingClient,客户端会命令服务端输出“Hello World”。

安装nuget包:

Install-Package NServiceBus

3、 初始化一个Bus,然后给服务端发出命令。

NServiceBus提供了多个Host方案,应用程序自己Host或者使用NServiceBus.Host程序来Host应用程序。当然你还可以将NServicBus程序Host在一个Windows服务中。这个客户端我们选择Host在客户端自身当中。

我们只是用几个必要的选项来配置bus。重点是此段代码配置了一个EndPoint:“Nbus.Practice.HelloWorld.Client”,这个名称代表了了此客户端的网络地址。

有了这些配置,我们就可以利用这些配置来创建一个bus出来。

using (IBus bus = Bus.Create(busConfiguration).Start())
            {
                SendGreetingCommand(bus);
            }

接下来的代码通过bus.Send<TCommand>(TCommand cmd)方法给服务端发送一个命令:

        private static void SendGreetingCommand(IBus bus)
        {
            Console.WriteLine("Press 'Enter' to send a message.To exit,Ctrl + C");
            var i = 0;
            while (Console.ReadLine() != null)
            {
                var id = Guid.NewGuid();
                bus.Send(new GreetingCommand() { Id = id,Times = i });
                i++;
                Console.WriteLine("Send a new GreetingCommand message with id: {0}",id.ToString("N"));
            }
        }

GreetingCommand就是一个消息,在消息总线中,一切交流都是通过消息来实现的。

4、建立一个公共的类库:NBus.Practice.GreetingMessage来定义消息
在NServiceBus中,命令类型要继承于ICommand,事件类型要继承于IEvent,消息是一个由属性构成的简单类型,最终需要序列化并可以在网络中传播。

    public class GreetingCommand:ICommand
    {
        public Guid Id { get; set; }
        public int Times { get; set; }
    }

5、这个客户端几乎要完成了,我们创建了个bus,然后发送了一条消息。现在的问题在于这条息发送给谁呢?发送给谁这件事通过配置文件来完成。

<MessageEndpointMappings>
  <add Messages="NBus.Practice.GreetingMessage"  Endpoint="Nbus.Practice.HelloWorld.Server" />
</MessageEndpointMappings>

这个配置的含义是:将定义在程序集NBus.Practice.GreetingMessage中的消息发送到EndPoint为Nbus.Practice.HelloWorld.Server的程序中。

6、显然我们需要一个EndPoint为Nbus.Practice.HelloWorld.Server的服务端,新建一个类型为Console Application类型的服务端:NBus.Practice.GreetingServer,并且用同样的方法创建一个bus并启动。

        static void Main(string[] args)
        {
            var busConfiguration = new BusConfiguration();
            busConfiguration.EndpointName("Nbus.Practice.HelloWorld.Server");
            busConfiguration.UseSerialization<JsonSerializer>();
            busConfiguration.UsePersistence<InMemoryPersistence>();

            using (IBus bus = Bus.Create(busConfiguration).Start())
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
        }

为了处理GreetingCommand,新建一个GreetingHandler.cs的类,只需要继承IHandleMessages<GreetingCommand>即可表明该类会处理类型为GreetingCommand的消息。

    public class GreetingHandler:IHandleMessages<GreetingCommand>
    {
        private readonly IBus _bus;

        public GreetingHandler(IBus bus)
        {
            _bus = bus;
        }

        public void Handle(GreetingCommand message)
        {
            Console.WriteLine("Received greetingCommand:{0},times:{1},Hello world",message.Id,message.Times);

            _bus.Publish(new GreetingEvent(){Id = message.Id,Times = message.Times});
        }
    }

方法public void Handle(GreetingCommand message)描述了当收到GreetingCommand消息会输出了字符串“Hello World”。另外代码最后发布了一个类型为GreetingEvent的事件,所有对此事件感兴趣的订阅者都可以订阅此事件。在NServiceBus中发布一个事件采用bus.Publish<TEvent>(TEvent event)方法。你会注意到我们通过构造器注入的方式来获取bus实例。

也许你会很关心此服务端中的配置文件如何配置呢?此服务端收到了别的程序发送的消息,具体谁发送的他并不知情。另外发布了一个事件,但是具体谁来订阅该事件,作为事件的发布者并不知道,所以该项目的消息路由配置为空:

<MessageEndpointMappings></MessageEndpointMappings>

7、截至目前,我们已经完成了显示hello world的任务,让我们运行起来看看吧:

在项目配置中将NBus.Practice.GreetingClient和NBus.Practice.GreetingServer同时设置为启动项。CTRL+F5

运行结果:当我们在客户端中按"Enter"键,服务端会收到消息并输出“Hello World”。

此时如果我们关闭服务端,并在客户端中多敲几次回车键发送GreetingCommand消息会怎么样?让我么来模拟真实场景下服务端应用由于未知原因宕机,会出现什么情况呢?

此时如果我们打开MSMQ管理工具就会发现服务端未处理的消息存储在队列中,直到服务端再次上线重新处理这些消息,从而保证了分布式应用中数据的最终一致性。

8、刚才服务端处理完GreetingCommand并发布了GreetingEvent事件,我们接下来新建一个类型为Console Application的事件订阅者:NBus.Practice.GreetingSubscriber。

事件订阅者跟之前一样需要创建并启动一个bus。

9、新建一个Congratulation.cs来处理GreetingEvent消息:

    public class Congratulation:IHandleMessages<GreetingEvent>
    {
        public void Handle(GreetingEvent message)
        {
            Console.WriteLine("Received greeting event id:{0},time{1},congratulations,you have learned NServiceBus.",message.Times);
        }
    }

10、订阅者的消息路由如何配置?

<MessageEndpointMappings>
   <add Messages="NBus.Practice.GreetingMessage" Endpoint="Nbus.Practice.HelloWorld.Server" />
</MessageEndpointMappings>

这句配置的含义是:订阅EndPoint为Nbus.Practice.HelloWorld.Server且消息定义在程序集为NBus.Practice.GreetingMessage中的消息。

由此可见根据程序的角色不同,配置文件的配置具有不同的含义。

11、在事件的发布/订阅模式中,订阅者可以是一个或多个,我们将新建第二个订阅者来展示此功能。新建一个类型为Class library的程序集: NBus.Practice.GreetingAnotherSubscriber。

这次的程序之所以要换成Class library是因为我们本次要使用NServiceBus.Host来Host此程序。

Install-Package NServiceBus.Host

NServiceBus会为项目自动添加一个EndpointConfig.cs文件,我们将在此文件中配置bus:

    public class EndpointConfig : IConfigureThisEndpoint
    {
        public void Customize(BusConfiguration configuration)
        {
            configuration.EndpointName("Nbus.Practice.HelloWorld.AnotherSubscriber");
            configuration.UseSerialization<JsonSerializer>();
            configuration.UsePersistence<InMemoryPersistence>();
        }
    }

同时添加一个Handler来处理GreetingEvent消息。

    public class GreetingLogger:IHandleMessages<GreetingEvent>
    {
        public void Handle(GreetingEvent message)
        {
            Console.WriteLine("Received greeting event id:{0},I will log it.",message.Times);
        }
    }

通过下面的方式使用NServiceBus.Host.exe来Host本程序。

11、    最后把本教程中建立的四个程序全都跑起来看看效果:

四、总结

本文通过一个简单的实例展示了如何在服务总线中发送命令,如何使用发布/订阅基本思想来实现一个Hello world。这个例子很简单,但是隐隐之中展现了CQRS的基本思想:Client发送一个Command,DomainHandler收到Command后会调用Domain逻辑,此时Domain会发布领域事件,Query分支会订阅领域事件来更新Query数据库,同时还有缓存、搜索引擎、其他服务也会订阅此领域事件。各个服务之间构成了松耦合的分布式应用程序。

当然NServiceBus还有更多高级主题例如:持久化、Saga、单元测试、二级重试、依赖注入、负载均衡、更换消息队列等内容等着我们去一探究竟。

整个例子的代码:https://git.oschina.net/richieyangs/NServiceBusPractice,以后会将所有NServiceBus相关的例子放在这个项目中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


引言 本文从Linux小白的视角, 在CentOS 7.x服务器上搭建一个Nginx-Powered AspNet Core Web准生产应用。 在开始之前,我们还是重温一下部署原理,正如你所常见的.Net Core 部署图: 在Linux上部署.Net Core App最好的方式是在Linux机器
引言: 多线程编程/异步编程非常复杂,有很多概念和工具需要去学习,贴心的.NET提供Task线程包装类和await/async异步编程语法糖简化了异步编程方式。 相信很多开发者都看到如下异步编程实践原则: 遵守以上冷冰冰的②③条的原则,可保证异步程序按照预期状态正常运作;我们在各大编程论坛常看到违背
一. 宏观概念 ASP.NET Core Middleware是在应用程序处理管道pipeline中用于处理请求和操作响应的组件。 每个组件是pipeline 中的一环。 自行决定是否将请求传递给下一个组件 在处理管道的下个组件执行之前和之后执行业务逻辑 二. 特性和行为 ASP.NET Core处
背景 在.Net和C#中运行异步代码相当简单,因为我们有时候需要取消正在进行的异步操作,通过本文,可以掌握 通过CancellationToken取消任务(包括non-cancellable任务)。 Task&#160;表示无返回值的异步操作, 泛型版本Task&lt;TResult&gt;表示有返
HTTP基本认证 在HTTP中,HTTP基本认证(Basic Authentication)是一种允许网页浏览器或其他客户端程序以(用户名:口令) 请求资源的身份验证方式,不要求cookie,session identifier、login page等标记或载体。 - 所有浏览器据支持HTTP基本认
1.Linq 执行多列排序 OrderBy的意义是按照指定顺序排序,连续两次OrderBy,后面一个有可能会打乱前面一个的排序顺序,可能与预期不符。 要实现sql中的order by word,name类似效果; LINQ 有ThenBy可以紧接使用, ThenBy记住原本排序的值,然后再排其他值,
ASP.NET Core 核心特性:开源、跨平台、高性能是其决战JAVA的必胜法宝,最引人关注的跨平台特性 到底是怎么实现? &#xA; 本文分Unix、Windows剖析跨平台内幕,读完让你大呼过瘾。
前导 Asynchronous programming Model(APM)异步编程模型以BeginMethod(...) 和 EndMethod(...)结对出现。 IAsyncResult BeginGetResponse(AsyncCallback callback, object state
引言 最近在公司开发了一个项目,项目部署架构图如下: 思路 如图中文本所述,公司大数据集群不允许直接访问外网,需要一个网关服务器代理请求,本处服务器A就是边缘代理服务器的作用。 通常技术人员最快捷的思路是在服务器A上部署IISʺpplication Request Routing Module组件
作为一枚后端程序狗,项目实践常遇到定时任务的工作,最容易想到的的思路就是利用Windows计划任务/wndows service程序/Crontab程序等主机方法在主机上部署定时任务程序/脚本。 但是很多时候,若使用的是共享主机或者受控主机,这些主机不允许你私自安装exe程序、Windows服务程序
引言 熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避: 在单体程序部署的瞬间(服务不可用)会有少量流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流
合格的web后端程序员,除搬砖技能,还必须会给各种web服务器配置Https,本文结合ASP.NET Core部署模型聊一聊启用Https的方式。 温故知新 目前常见的Http请求明文传输,请求可能被篡改,访问的站点可能被伪造。 HTTPS是HTTP加上TLS/SSL协议构建的可进行加密传输、身份认
长话短说 前文《解剖HttpClientFactory,自由扩展HttpMessageHandler》主要讲如何为HttpClientFactory自定义HttpMessageHandler组件, 现在来完成课后的小作业: 将重点日志字段显示到Nlog的LayoutRenderer上。 本文实现一个
引言问题 作为资深老鸟,有事没事,出去面试;找准差距、定位价值。 面试必谈哈希, Q1:什么是哈希? Q2:哈希为什么快? Q3:你是怎么理解哈希算法利用空间换取时间的? Q4:你是怎么解决哈希冲突的? Q5:你有实际用写过哈希算法吗? 知识储备 哈希(也叫散列)是一种查找算法(可用于插入),哈希算
前言 如题,有感于博客园最近多次翻车,感觉像胡子眉毛一把抓, 定位不了生产环境的问题。 抛开流程问题,思考在生产环境中如何做故障排除,&#160;发现博客园里面这方面的文章比较少。 .Net 本身是提供了sos.dll工具帮助我们在生产中故障排除,通过提供有关内部公共语言运行时(CLR)环境的信息,
.NET程序是基于.NET Framework、.NET Core、Mono、【.NET实现】开发和运行的 ,定义以上【.NET实现】的标准规范称为.NET Standard .NET Standard .NET标准是一组API集合,由上层三种【.NET实现】的Basic Class Library
长话短说 上个月公司上线了一个物联网数据科学项目,我主要负责前端接受物联网事件,并提供 参数下载。 webapp 部署在Azure云上,参数使用Azure SQL Server存储。 最近从灰度测试转向全量部署之后,日志时常收到: SQL Session超限报错。 排查 我在Azure上使用的是 S
临近年关,搜狗,360浏览器出现页面无法成功跳转,同域Cookie丢失? 也许是服务端 SameSite惹的祸。&#xA;本文揭示由于Chrome低版本内核不识别 SameSite= None, 引发的单点登录故障。
本文聊一聊TraceID的作用和一般组成,衍生出ASP. NETCore 单体和分布式程序中 TraceId 的使用方式
通过给 HttpClint请求的日志增加 TraceId,解锁自定义扩展 HttpClientFacroty 的姿势