如何解决并行从单流主题写入不同的主题
我有一个流,它使消息映射到两个不同的map()调用,并进一步被过滤并写入两个不同的主题。
KStream<String,byte[]>[] stream = builder.<String,byte[]>stream("source-topic");
stream.map(logic1OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1",Produced.with(Serdes.String(),Serdes.String())
stream.map(logic2OnData).filter(
(key,value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2",Serdes.String())
有没有一种方法可以并行运行stream.map(logc1OnData)...和stream.map(logic2OnData)? 看起来他们一个接一个地运行,即执行了第一个映射并将其写入topic1 然后执行第二个映射并将其写入topic2 仅供参考。我不希望num.threads.count是因为我的流输入来自单个主题,并且我正在运行同一个应用程序的多个实例以从源主题中读取内容以在使用时实现并行性。
我正在寻找的是执行和写入不同主题时的并行性
解决方法
您正在查看的是将操作添加到拓扑的顺序。一旦执行了拓扑,记录器便会按照它们到达的顺序流经耳道,但是logic2OnData
不会等待logic1OnData
在运行之前完成处理。
如果您担心性能,可以使用stream threads
,以获得更多的并行性。
编辑:看来我可能错过了这个问题。
单个子拓扑不允许您并行运行每个分支。但是,您可以使用repartition()
将logic2OnData变成自己的子拓扑,并且repartition()
调用之后的所有内容都可以与其之前的所有内容并行运行。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。