如何解决为什么与空的fs2.Stream合并会更改程序的行为
有据可查的是,与空fs2.Stream
合并应产生相同的fs2.Stream
。这是来自Scaladocs的引用:
具有
merge(Stream.empty,s) == s
使用fs2.Stream
考虑以下完整的Scala程序:
发光元素
import scala.concurrent.duration._
import cats.effect.{ContextShift,IO,Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
object TestFs2 extends App {
implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val program = Ref.of[IO,Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})
program.flatMap(_.compile.drain).unsafeRunSync()
}
程序将打印以下内容:
Got value 0
Got value 1
Got value 2
...
,看起来还可以。现在应用上面Scaladoc
的引用,我得出结论,替换
fs2.Stream.repeatEval(ref.get)
使用
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO,Int])
行为应相同。这是更新的程序:
发射元素并与空的fs2.Stream合并
import scala.concurrent.duration._
import cats.effect.{ContextShift,Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO,Int]).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})
program.flatMap(_.compile.drain).unsafeRunSync()
}
程序输出为
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
问题::为什么与空的fs2.Stream
合并会更改程序的行为,从而导致原始fs2.Stream
的元素重复?
解决方法
merge
的文档中还说:
在等待结果流使用它之前,实现总是尝试从每一侧提取一个块。这样,当结果流是处理元素时,可能有多达两个块(每个流一个)等待处理。
如果我正确理解这意味着在结果流忙于处理值0
时,在更新ref
之前已经从源中提取了一个新值。
严格来说,我认为这种行为不会违反任何不变性。但是对您来说却有所不同,因为
- 您的流改变了它从中提取的源
- 您的源流总是 准备发射元素
要解决第二点,可以使用1元素队列而不是Ref
。
AFAICT,如果不使用merge
,可能会发生相同的问题。只要源可以发射它们,流就可以在处理它们之前从其源中随意拉出尽可能多的元素。基本上,您在第一段代码中就很幸运,因为您的流很简单,只有一个1元素的块。
原来是bug。
它从源流中提取下一个块,然后获取 信号量许可,直到处理上一个块为止 从队列中。因此,它总是提前读取1个块。
按照 mpilquist 的建议,我创建了pull request,用于修复刚刚合并的问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。