如何解决通过KeySelector Flink使用.keyBy从Kafka获取数据
我在带有Kafka连接器的Java中具有Flink脚本。我正在从Kafka毫无问题地获取数据,第一步,我正在做一个.map从消息中获取时间戳。为了使用事件时间窗口,我从数据中提取了以毫秒为单位的时间戳,并将其返回给flink。为此,我使用了“ assignTimestampsAndWatermarks”
DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer("CorID_0",new SimpleStringSchema(),p));
kafkaData.map(new MapFunction<
String,Tuple19<String,String,Double,Long,Integer,Double>>()
{
public Tuple19<String,Double> map(String value)
{
String[] words = value.split(",");
return new Tuple19<String,Double>
(words[0],words[1],words[2],words[3],words[4],words[5],Double.parseDouble(words[6]),Long.parseLong(words[7]),Double.parseDouble(words[8]),Long.parseLong(words[9]),Long.parseLong(words[10]),Integer.parseInt(words[11]),Long.parseLong(words[12]),Double.parseDouble(words[13]),Long.parseLong(words[14]),Double.parseDouble(words[15]),Double.parseDouble(words[16]),Integer.parseInt(words[17]),Double.parseDouble(words[18]));
}
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String,Double>>()
{
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
public long extractAscendingTimestamp(Tuple19<String,Double> value)
{
try
{
Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
return ts.getTime();
} catch (Exception e)
{
throw new RuntimeException("Parsing Error");
}
}
});
第二步是计算开始的地方。我正在尝试对数据进行一些处理,为此,我需要从kafka消息中获取数据,这基本上就是我被卡住的地方。
DataStream<String> largeDelta = kafkaData .keyBy(new KeySelector<Tuple19<String,Double>,String>()
{
public String getKey(Tuple19<String,Double> value)
{
return value.f2;
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new TrackChanges(5));
largeDelta.writeAsText("/Alert.txt");
env.execute("ABCD");
问题是我有一条错误消息,告诉我“无法解析方法'KeyBy(匿名org.apache.flink.api.java.functions ....'
由于我正在努力了解我所缺少的内容,因此我们将竭诚欢迎任何帮助。
谢谢
解决方法
我猜您的new MapFunction()...
正在将传入的String
转换为Tuple2<String,String>
,否则拥有KeySelector<Tuple2<String,String>,String>
毫无意义。
如果是这样,则需要将kafkaData.map(new MapFunction<...
的结果分配给DataStream keyBy
一起使用。
尽管这么说,但我看不出keyBy().window()
中Tuple2<String,String>
产生DataStream<String> largeDelta
的情况。因此感觉就像是多个问题。
此外,对于简单的键选择器,请使用lambda表达式,而不是定义匿名函数。例如。 kafkaData.keyBy(r -> r.f1)
可以做到。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。