How to fix "Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process"
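I have a Dataflow pipeline that reads a large number of files (at least 5 million documents) and tries to store them in a database.
I run the following pipeline: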
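import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;

import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// StorageToXOptions is the author's own options interface (it defines getInputFile()).
StorageToXOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(StorageToXOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<KV<String, String>> docs = p
    // Expands the glob into file metadata; this is the step that times out.
    .apply("(1) Match Files", FileIO.match().filepattern(options.getInputFile()))
    // withCompression can be omitted: by default, compression is detected from the filename.
    .apply("(2) Read Matches", FileIO.readMatches())
    .apply("(3) Transform into KV", MapElements
        .into(kvs(strings(), strings()))
        .via((FileIO.ReadableFile f) -> {
          try {
            // Key: full path of the file; value: its entire contents decoded as UTF-8.
            return KV.of(
                f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
          } catch (IOException ex) {
            throw new RuntimeException("Failed to read the file", ex);
          }
        }));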
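`options.getInputFile()` returns a wildcard pattern that selects a subset of GCS objects. This pipeline used to work with a smaller number of files, but at the current size (5-6 million documents) `FileIO.match()` times out.
I get the following error logs: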
Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process
at java.base@11.0.9/java.net.SocketInputStream.socketRead0(Native Method)
at java.base@11.0.9/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
at java.base@11.0.9/java.net.SocketInputStream.read(SocketInputStream.java:168)
at java.base@11.0.9/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
at java.base@11.0.9/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base@11.0.9/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
at java.base@11.0.9/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963)
at java.base@11.0.9/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
at java.base@11.0.9/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
at java.base@11.0.9/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
at java.base@11.0.9/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:754)
at java.base@11.0.9/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:689)
at java.base@11.0.9/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
at java.base@11.0.9/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
at java.base@11.0.9/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
at java.base@11.0.9/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
at app//com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:149)
at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at app//com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at app//com.google.cloud.hadoop.util.ResilientOperation$AbstractGoogleClientRequestExecutor.call(ResilientOperation.java:171)
at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:67)
at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:106)
at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:346)
at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:324)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.expand(GcsFileSystem.java:222)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.lambda$matchGlobs$0(GcsFileSystem.java:195)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem$$Lambda$234/0x0000000800516c40.apply(Unknown Source)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:273)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:234)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable.toList(FluentIterable.java:617)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchGlobs(GcsFileSystem.java:200)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:101)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:124)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:145)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:157)
at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:660)
at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 10m00s without outputting or completing in state process
at java.base@11.0.9/java.lang.String.intern(Native Method)
at app//com.google.api.client.util.ClassInfo.getFieldInfo(ClassInfo.java:126)
at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:441)
at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:787)
at app//com.google.api.client.json.JsonParser.parseArray(JsonParser.java:641)
at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:744)
at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:451)
at app//com.google.api.client.json.JsonParser.parseValue(JsonParser.java:787)
at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:360)
at app//com.google.api.client.json.JsonParser.parse(JsonParser.java:335)
at app//com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79)
at app//com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73)
at app//com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:451)
at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at app//com.google.cloud.hadoop.util.ResilientOperation$AbstractGoogleClientRequestExecutor.call(ResilientOperation.java:171)
at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:67)
at app//com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:106)
at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:346)
at app//org.apache.beam.sdk.extensions.gcp.util.GcsUtil.listObjects(GcsUtil.java:324)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.expand(GcsFileSystem.java:222)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.lambda$matchGlobs$0(GcsFileSystem.java:195)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem$$Lambda$234/0x0000000800516c40.apply(Unknown Source)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:273)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:234)
at app//org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable.toList(FluentIterable.java:617)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchGlobs(GcsFileSystem.java:200)
at app//org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:101)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:124)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:145)
at app//org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:157)
at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:660)
at app//org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)
plus two other timeout exceptions, such as:
Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 05m00s without outputting or completing in state process
at java.base@11.0.9/sun.security.provider.SHA.implCompress(SHA.java:129)
and
Operation ongoing in step (1) Match Files/Via MatchAll/Match filepatterns for at least 10m00s without outputting or completing in state process
at java.base@11.0.9/java.util.Calendar.<init>(Calendar.java:1607)
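Am I using the wrong API to read from GCS? I expected Apache Beam + Cloud Dataflow to handle 5-6 million documents easily. Because of these errors, the pipeline stalls and the job never completes.
Please suggest possible ways to solve this.
Workaround
You're right that reading this many files should not be a problem for Dataflow, but matching them all in a single synchronous operation can be (as your stack traces show).
One possible workaround is to break your single pattern into several smaller patterns and use FileIO.matchAll(), for example: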
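A rough back-of-envelope calculation shows why one synchronous match struggles at this scale. The GCS JSON API's `objects.list` returns at most 1,000 items per page, so expanding one glob over 6 million objects needs at least 6,000 sequential list requests inside a single `process` call (the repeated `GcsUtil.listObjects` frames in the traces). The sketch below compares that with the per-pattern cost after splitting, assuming, hypothetically, 16 patterns with objects spread evenly across them; `ListPages` and `minListRequests` are illustrative names only.

```java
public class ListPages {
    // GCS objects.list returns at most 1,000 items per page, so listing
    // N objects takes at least ceil(N / 1000) sequential requests.
    static final long MAX_ITEMS_PER_PAGE = 1_000L;

    static long minListRequests(long objectCount) {
        // Integer ceiling division.
        return (objectCount + MAX_ITEMS_PER_PAGE - 1) / MAX_ITEMS_PER_PAGE;
    }

    public static void main(String[] args) {
        long total = 6_000_000L; // upper end of the 5-6 million documents
        int patterns = 16;       // hypothetical: one pattern per hex leading character
        System.out.println(minListRequests(total));            // 6000
        System.out.println(minListRequests(total / patterns)); // 375
    }
}
```

The total number of list requests does not shrink; what changes is that each pattern's listing is a separate, much shorter unit of work that the runner may schedule independently.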
PCollection<KV<String,String>> docs = p
.apply("(0) Input Patterns",Create.of("/path/to/a*","/path/to/b*",...))
.apply("(1) Match Files",FileIO.matchAll()
...
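Instead of writing the smaller patterns by hand, they could be generated. The following is a minimal sketch, assuming every object name directly after the pattern's fixed prefix starts with a character from a known alphabet (here, hypothetically, the hex digits `0-9a-f`). Objects starting with any other character would silently be missed, so the alphabet must cover every possible first character. `PatternSplitter` and `splitByLeadingChar` are hypothetical helpers, not part of Beam.

```java
import java.util.ArrayList;
import java.util.List;

public class PatternSplitter {
    /**
     * Splits a pattern ending in "*" or "**" into one narrower pattern per
     * leading character, e.g. "gs://b/docs/*" -> "gs://b/docs/0*", "gs://b/docs/1*", ...
     * ASSUMPTION: every matched object name starts (after the prefix) with a
     * character in `alphabet`; any other object would not be matched.
     */
    public static List<String> splitByLeadingChar(String pattern, String alphabet) {
        String suffix;
        if (pattern.endsWith("**")) {
            suffix = "**"; // recursive wildcard: keep it recursive
        } else if (pattern.endsWith("*")) {
            suffix = "*";
        } else {
            throw new IllegalArgumentException("pattern must end with '*': " + pattern);
        }
        String base = pattern.substring(0, pattern.length() - suffix.length());
        List<String> out = new ArrayList<>();
        for (char c : alphabet.toCharArray()) {
            out.add(base + c + suffix);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> patterns = splitByLeadingChar("gs://my-bucket/docs/*", "0123456789abcdef");
        System.out.println(patterns.size()); // 16
        System.out.println(patterns.get(0)); // gs://my-bucket/docs/0*
    }
}
```

The returned list could be passed to `Create.of(...)` in place of the hand-written patterns above, followed by `FileIO.matchAll()`. Each pattern then becomes its own element, so its listing runs in a separate `process` call instead of one very long one.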