如何解决使用Flink将POJO保存到Cassandra
我是Flink的新手,我想将Kafka流数据存储到Cassandra中。我已经将String转换为POJO。我的POJO如下,
@Table(keyspace = "sample",name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
我的转换发生在下面,
stream.flatMap(new FlatMapFunction<String,Person>() {
public void flatMap(String value,Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value,Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print(); // I need to use proper method to convert to Datastream.
env.execute();
我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
Cassandra Sink接受DataStream实例。我需要转换我的转换并将其存储到Kafka中。
Cant create Cassandra Pojo Sink也给我一些想法。
有方法.forward()
返回 DataStream<Reading> forward
,并将实例传递给时,
CassandraSink.addSink(forward)
.setHost("localhost")
.build();
cannot access org.apache.flink.streaming.api.scala.DataStream
如何将POJO转换为存储在Cassandra中?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。