如何解决停止无限流动
我有一个在两个(不同)客户端之间中继的服务器。当用户(第一个客户端,通过websockets)发送消息时,服务器需要每隔X毫秒向设备(第二个客户端)重复此消息,直到收到新消息,或者关闭websocket。
我将websocket作为流使用,并且创建了以下运算符:
fun <T> flowEvery(value: T,everMilliSeconds: Long): Flow<T> =
flow {
while (true) {
emit(value)
delay(everMilliSeconds)
}
}.cancellable()
@ExperimentalCoroutinesApi
fun <T> Flow<T>.repeatEvery(mSec: Long): Flow<T> =
this.flatMapLatest {
flowEvery(it,mSec)
}
问题是,一旦套接字关闭,最后一条消息将永远发送下去。
我的呼叫站点是:
try {
oscConnections.sendTo(
deviceIdentifier,incoming.consumeAsFlow().repeatEvery(50).mapNotNull { frame ->
when (frame) {
is Frame.Text -> listOf(frame.readText().toFloat())
else -> null
}
})
} finally {
close(CloseReason(CloseReason.Codes.NORMAL,"Ended"))
}
incoming
通道已关闭(已调用onCompletion
),但未发送到sendTo
的流。 sendTo
会自行消耗输入流,并为消耗的每个元素发送一条UDP消息。
如何强制流量停止?
解决方法
通过使用flatMapLatest
或transformLatest
,将上游Flow的最后一个值替换为永无止境的Flow。
您必须以某种方式停止Flow并在协程中到处使用CancellationException
来表示协程已取消。您可以将永无止境的Flow逻辑包装在coroutineScope
中,以在上游流程完成后仅精确地取消该作用域。
fun <T> Flow<T>.repeatEvery(delay: Long): Flow<T> =
flow<T> {
try {
coroutineScope {
onCompletion { this@coroutineScope.cancel() }
.transformLatest { value ->
while (true) {
emit(value)
delay(delay)
}
}
.collect(::emit)
}
}
catch (e: CancellationException) {
// done
}
}
PS:.cancellable()
在您的示例中没有做太多事情。根据文档,使用flow { … }
之类的流生成器构建的流可以自动取消。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。