如何解决为什么RxJava中的onBackpressureDrop会发生这种情况
在这里,我有一个可流动的对象,它每毫秒发射一次元素。
Flowable<Long> source = Flowable.interval(1,TimeUnit.MILLISECONDS).take(14000);
source.map(e->{
Log.d("TAGBefore","before " + e);
return e;
})
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(
e-> {
Log.d("TAGNext","onNext: " + e);
Thread.sleep(100);
},e-> Log.d("TAGError","error: " + e),()-> Log.d("TAGComplete","onComplete")
);
我使用过之前的信息来知道可观察对象发出元素的时刻,我的疑问是,从127(观察者已满)开始,它会达到9688
TAGNext: onNext: 125
TAGNext: onNext: 126
TAGNext: onNext: 127
TAGNext: onNext: 9668
TAGNext: onNext: 9669
TAGNext: onNext: 9670
但是,当我进一步检查控制台(使用其他搜索过滤器)时,我意识到发布127时它已经去了12794,所以不是9688而不应该是12794或接近的数字? ,谢谢。
TAGBefore: before: 12793
TAGBefore: before: 12794
TAGNext: onNext: 127
TAGBefore: before: 12795
TAGBefore: before: 12796
但是,当我更多地检查控制台(使用其他搜索过滤器)时,我意识到发出127时它已经达到12794了,因此代替9688而不应该是12794或接近可观察的数字,而不是9794。已经免费了吗?如果出现错误,我要澄清一下,我是RxJava的新手,谢谢。
解决方法
observeOn
具有默认的128元素缓冲区,该缓冲区可以很快填充。
每100毫秒将其耗尽一次,直到仅剩32个元素为止,此时它又请求96个元素。因此,要让更多项目通过onBackpressureDrop
大约需要9600毫秒,因此您看到TAGNext: onNext: 9668
。
排空第127个元素时,您需要运行大约12700毫秒,因此从生产者端看到TAGBefore: before: 12795
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。