如何解决将callbackflow转换为sharedflow
我只是从协程/流(通常是kotlin)的使用开始,我正在努力将callbackFlow转换为sharedFlow。
我整理了以下简单示例,只是为了展示我尝试过的方法,但没有成功。我的代码更加复杂,但是我相信这个示例反映了我要解决的问题。
fun main() = runBlocking {
getMySharedFlow().collect{
println("collector 1 value: $it")
}
getMySharedFlow().collect{
println("collector 2 value: $it")
}
}
val sharedFlow = MutableSharedFlow<Int>()
suspend fun getMySharedFlow(): SharedFlow<Int> {
println("inside sharedflow")
getMyCallbackFlow().collect{
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
return sharedFlow
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: ()->Unit) {
println("fetching something...")
myCallBack()
}
这个想法是fetchSomethingContinuously
仅被调用一次,与sharedFlow的收集器数量无关。但是,从输出中可以看到,收集器永远不会获取值:
inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3
我查看了shareIn运算符,但不确定如何正确使用它。
我如何实现这样的目标?任何提示将不胜感激。
解决方法
因此,您在这里所缺少的是以下事实:对collect
,emit()
和awaitClose()
的调用正在挂起,并且只有在完成相应的操作后才能结束。
函数getMySharedFlow()
甚至没有返回以在其上应用收集,因为它正在收集callbackFlow
,callbackFlow
停留在对awaitClose()
的调用上反过来并没有完成,因为fetchSomethingContinuously
没有用close()
函数结束回调。
您需要意识到,您必须在这里创建一些显式的并行性,而不是生活在悬浮的世界中。您的示例代码的有效变体为:
val sharedFlow = MutableSharedFlow<Int>()
suspend fun startSharedFlow() {
println("Starting Shared Flow callback collection")
getMyCallbackFlow().collect {
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
}
fun main() = runBlocking<Unit> {
launch {
startSharedFlow()
}
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
对launch
的调用允许异步执行发射和收集值。
此外,关于shareIn()
运算符,它只是从指定的上游创建了一个SharedFlow,就像您想要的那样。另外,您可以使用started
参数指定何时开始共享。有关here的更多信息。
这是您在示例中使用它的方式:
fun main() = runBlocking<Unit> {
val sharedFlow = getMyCallbackFlow().shareIn(this,started = SharingStarted.Eagerly)
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。