如何解决flink SQL 1.13.0&1.13.1 elasticsearch sink 失败
Flink SQL> INSERT INTO es_sink SELECT 'hello';
[INFO] Submitting SQL update statement to the cluster...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.elasticsearch.table.RowElasticsearchSinkFunction.<init>(Lorg/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch7/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory;Ljava/util/function/Function;)V
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.lambda$getSinkRuntimeProvider$0(Elasticsearch7DynamicSink.java:129)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:161)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226)
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226)
at org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:518)
at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:507)
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:409)
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
解决方法
根据您的评论,您的类路径中似乎必须存在冲突的 ES 版本。
与其将罐子放入 lib
,不如将其放入 execute the sqlClient
with the -l
option。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。