如何解决使用 RxJava2 中的一系列运算符一次处理一个 PublishSubject 发射
我有一个PublishSubject<Event>
。
在每个新的事件上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试通过HTTPPOST它> 向服务器发出请求,当该服务器回复 200 时,我进行另一个数据库查询以删除刚刚发送的行。
这是通过粗略的链接完成的:
subject
.toSerialized()
.flatMapMaybe { getCachedData() }
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
.subscribe()
主体在特定条件下可能会发出两个快速事件,假设间隔为 10 毫秒。
问题是 getCachedData() 在第一次发射完成后的 getCachedData() 之后,即在 cleanCache() 有机会在第二次发射之前清理数据库。
我想以某种方式将这些 flatMaps 组合成一个观察者,以便 subject 在整个链完成之前不会产生新的排放,最好没有任何手工制作的信号量.
我在单个线程池调度程序上执行 subscribeOn(),它只对每个 flatMap 内的调用进行排序。
我看到了一些将 toSerialized() 添加到主题的建议,但现在我认为它与链的工作方式无关。
我看到还有 lift() 和 compose() 操作符。我试图将所有 flatMaps 放在后者中,这并没有改变行为。前者我还在想。
解决方法
将它们放在 concatMapX
子流中:
subject
.concatMapCompletable {
getCachedData()
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
}
我看到了一些将 toSerialized() 添加到主题的建议,但现在我认为它与链的工作方式无关
除非您实际驱动它和多个线程返回的 Subject
,否则它对您的流程没有实际影响。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。