徒手打造基于Spark的数据工厂(Data Factory):从设计到实现

在大数据处理和人工智能时代,数据工厂(Data Factory)无疑是一个非常重要的大数据处理平台。市面上也有成熟的相关产品,比如Azure Data Factory,不仅功能强大,而且依托微软的云计算平台Azure,为大数据处理提供了强大的计算能力,让大数据处理变得更为稳定高效。由于工作中我的项目也与大数据处理相关,于是我就在思考,是否自己也可以设计打造一个数据工厂,以便寻求一些技术痛点的解决方案,并且引入一些有趣的新功能。 因此,我利用业余时间,逐步打造了一个基于Spark的数据工厂,并取名为Abacuza(Abacus是中国的“算盘”的意思,隐喻它是一个专门做数据计算的平台,使用“算盘”一词的变体,也算是体现一点中国元素吧)。说是基于Spark,其实从整个架构来看,Abacuza并不一定非要基于Spark,只需要为其定制某种数据处理引擎的插件即可,所以,Spark其实仅仅是Abacuza的一个插件,当然,Spark是目前主流的数据处理引擎,Abacuza将其作为默认的数据处理插件。 Abacuza是开源的,项目地址是:https://github.com/daxnet/abacuza。徒手打造?是的,没错,从前端界面都后端开发,从代码到持续集成,再到部署脚本和SDK与容器镜像的发布,都是自己一步步弄出来的。项目主页上有一个简单的教程,后面我会详细介绍一下。在介绍如何使用Abacuza之前,我们先了解一下它的整体架构和设计思想。虽然目前Abacuza还有很多功能没有完成,但并不影响整个数据处理流程的执行。

整体架构

Abacuza和其它的数据工厂平台一样,它的业务流程就是分三步走:数据读入、数据处理、结果输出。Abacuza的整体架构图就很清楚地体现了这个业务流程:

(点击查看大图)

数据输入部分

数据的输入是由输入端点(Input Endpoints)来定义的。Abacuza支持多种数据类型的输入:CSV文件、JSON文件、TXT文本文件、Microsoft SQL Server(暂未完全实现)以及S3的对象存储路径,今后还可以继续扩展输入端点,以支持基于管道(Pipeline)的数据处理流程,这样一来,用户就不需要自己使用C#或者Scala来编写数据处理的逻辑代码,只需要一套JSON文件进行Pipeline定义就可以了。

数据处理部分

当数据输入已经定义好以后,Abacuza会根据Input Endpoint的设置,将数据读入,然后转交给后端的数据处理集群(Cluster)进行处理。Abacuza可以以插件的形式支持不同类型的集群,如上文所说,Apache Spark是Abacuza所支持的一种数据处理集群,在上面的架构图中可以看到,Abacuza Cluster Service管理这些集群,工作任务调度器(Job Scheduler)会通过Abacuza Cluster Service将数据处理任务分配到指定类型的集群上进行处理。 对于Spark而言,具体的数据处理逻辑是由用户自己编写代码实现的。Spark原生支持Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark项目实现从.NET到Spark的绑定(Binding),用户可以使用C#来编写Spark的数据处理逻辑,后面的演练部分我会详细介绍。 那么与Scala相比,通过.NET for Spark使用C#编写的数据处理程序会不会有性能问题?嗯,会有点性能问题,请看下图(图片来源:微软.NET for Spark官方网站):

在这个Benchmark中,处理相同总量的数据,Scala使用了375秒,.NET花了406秒,Python使用433秒,虽然与Scala相比有些差距,但是比Python要好一些。但是不用担心,如果在你的应用场景中,性能是放在第一位的,那么Abacuza的Job Runner机制允许你使用Scala编写数据处理程序,然后上传到Spark集群执行(也就是你不需要依赖于.NET和C#)。

数据输出部分

与数据输入部分类似,处理之后的数据输出方式是由输出端点(Output Endpoints)来定义的。Abacuza也支持多种数据输出方式:将结果打印到日志、将结果输出到外部文件系统以及将结果输出到当前项目所在的S3对象存储路径。无论是数据输入部分还是输出部分,这些端点都是可以定制的,并且可以通过ASP.NET Core的插件系统以及docker-compose或者Kubernetes的volume/Block Storage来实现动态加载。

相关概念和运作机理

Abacuza有以下这些概念:

  1. 集群(Cluster):一个集群是一个完整的大数据处理平台,比如Apache Spark
  2. 集群类型(Cluster Type):定义集群的类型,例如,运行在localhost的Spark集群和运行在云端的Spark集群都是Spark集群,那么它们的集群类型就是spark。
  3. 集群连接(Cluster Connection):定义了Abacuza数据工厂访问集群的方式,类似于数据库系统的连接字符串
  4. 任务执行器(Job Runner):定义了数据处理任务应该如何被提交到集群上执行。它可以包含具体的数据处理业务逻辑
  5. 输入端点(Input Endpoint):定义了原始数据(需要被处理的数据)的来源
  6. 输出端点(Output Endpoint):定义了处理完成后的数据的输出方式
  7. 项目(Project):一种类型数据处理任务的逻辑定义,它包括多个输入端点、一个输出端点以及多个数据处理版本(Revision)的信息,同时它还定义了应该使用哪个任务执行器来执行数据处理任务
  8. 数据处理版本(Revision):它归属于一个特定的项目,表示不同批次的数据处理结果

当一个用户准备使用Abacuza完成一次大数据处理的任务时,一般会按照下面的步骤进行:

  1. 使用用户名/密码(暂时只支持用户名密码登录)登录Abacuza的管理界面
  2. 基于一个已经安装好的集群(比如Apache Spark),配置它的集群类型集群连接,用来定义Abacuza与该集群的通信方式(集群和集群连接定义了数据应该在哪里被处理(where))
  3. 定义任务执行器,在任务执行器中,设置运行数据处理任务的集群类型,当数据处理任务被提交时,Abacuza Cluster Service会基于所选的集群类型,根据一定的算法来选择一个集群进行数据处理。任务执行器中也定义了数据处理的逻辑,(比如,由Scala、C#或者Python编写的应用程序,可以上传到spark类型的集群上运行)。简单地说,任务执行器定义了数据应该如何被处理(how
  4. 创建一个新的项目,在这个项目中,通过输入端点来设置所需处理的数据来源,通过输出端点来设置处理后的数据的存放地点,并设置该项目所用到的任务执行器。之后,用户点击Submit按钮,将数据提交到集群上进行处理。处理完成后,在数据处理版本列表中查看结果

技术选型

Abacuza采用微服务架构风格,每个单独的微服务都在容器中运行,目前实验阶段采用docker-compose进行容器编排,今后会加入Kubernetes支持。现将Abacuza所使用的框架与相关技术简单罗列一下:

  1. Spark执行程序选择Microsoft .NET for Spark,一方面自己对.NET技术栈比较熟悉,另一方面,.NET for Spark有着很好的流式数据处理的SDK API,并且可以很方便地整合ML.NET实现机器学习的业务场景
  2. 所有的微服务都是使用运行在.NET 5下的ASP.NET Core Web API实现,每个微服务的后端数据库采用MongoDB
  3. 用于任务调度的Abacuza Job Service微服务使用Quartz.NET实现定期任务调度,用来提交数据处理任务以及更新任务状态。后端同时采用了PostgreSQL数据库
  4. 存储层与服务层之间引入Redis做数据缓存,减少MongoDB的查询负载
  5. 默认支持的Spark集群使用Apache Livy为其提供RESTful API接口
  6. 文件对象存储采用MinIO S3
  7. API网关采用Ocelot框架
  8. 微服务的瞬态故障处理:Polly框架
  9. 身份认证与授权采用ASP.NET Core Identity集成的IdentityServer4解决方案
  10. 反向代理:nginx
  11. 前端页面:Angular 12Angular powered BootstrapBootstrapAdminLTE

弱弱补一句:本人前端技术没有后端技术精湛,所以前端页面会有不少问题,样式也不是那么的专业美观,前端高手请忽略这些细节。;) Abacuza采用了插件化的设计,用户可以根据需要扩展下面这些组件:

  • 实现自己的数据处理集群以及集群连接:因此你不必拘泥于使用Apache Spark
  • 实现自己的输入端点输出端点:因此你可以自定义数据的输入部分和输出部分
  • 实现自己的任务执行器:因此你可以选择不采用基于.NET for Spark的解决方案,你可以自己用Scala或者Python来编写数据处理程序

在Abacuza的管理界面中,可以很方便地看到目前系统中已经被加载的插件:

因此,Abacuza数据工厂应该可以满足绝大部分大数据处理的业务场景。本身整个平台都是基于.NET开发,并且通过NuGet分发了Abacuza SDK,因此扩展这些组件是非常简单的,后面的演练部分可以看到详细介绍。

部署拓扑

以下是Abacuza的部署拓扑:

(点击查看大图)

 

整个部署结构还是比较简单的:5个主要的微服务由基于Ocelot实现的API Gateway负责代理,Ocelot可以整合IdentityServer4,在Gateway的层面完成用户的认证(Gateway层面的授权暂未实现)。基于IdentityServer4实现的Identity Service并没有部署在API Gateway的后端,因为在这个架构中,它的认证授权策略与一般的微服务不同。API Gateway、Identity Service以及基于Angular实现的web app都由nginx反向代理,向外界(客户端浏览器)提供统一的访问端点。所有的后端服务都运行在docker里,并可以部署在Kubernetes中。

演练:在Abacuza上运行Word Count程序

Word Count是Spark官方推荐的第一个案例程序,它的任务是统计输入文件中每个单词的出现次数。.NET for Spark也有一个相同的Word Count案例。在此,我仍然使用Word Count案例,介绍如何在Abacuza上运行数据处理程序。

先决条件

你需要一台Windows、MacOS或者Linux的计算机,上面装有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,则安装docker的桌面版),同时确保安装了git客户端命令行。

创建Word Count数据处理程序

首先使用dotnet命令行创建一个控制台应用程序,然后添加相关的引用:

$ dotnet new console -f net5.0 -n WordCountApp
$ cd WordCountApp
$ dotnet add package Microsoft.Spark --version 1.0.0
$ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease

然后在项目中新加入一个class文件,实现一个WordCountRunner类:

using Abacuza.JobRunners.Spark.SDK;
using Microsoft.Spark.Sql;

namespace WordCountApp
{
   public class WordCountRunner : SparkRunnerBase
   {
      public WordCountRunner(string[] args) : base(args)
      {
      }

      protected override DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)
            => dataFrame
               .Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))
               .Select(Functions.Explode(Functions.Col("words"))
               .Alias("word"))
               .GroupBy("word")
               .Count()
               .OrderBy(Functions.Col("count").Desc());
   }
}

接下来修改Program.cs文件,在Main函数中调用WordCountRunner:

static void Main(string[] args)
{
   new WordCountRunner(args).Run();
}

然后,在命令行中,WordCountApp.csproj所在的目录下,使用下面的命令来生成基于Linux x64平台的编译输出:

$ dotnet publish -c Release -f net5.0 -r linux-x64 -o published

最后,使用ZIP工具,将published下的所有文件(不包括published目录本身)全部打包成一个ZIP压缩包。例如,在Linux下,可以使用下面的命令将published目录下的所有文件打成一个ZIP包:

$ zip -rj WordCountApp.zip published/.

Word Count程序已经写好了,接下来我们就启动Abacuza,并在其中运行这个WordCountApp。

运行Word Count程序

你可以使用git clone https://github.com/daxnet/abacuza.git命令,将Abacuza源代码下载到本地,然后在Abacuza的根目录下,使用下面的命令进行编译:

$ docker-compose -f docker-compose.build.yaml build

编译成功之后,用文本编辑器编辑template.env文件,在里面设置好本机的IP地址(不能使用localhost或者127.0.0.1,因为在容器环境中,localhost和127.0.0.1表示当前容器本身,而不是运行容器的主机),端口号可以默认:

然后,使用下面的命令启动Abacuza:

$ docker-compose --env-file template.env up

启动成功后,可以使用docker ps命令查看正在运行的容器:

用浏览器访问http://<你的IP地址>:9320,即可打开Abacuza登录界面,输入用户名super,密码P@ssw0rd完成登录,进入Dashboard(目前Dashboard还未完成)。然后在左侧菜单中,点击Cluster Connections,然后点击右上角的Add Connection按钮:

在弹出的对话框中,输入集群连接的名称和描述,集群类型选择spark,在设置栏中,输入用于连接Spark集群的JSON配置信息。由于我们本地启动的Spark在容器中,直接使用本机的IP地址即可,如果你的Spark集群部署在其它机器上,也可以使用其它的IP地址。在配置完这些信息后,点击Save按钮保存:

接下来就是创建任务执行器。在Abacuza管理界面,点击左边的Job Runners菜单,然后点击右上角的Add Job Runner按钮:

在弹出的对话框中,输入任务执行器的名称和描述信息,集群类型选择spark,之后当该任务执行器开始执行时,会挑选任意一个类型为spark的集群来处理数据。

填入这些基本信息后,点击Save按钮,此时会进入任务执行器的详细页面,用来进行进一步的设置。在Payload template中,输入以下JSON文本:

{
  "file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",
  "className": "org.apache.spark.deploy.dotnet.DotnetRunner",
  "args": [
    "${jr:binaries:WordCountApp.zip}",
    "WordCountApp",
    "${proj:input-defs}",
    "${proj:output-defs}",
    "${proj:context}"
  ]
}

大概介绍一下每个参数:

  • file:指定了在Spark集群上需要运行的程序所在的JAR包,这里直接使用微软的Spark JAR
  • className:指定了需要运行的程序在JAR包中的名称,这里固定使用org.apache.spark.deploy.dotnet.DotnetRunner
  • ${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner会调用当前任务执行器中的二进制文件WordCountApp.zip中的程序来执行数据处理任务
  • WordCountApp 为ZIP包中可执行程序的名称
  • ${proj:input-defs} 表示输入文件及其配置将引用当前执行数据处理的项目中的输入端点的定义
  • ${proj:output-defs} 表示输出文件及其配置将引用当前执行数据处理的项目中的输出端点的定义
  • ${proj:context} 表示Spark会从当前项目读入相关信息并将其传递给任务执行器

在上面的配置中,引用了两个binary文件:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。于是,我们需要将这两个文件上传到任务执行器中。仍然在任务执行器的编辑界面,在Binaries列表中,点击加号按钮,将这两个文件附加到任务执行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar文件位于上文用到的published目录中,而WordCountApp.zip则是在上文中生成的ZIP压缩包。

配置完成后,点击Save & Close按钮,保存任务执行器。 接下来,创建一个数据处理项目,在左边的菜单中,点击Projects,然后在右上角点击Add Project按钮:

在弹出的Add Project对话框中,输入项目的名称、描述,然后选择输入端点和输出端点,以及负责处理该项目数据的任务执行器:

在此,我们将输入端点设置为文本文件(Text Files),输出端点设置为控制台(Console),也就是直接输出到日志中。这些配置在后续的项目编辑页面中也是可以更改的。一个项目可以包含多个输入端点,但是只能有一个输出端点。点击Save按钮保存设置,此时Abacuza会打开项目的详细页,在INPUT选项卡下,添加需要统计单词出现次数的文本文件:

在OUTPUT选项卡下,确认输出端点设置为Console:

然后点击右上角或者右下角的Submit按钮,提交数据处理任务,此时,选项卡会自动切换到REVISIONS,并且更新Job的状态:

稍等片刻,如果数据处理成功,Job Status会从RUNNING变为COMPLETED:

点击Actions栏中的文件按钮,即可查看数据处理的日志输出:

从日志文件中可以看到,Abacuza已经根据我们写的数据处理程序,统计出输入文件input.txt中每个单词的出现次数。通过容器的日志输出也能看到同样的信息:

总结

本文介绍了自己纯手工打造的数据工厂(Data Factory)的设计与实现,并开发了一个案例来演示该数据工厂完成数据处理的整个过程。之后还有很多功能可以完善:Dashboard、认证授权的优化、用户与组的管理、第三方IdP的集成、Pipeline的实现等等,今后有空再慢慢弄吧。

 

欢迎访问本人的个人站点https://sunnycoding.cn,获得更好的阅读体验。

 

原文地址:https://www.cnblogs.com/daxnet/p/15173341.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。事实上,不仅仅是在事件处理器
上文已经介绍了Identity Service的实现过程。今天我们继续,实现一个简单的Weather API和一个基于Ocelot的API网关。 回顾 《Angular SPA基于Ocelot API网关与IdentityServer4的身份认证与授权(一)》 Weather API Weather
最近我为我自己的应用开发框架Apworks设计了一套案例应用程序,并以Apache 2.0开源,开源地址是:https://github.com/daxnet/apworks-examples,目的是为了让大家更为方便地学习和使用.NET Core、最新的前端开发框架Angular,以及Apwork
HAL(Hypertext Application Language,超文本应用语言)是一种RESTful API的数据格式风格,为RESTful API的设计提供了接口规范,同时也降低了客户端与服务端接口的耦合度。很多当今流行的RESTful API开发框架,包括Spring REST,也都默认支
在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,
HAL,全称为Hypertext Application Language,它是一种简单的数据格式,它能以一种简单、统一的形式,在API中引入超链接特性,使得API的可发现性(discoverable)更强,并具有自描述的特点。使用了HAL的API会更容易地被第三方开源库所调用,并且使用起来也很方便
何时使用领域驱动设计?其实当你的应用程序架构设计是面向业务的时候,你已经开始使用领域驱动设计了。领域驱动设计既不是架构风格(Architecture Style),也不是架构模式(Architecture Pattern),它也不是一种软件开发方法论,所以,是否应该使用领域驱动设计,以及什么时候使用
《在ASP.NET Core中使用Apworks快速开发数据服务》一文中,我介绍了如何使用Apworks框架的数据服务来快速构建用于查询和管理数据模型的RESTful API,通过该文的介绍,你会看到,使用Apworks框架开发数据服务是何等简单快捷,提供的功能也非常多,比如对Hypermedia的
在上一讲中,我们已经完成了一个完整的案例,在这个案例中,我们可以通过Angular单页面应用(SPA)进行登录,然后通过后端的Ocelot API网关整合IdentityServer4完成身份认证。在本讲中,我们会讨论在当前这种架构的应用程序中,如何完成用户授权。 回顾 《Angular SPA基于
Keycloak是一个功能强大的开源身份和访问管理系统,提供了一整套解决方案,包括用户认证、单点登录(SSO)、身份联合、用户注册、用户管理、角色映射、多因素认证和访问控制等。它广泛应用于企业和云服务,可以简化和统一不同应用程序和服务的安全管理,支持自托管或云部署,适用于需要安全、灵活且易于扩展的用
3月7日,微软发布了Visual Studio 2017 RTM,与之一起发布的还有.NET Core Runtime 1.1.0以及.NET Core SDK 1.0.0,尽管这些并不是最新版,但也已经从preview版本升级到了正式版。所以,在安装Visual Studio 2017时如果启用了
在上文中,我介绍了如何在Ocelot中使用自定义的中间件来修改下游服务的response body。今天,我们再扩展一下设计,让我们自己设计的中间件变得更为通用,使其能够应用在不同的Route上。比如,我们可以设计一个通用的替换response body的中间件,然后将其应用在多个Route上。 O
不少关注我博客的朋友都知道我在2009年左右开发过一个名为Apworks的企业级应用程序开发框架,旨在为分布式企业系统软件开发提供面向领域驱动(DDD)的框架级别的解决方案,并对多种系统架构风格提供支持。这个框架的开发和维护我坚持了很久,一直到2015年,我都一直在不停地重构这个项目。目前这个项目在
好吧,这个题目我也想了很久,不知道如何用最简单的几个字来概括这篇文章,原本打算取名《Angular单页面应用基于Ocelot API网关与IdentityServer4ʺSP.NET Identity实现身份认证与授权》,然而如你所见,这样的名字实在是太长了。所以,我不得不缩写“单页面应用”几个字
在前面两篇文章中,我介绍了基于IdentityServer4的一个Identity Service的实现,并且实现了一个Weather API和基于Ocelot的API网关,然后实现了通过Ocelot API网关整合Identity Service做身份认证的API请求。今天,我们进入前端开发,设计
Ocelot是ASP.NET Core下的API网关的一种实现,在微服务架构领域发挥了非常重要的作用。本文不会从整个微服务架构的角度来介绍Ocelot,而是介绍一下最近在学习过程中遇到的一个问题,以及如何使用中间件(Middleware)来解决这样的问题。 问题描述 在上文中,我介绍了一种在Angu
在大数据处理和人工智能时代,数据工厂(Data Factory)无疑是一个非常重要的大数据处理平台。市面上也有成熟的相关产品,比如Azure Data Factory,不仅功能强大,而且依托微软的云计算平台Azure,为大数据处理提供了强大的计算能力,让大数据处理变得更为稳定高效。由于工作中我的项目
在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接
在之前《在ASP.NET Core中使用Apworks快速开发数据服务》一文的评论部分,.NET大神张善友为我提了个建议,可以使用Compile As a Service的Roslyn为语法解析提供支持。在此非常感激友哥给我的建议,也让我了解了一些Roslyn的知识。使用Roslyn的一个很大的好处
很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便