如何解决Flink SerializationSchema:无法序列化行错误
在使用flink的SerializationSchema时遇到一些麻烦。
这是我的主要代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /*Extract TypeInformation<Row> from an avsc schema file*/ ).build();
DataStream<Row> myDataStream = env.addSource( new MyCustomSource(sourceDeserializer) ) ;
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");
env.execute("MyJob");
这是我自定义的接收器功能:
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
private final boolean print;
private final SerializationSchema<Row> serializationSchema;
public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
this.serializationSchema = serializationSchema;
}
@Override
public void invoke(final Row value,final Context context) throws Exception {
try {
LOGGER.info("MyCustomSink- invoke : [{}]",new String(serializationSchema.serialize(value)));
}catch (Exception e){
LOGGER.error("MyCustomSink- Error while sending data : " + e);
}
}
}
这是我自定义的源函数(不确定它对我遇到的问题是否有用):
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);
/** the JSON deserializer */
private final DeserializationSchema<T> deserializationSchema;
public MyCustomSource(final DeserializationSchema<T> deserializer) {
this.deserializationSchema = deserializer;
}
@Override
public void open(final Configuration parameters) {
...
}
@Override
public void run(final SourceContext<T> ctx) throws Exception {
LOGGER.info("run");
InputStream data = ...; // Retrieve the input json data
final T row = deserializationSchema
.deserialize(ByteStreams.toByteArray(data));
ctx.collect(row);
}
@Override
public void cancel() {
...
}
@Override
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}
}
现在,我运行我的代码,并将一些数据顺序发送到管道:
==>
{
"id": "sensor1","data":{
"rotation": 250
}
}
在这里,数据已由我的接收器正确打印:MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]
==>
{
"id": "sensor1"
}
在这里,数据已由我的接收器正确打印:MyCustomSink- invoke : [{"id":"sensor1","data":null}]
==>
{
"id": "sensor1","data":{
"rotation": 250
}
}
在这里,序列化有错误。打印的错误日志为:
MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.
我根本不明白为什么会有这种行为。有人有主意吗?
注意:
- 使用Flink 1.9.2
-编辑-
我添加了CustomSource部分
-编辑2-
经过更多调查,看来此行为是由private transient ObjectNode node
中的JsonRowSerializationSchema
引起的。如果我理解正确,这是用于优化的,但似乎是造成我问题的原因。
这是正常行为吗?如果是,那么在我的情况下,该类的正确用法是什么? (否则,有什么方法可以绕过这个问题吗?)
解决方法
这是一个JsonRowSerializationSchema
错误,已在最新的Flink版本中修复-我相信this PR解决了上述问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。