How to fix "Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process"

I have a Dataflow pipeline that reads a large number of files (at least 5 million documents) and tries to store them in a database.

The pipeline looks like this:

StorageToXOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(StorageToXOptions.class);
Pipeline p = Pipeline.create(options);

PCollection<KV<String,String>> docs = p
    .apply("(1) Match Files",FileIO.match().filepattern(options.getInputFile()))
    // withCompression can be omitted - by default compression is detected from the filename.
    .apply("(2) Read Matches",FileIO.readMatches())
    .apply("(3) Transform into KV",MapElements // uses imports from TypeDescriptors
        .into(kvs(strings(),strings()))
        .via((FileIO.ReadableFile f) -> {
            try {
                return KV.of(
                    f.getMetadata().resourceId().toString(),f.readFullyAsUTF8String());
            } catch (IOException ex) {
                throw new RuntimeException("Failed to read the file",ex);
            }
         }));

options.getInputFile() returns a wildcard pattern that selects a subset of GCS objects. This pipeline used to handle a smaller number of files, but at the current size (5-6 million documents) FileIO.match() times out.
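For completeness, here is a minimal sketch of what the options interface might look like; only the name StorageToXOptions and getInputFile() come from the snippet above, while the annotations and the example pattern are assumptions:

// Hypothetical sketch of the custom options interface; only getInputFile()
// appears in the question, the annotations and example value are assumed.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface StorageToXOptions extends PipelineOptions {
    @Description("Glob selecting the input objects, e.g. gs://some-bucket/docs/**")
    @Validation.Required
    String getInputFile();
    void setInputFile(String value);
}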

I get the following error logs:

Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process
  at java.base@11.0.9/java.net.SocketInputStream.socketRead0(Native Method)
  at java.base@11.0.9/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
  at java.base@11.0.9/java.net.SocketInputStream.read(SocketInputStream.java:168)
  at java.base@11.0.9/java.net.SocketInputStream.read(SocketInputStream.java:140)
  at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
  at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
  at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
  at java.base@11.0.9/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
  at java.base@11.0.9/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963)
  at java.base@11.0.9/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
  at java.base@11.0.9/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
  at java.base@11.0.9/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
  at java.base@11.0.9/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:754)
  at java.base@11.0.9/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:689)
  at java.base@11.0.9/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
  at java.base@11.0.9/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
  at java.base@11.0.9/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
  at java.base@11.0.9/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
  at app//com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
  at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:149)
  at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
  at app//com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
  at app//com.google.cloud.hadoop.util.ResilientOperation$AbstractGoogleClientRequestExecutor.call(ResilientOperation.java:171)
  at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:67)
  at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:106)
  at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:346)
  at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:324)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.expand(GcsFileSystem.java:222)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.lambda$matchGlobs$0(GcsFileSystem.java:195)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem$$Lambda$234/0x0000000800516c40.apply(Unknown Source)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:273)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:234)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable.toList(FluentIterable.java:617)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchGlobs(GcsFileSystem.java:200)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:101)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:124)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:145)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:157)
  at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:660)
  at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)
" 
Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 10m00s without outputting or completing in state process
  at java.base@11.0.9/java.lang.String.intern(Native Method)
  at app//com.google.api.client.util.ClassInfo.getFieldInfo(ClassInfo.java:126)
  at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:441)
  at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:787)
  at app//com.google.api.client.json.JsonParser.parseArray(JsonParser.java:641)
  at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:744)
  at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:451)
  at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:787)
  at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:360)
  at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:335)
  at app//com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79)
  at app//com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73)
  at app//com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:451)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
  at app//com.google.cloud.hadoop.util.ResilientOperation$AbstractGoogleClientRequestExecutor.call(ResilientOperation.java:171)
  at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:67)
  at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:106)
  at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:346)
  at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:324)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.expand(GcsFileSystem.java:222)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.lambda$matchGlobs$0(GcsFileSystem.java:195)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem$$Lambda$234/0x0000000800516c40.apply(Unknown Source)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:273)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:234)
  at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable.toList(FluentIterable.java:617)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchGlobs(GcsFileSystem.java:200)
  at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:101)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:124)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:145)
  at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:157)
  at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:660)
  at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)

and two more timeout exceptions, such as:

Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process
  at java.base@11.0.9/sun.security.provider.SHA.implCompress(SHA.java:129)

Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 10m00s without outputting or completing in state process
  at java.base@11.0.9/java.util.Calendar.<init>(Calendar.java:1607)

Am I using the wrong API to read from GCS? I thought Apache Beam + Cloud Dataflow could easily read 5-6 million documents. Because of these errors, my pipeline execution stalls and the job never completes.

Please advise on possible solutions to this problem.

Solution

You're right that reading that many files shouldn't be a problem for Dataflow, but matching them all in a single synchronous operation can be (as your logs demonstrate).

One possible workaround is to break your single pattern down into several smaller patterns and use FileIO.matchAll(), e.g.

PCollection<KV<String,String>> docs = p
    .apply("(0) Input Patterns",Create.of("/path/to/a*","/path/to/b*",...))
    .apply("(1) Match Files",FileIO.matchAll()
    ...
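A slightly fuller sketch of that idea, assuming the objects can be split by a prefix such as the first character of the file name; the bucket path, the prefix scheme, and the downstream steps are placeholders, not part of the original answer:

// Hypothetical expansion of the snippet above. The bucket path and prefix
// scheme are assumptions for illustration; the matchAll()/readMatches()
// chain follows the Beam FileIO API used in the question.
// Extra imports needed: java.util.ArrayList, java.util.List,
// org.apache.beam.sdk.transforms.Create.
List<String> patterns = new ArrayList<>();
for (char c = 'a'; c <= 'z'; c++) {
    patterns.add("gs://my-bucket/docs/" + c + "*");  // placeholder prefix split
}

PCollection<KV<String,String>> docs = p
    .apply("(0) Input Patterns", Create.of(patterns))
    .apply("(1) Match Files", FileIO.matchAll())
    .apply("(2) Read Matches", FileIO.readMatches())
    .apply("(3) Transform into KV", MapElements
        .into(kvs(strings(), strings()))
        .via((FileIO.ReadableFile f) -> {
            try {
                return KV.of(
                    f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
            } catch (IOException ex) {
                throw new RuntimeException("Failed to read the file", ex);
            }
        }));

Because each sub-pattern becomes its own element, FileIO.matchAll() lets the GCS listing work be distributed across workers instead of running as one long synchronous call inside a single bundle.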
