如何解决无法使用Apache flink编译加入流代码Kinesis数据分析
我无法使用Kinesis Data Analytics在Apache Flink中编译和运行联接代码。遵循文档并进行一些研究后,我的代码如下所示:
stream_1.join(stream_2)
.where((x: String) => x.orderId)
.equalTo((x: String) => x.orderId)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply{ (e1,e2) => e1 + "," + e2 }.addSink(createSinkFromStaticConfig())
此操作失败,并显示错误:value orderId is not a member of String
因为我的原始数据是json,所以我也尝试过类似的操作:
stream_1.join(stream_2)
.where((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)
.equalTo((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply{ (e1," + e2 }.addSink(createSinkFromStaticConfig())
失败并出现错误:
error: overloaded method value where with alternatives:
[INFO] [KEY](x$1: org.apache.flink.api.java.functions.KeySelector[String,KEY],x$2: org.apache.flink.api.common.typeinfo.TypeInformation[KEY])org.apache.flink.streaming.api.datastream.JoinedStreams[String,String]#Where[KEY] <and>
[INFO] [KEY](x$1: org.apache.flink.api.java.functions.KeySelector[String,KEY])org.apache.flink.streaming.api.datastream.JoinedStreams[String,String]#Where[KEY]
[INFO] cannot be applied to (String => String)
[INFO] .where((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)
我不确定按select键输入什么文档:
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply { (e1," + e2 }
有人可以帮助我如何从我的信息流中选择加入密钥吗?
更多详细信息:
- 来源:Kinesis数据流
- 流类型:DataStream [String]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。