如何解决取消嵌套的合并发布者
我正在尝试将我的应用切换为使用合并管道。我的希望是简化线程管理,但使自己陷入合并的意外行为。
我假设尽管我正在订阅DispatchQueue.global(),取消主管道仍会取消嵌套订阅。
这是我的游乐场:
import Cocoa
import Combine
let folders = ["folder1","folder2","folder3","folder4"]
class OneByOnePublisher: Publisher {
typealias Output = String
typealias Failure = Never
let input: [String]
init(input: [String]) {
self.input = input
}
func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output,Downstream.Failure == Failure {
let subject = PassthroughSubject<String,Never>()
subject.receive(subscriber: subscriber)
for value in input {
subject.send(value)
}
subject.send(completion: .finished)
}
}
func uppercase(_ character: Character) -> String {
print("Uppercasing \(character)")
Thread.sleep(forTimeInterval: 0.5)
return character.uppercased()
}
func uppercasePublisher(_ folder: String) -> AnyPublisher<String,Never> {
return folder.publisher
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
.map{uppercase($0)}
.collect()
.map{$0.joined()}
.eraseToAnyPublisher()
}
let stringPublisher = PassthroughSubject<String,Never>()
let oneByOnePublisher = OneByOnePublisher(input: folders)
let cancelable = oneByOnePublisher
.subscribe(on: DispatchQueue.global())
.handleEvents(receiveCancel: { print("Received cancel in main") })
.flatMap{uppercasePublisher($0)}
.receive(on: DispatchQueue.main)
.sink { (completion) in
print("Received completion: \(completion)")
} receiveValue: { (value) in
print("Received value: \(value)")
}
Thread.sleep(forTimeInterval: 2)
cancelable.cancel()
Thread.sleep(forTimeInterval: 2)
print("Done")
该输出
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in main
Uppercasing e
Uppercasing r
Uppercasing 1
Done
但是,如果我取消注释行
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
然后输出就是我最初期望的结果
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in nested
Received cancel in main
Done
我想念什么?为什么在第一种情况下嵌套订阅不会立即被取消?为什么添加handleEvents()会更改取消流程?
解决方法
很可能Publishers.Sequence
发布者存在错误,这是folder.publisher
调用引起的错误。添加handleEvents
调用会隐藏此错误,因为该调用会导致包装发布者似乎正确处理了取消操作。
要对此理论进行检验,让我们修改您的OneByOnePublisher
以使其适用于任何类型的序列(这还将使其适用于原始的字符串数组):
class OneByOnePublisher<Seq: Sequence>: Publisher {
typealias Output = Seq.Element
typealias Failure = Never
let input: Seq
init(input: Seq) {
self.input = input
}
func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output,Downstream.Failure == Failure {
let subject = PassthroughSubject<Output,Failure>()
subject.receive(subscriber: subscriber)
for value in input {
subject.send(value)
}
subject.send(completion: .finished)
}
}
extension Sequence {
var oneByOne: OneByOnePublisher<Self> { OneByOnePublisher(input: self) }
}
现在,如果我们更改uppercasePublisher
以使用增强型发布者
func uppercasePublisher(_ folder: String) -> AnyPublisher<String,Never> {
return folder.oneByOne
// .handleEvents(receiveCancel: { NSLog("Received cancel in nested") })
.map{uppercase($0)}
.collect()
.map{$0.joined()}
.eraseToAnyPublisher()
}
我们可以看到取消操作按预期进行,无论handleEvents
行是否有注释。这表明原来的folder.publisher
行是问题的根源,更具体地说-是发生问题的Publishers.Sequence
实例。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。