如何解决如何结束/关闭MutableSharedFlow?
SharedFlow
刚刚在1.4.0-M1协程中引入,它意在替换所有BroadcastChannel
实现(如design issue定义中所述)。
我有一个用例,其中我使用BroadcastChannel
表示传入的Web套接字框架,以便多个侦听器可以“订阅”这些框架。
当我移到SharedFlow
时遇到的问题是,当我收到关闭帧或上游错误(我想通知所有订阅者该流)时,我无法“结束”流结束了。
当我想有效地“关闭” SharedFlow
时如何使所有订阅终止?
有没有办法区分正常关闭和异常关闭之间的区别? (如渠道)
如果MutableSharedFlow
不允许将流的结尾传达给订户,那么如果BroadcastChannel
被弃用/删除,该怎么办?
解决方法
SharedFlow
文档描述了您的需求:
请注意,大多数终端运算符(例如Flow.toList)在应用于共享流时也不会完成,但是可以在共享流上使用流截断运算符(例如Flow.take和Flow.takeWhile)将其转换为完成操作一个。
SharedFlow不能像BroadcastChannel一样关闭,永远不能代表失败。如果需要,应明确实现所有错误和完成信号。
基本上,您将需要引入一个可以从共享流中发出的特殊对象,以指示该流已结束,在使用者端使用takeWhile
可以使它们一直发出,直到接收到该特殊对象为止。 / p>
我认为一个可能的解决方案是创建一个布尔标志 ValueError: Shape must be rank 4 but is rank 3 for '{{node sequential/random_rotation/transform/ImageProjectiveTransformV3}} = ImageProjectiveTransformV3[dtype=DT_FLOAT,fill_mode="WRAP",interpolation="BILINEAR"](rescaling/add,sequential/random_rotation/rotation_matrix/concat,sequential/random_rotation/transform/strided_slice,sequential/random_rotation/transform/fill_value)' with input shapes: [299,299,3],[299,8],[2],[].
并仅公开公开带有 isValid
的流。然后,当您想关闭所有订阅者时,只需调用 .takeWhile { isValid }
和 isValid = false
。
可能的实现:
sFlow.emit()
编辑:在我的情况下,private var isValid = true // In real scenario use atomic boolean
private val _sharedFlow = MutableSharedFlow<Unit>()
val sharedFlow: Flow<Unit> get() = _sharedFlow.takeWhile { isValid }
suspend fun cancelSharedFlow() {
isValid = false
_sharedFlow.emit(Unit)
}
总是挂起,所以我不得不使用 .emit()
(这不适合许多用例)。不确定问题是在这个例子中还是在我的应用程序的其他地方。如果您发现问题,请发表评论:)