如何解决FasterXML中的ReadValue时Flink收集器问题
我将Kafka值设置为String,并将POJO设置如下,
{"name":"John","timeStamp":"2020-08-11T13:31:31"}
class Person{
private String name;
private LocalDateTime timeStamp;
}
此时间戳记是来自Kafka的String,并将其转换为LocalDateTime
。
当我使用FasterXML的必需库以独立和objectMapper.readValue(value,Person.class)
的身份运行程序时,它运行良好。正在转换。
当我从Flink Framework阅读以下内容时,
stream.flatMap(new FlatMapFunction<String,Person>() {
public void flatMap(String value,Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value,Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print();
env.execute();
我遇到以下问题,
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.
该消息显示我的Person对象不可序列化,并且我已经为Serializable
类实现了Person
,但是没有运气。而且,在下面尝试过,也不是很幸运。
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp;
更新:
类似于API的问题,我在下面的链接中阅读
https://issues.apache.org/jira/browse/FLINK-12113
解决方法
该异常指出JsonMapper
实例不是Serializable
-如果我没记错的话,它从版本2.1
开始可序列化。另外,Person
类也应设置为可序列化。
因此,在您的情况下,我会说您应该切换到jackson-databind
版本>=2.1
,或者可能将JsonMapper static
字段设为
对于Person
类,只需实现Serializable
接口:
class Person implements Serializable {
private String name;
private LocalDateTime timeStamp;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。