使Flink工作流与自定义源并行化

如何解决使Flink工作流与自定义源并行化

我有一个用Flink构建的工作流,它包含一个自定义源,一系列地图/平面图和一个接收器。

我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象来存储该文件两个字段中的信息。

然后,我有一系列的地图/平面图转换这些对象,然后使用自定义接收器将它们打印到文件中。在Flink的Web UI中生成的执行图如下:

Flink execution graph

我有一个集群或2个worker设置,每个都有6个插槽(它们也都有6个核心)。我将并行度设置为12。从执行图中可以看到源的并行度为1,而工作流的其余部分具有并行度12。

运行工作流程时(专用文件夹中有大约15,000个文件),我使用htop监视工作人员的资源。在大多数情况下,所有内核都达到100%的利用率,但是大约每30分钟左右,就有8-10个内核闲置大约2-3分钟。

我的问题如下:

  1. 我了解该源运行的并行度为1,我认为这是从本地存储读取时的正常现象(我的文件位于每个工作进程的同一目录中,因为我不知道将选择哪个工作进程执行源代码)。确实正常吗?你能解释为什么会这样吗?

  2. 我的其余工作流程在并行性12下执行,这看起来是正确的,因为通过检查任务管理器的日志,我可以从所有插槽(例如.... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ........ [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ....等))。我不明白的是,如果一个插槽正在执行源角色,并且集群中有12个插槽,剩下的12个插槽如何执行工作流程的其余部分?一个插槽既可以充当源,也可以充当其余工作流程的一个实例吗?如果是,该特定插槽的资源如何分配?有人可以解释此工作流程中正在进行的步骤吗?例如(这可能是错误的):

  • 插槽1读取文件并将其转发到可用插槽(2到12)
  • 插槽1将一个文件转发给自己,并停止读取,直到完成工作为止
  • 完成后,插槽1读取更多文件并将其转发到可用的插槽中

我相信我上面描述的是错误的,但是我举一个例子来更好地解释我的问题

  1. 为什么我每30分钟(或多或少)每隔30分钟就会有大部分内核处于这种空闲状态?

解决方法

要回答有关并行阅读的特定问题,我将执行以下操作...

  1. 通过扩展RichSourceFunction来实现自定义源。
  2. 在您的open()方法中,调用getRuntimeContext().getNumberOfParallelSubtasks()以获取总体并行性,并调用getRuntimeContext().getIndexOfThisSubtask()以获取正在初始化的子任务的索引。
  3. run()方法中,当您遍历文件时,获得每个文件名的hashCode(),以总并行度为模。如果该值等于子任务的索引,则进行处理。

通过这种方式,您可以将工作分散到12个子任务中,而无需让子任务尝试处理同一文件。

,
  1. 单个使用者设置将您管道的总吞吐量限制为只有一个使用者的性能。此外,它给所有插槽带来了沉重的洗牌-在这种情况下,消费者读取的所有数据也会在此消费者插槽上序列化,这是额外的CPU负载。相反,使消费者并行度等于map / flat map parallelsm将允许链接源-> map操作并避免随机播放。
  2. 默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务也是如此,只要它们来自同一任务即可。结果是一个插槽可以容纳整个作业流水线。因此,在您的情况下,插槽1同时具有使用者和地图/平面地图任务,而其他插槽仅具有地图/平面地图任务。有关更多详细信息,请参见此处:https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-slots-and-resources。另外,您实际上可以在Web UI上查看每个子任务的实例。
  3. 您是否启用了检查点?如果是,并且是30分钟,则可能是对状态进行快照的时间间隔。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-