如何解决Kafka Stream使用相同的谓词将消息发送到多个分支
我需要给两个不同的分支提供相同的
我尝试了下面的代码,但是我看到只有第一个分支正在收到消息,而第二个分支却没有得到消息。
KStream<String,byte[]>[] branches = builder.<String,byte[]>stream("source-topic)
.branch((key,val) -> true,// this goes to output topic1
(key,val) -> true); // this goes to output topic2
// branch[0] map on logic1Ondata and send to topic1
branches[0].map(logic1OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1",Produced.with(Serdes.String(),Serdes.String())
// branch[1] map on logic2Ondata and send to topic2
branches[1].map(logic2OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2",Serdes.String())
期望两个分支都应从“源主题”中获得相同的消息。每个分支分别对logic1OnData和logic2OnData进行验证,并写入topic1和topic2 但是我只看到branch [0]接收消息,而没有消息到branch [1]
如果不能选择分支,我想知道是否还有其他方法。
编辑: 分支是另外一种情况。因此,如果第一个谓词匹配,则不会考虑休息。因此,由于这个原因,只有topic1才能获得上述逻辑的消息。
代替下面的方法,我可以将消息分配给多个路径。
KStream<String,byte[]>[] stream = builder.<String,byte[]>stream("source-topic);
stream.map(logic1OnData).filter(
(key,Serdes.String())
stream.map(logic2OnData).filter(
(key,Serdes.String())
此更改将所有消息发送到两个地图。进一步过滤并写入相应主题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。