在咱们学习 Combine 的过程中,咱们或许会觉得结构中缺少许多 Operator。反应式结构一般供给丰厚的 Operator 生态系统,包含内置的和第三方的。Combine 答应咱们创立自己的 Publisher,咱们将了解怎么完结。此外,咱们将学习背压(Backpressure),咱们将了解什么是背压以及怎么创立处理它的 Publisher。
创立自界说 Publisher
创立自界说 Publisher 的办法有多种,难度从“简略”到“杂乱”不等。关于咱们完结的每个 Operator,咱们将寻求最简略的完结形式,咱们将了解创立自界说 Publisher 的三种不同办法:
-
在 Publisher 命名空间中运用简略的扩展办法。
-
在 Publishers 命名空间中运用产生值的 Subscription 完结一个类型。
-
运用 Subscription 转化来自上游 Publisher 的值。
从技术上讲,能够在没有自界说 Subscription 的情况下创立自界说 Publisher。但假如这样做,将失去应对 Subscriber demand 的才能,这使该 Publisher 在 Combine 生态系统中是非法的。提前撤销也会成为一个问题,这不是引荐的办法。
Publisher 作为扩展办法
假如,咱们想创立一个新的 unwrap()
Operator,它解开可选值并疏忽 nil 值。咱们能够重用现有的 compactMap(_:)
Operator 来进行完结。咱们将在 Publisher 命名空间中增加这个 Operator。
在 Plaoground 中增加以下代码:
extension Publisher {
func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
compactMap { $0 }
}
}
将自界说 Operator 编写为办法,最杂乱的部分是函数签名。Operator 完结很简略:仅仅在 self 上调用 compactMap(_:)
。
办法的签名的制作有些杂乱。第一步是使 Operator 通用,由于它的输出是上游 Publisher 的包装类型,运用单个 compactMap(_:)
,因而回来类型为 -> Publishers.CompactMap<Self, T>
。咱们检查 Publishers.CompactMap
,会发现它是一个泛型类型:public struct CompactMap<Upstream, Output>
。在完结自界说 Operator 时,Upstream
是 Self(咱们正在扩展的 Publisher),Output
是包装的类型。最终,咱们将 Operator 限制为 Optional 类型 where Output == Optional<T> {
留意:当开发更杂乱的 Operator 作为办法时,函数签名会变得非常杂乱。一个好的技巧是让 Operator 回来一个
AnyPublisher<OutputType, FailureType>
。回来一个以eraseToAnyPublisher()
结尾的 Publisher,以对函数签名进行类型擦除。
现在咱们能够测验咱们的新 Operator 了,在下方增加代码:
let values: [Int?] = [1, 2, nil, 3, nil, 4]
values.publisher
.unwrap()
.sink {
print("Received value: \($0)")
}
运转 Playground,只有非 nil 值会打印到调试操控台:
Received value: 1
Received value: 2
Received value: 3
Received value: 4
Subscription 机制
Subscription 是 Combine 的重要部分:虽然咱们处处都能看到 Publisher,但它们大多是无生命的实体。当咱们订阅 Publisher 时,会实例化一个 Subscription,Subscription 负责接纳 Subscriber 的 demand 并产生事情。以下是 Subscription 生命周期的详细信息:
- Subscriber 订阅 Publisher。
- Publisher 创立一个 Subscription,然后将其交给Subscriber(调用
receive(subscription:)
办法)。 - Subscriber 经过向 Subscription 发送所需数量的值(调用 Subscription 的
request(_:)
)从 Subscription 中恳求值。 - Subscription 开端作业并开端宣布值。它将它们逐个发送给 Subscriber(调用 Subscriber 的
receive(_:)
办法)。 - 收到值后,Subscriber 回来一个新的
Subscribers.Demand
,它会增加到先前的总 Demand 中。 - Subscription 会一向发送值,直到发送的值数量到达恳求的总数。
假如 Subscription 发送的值与 Subscriber 恳求的值相同多,它应该在发送更多值之前等候新的 Demand 恳求。咱们能够绕过此机制并持续发送值,但这会损坏 Subscriber 和 Subscription 之间的协议,并或许导致未界说的行为。
最终,假如呈现错误或订阅的值源完结,Subscription 会调用 Subscriber 的 receive(completion:)
办法。
运用 DispatchSource 自界说的 DispatchTimer
DispatchTimerConfiguration
咱们之前了解了 Timer.publish()
,咱们能够根据 Dispatch 的 DispatchSourceTimer
开发自界说的 Timer。
咱们将首要界说一个装备结构,这将使 Subscriber 及其 Subscription 之间共 Timer 装备变得简单。将此代码增加到 Playground:
struct DispatchTimerConfiguration {
let queue: DispatchQueue?
let interval: DispatchTimeInterval
let leeway: DispatchTimeInterval
let times: Subscribers.Demand
}
咱们期望 Timer 能够在某个行列上触发,但也能够不关心在哪个行列上,因而,能够使 queue
成为可选的。interval
是 Timer 触发的时刻距离。leeway
是能够推迟传递 Timer 事情的最大时刻距离。times
是想要接纳的 Timer 事情数。
DispatchTimer Publisher
咱们现在能够开端创立 DispatchTimer Publisher。 Publisher 的代码很简略,由于一切作业都产生在 Subscription 内。在 DispatchTimerConfiguration
下方增加代码:
extension Publishers {
struct DispatchTimer: Publisher {
typealias Output = DispatchTime
typealias Failure = Never
let configuration: DispatchTimerConfiguration
init(configuration: DispatchTimerConfiguration) {
self.configuration = configuration
}
func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure, Output == S.Input {
let subscription = DispatchTimerSubscription(
subscriber: subscriber,
configuration: configuration
)
subscriber.receive(subscription: subscription)
}
}
}
咱们的 Timer 将当时时刻作为 DispatchTime
值宣布。 当然,它永久不会失利,所以 Publisher 的 Failure
类型是 Never
。
此外,完结了 Publisher 协议所需的 receive(subscriber:)
办法,它需求一个编译时特化来匹配 Subscriber 类型 where Failure == S.Failure, Output == S.Input
。大部分动作将产生在 DispatchTimerSubscription
中。Subscriber 会收到一个 Subscription,然后它能够向该 Subscription 发送值的恳求。
DispatchTimerSubscription
Subscription 的效果包含:
-
承受 Subscriber 的初始 Demend。
-
按需生成 Timer 事情。
-
每次 Subscriber 收到一个值并回来一个 Demend 时,都增加到 Demend 计数。
-
保证它不会供给比装备中要求的更多的值。
开端在 Publishers 的扩展下方界说 Subscription:
private final class DispatchTimerSubscription <S: Subscriber>: Subscription where S.Input == DispatchTime {
}
此 Subscription 在外部不行见,只能经过 Subscription 协议,因而咱们将其设为 private
。这是一个类,咱们能够经过引证传递它,Subscriber 能够将它增加到 Cancellable set 中,也能够保存它并独立调用 cancel()
。当然,它用于 Input
值类型为 DispatchTime
的 Subscriber,这是 Subscription 宣布的值的类型。
将这些特点增加到 Subscription 类的界说中:
private final class DispatchTimerSubscription <S: Subscriber>: Subscription where S.Input == DispatchTime {
let configuration: DispatchTimerConfiguration
var times: Subscribers.Demand
var requested: Subscribers.Demand = .none
var source: DispatchSourceTimer? = nil
var subscriber: S?
}
包含:Subscriber 的 configuration
。从 configuration
中获取的 Timer 将触发的最大次数 times
,将运用它作为每次发送值时递减的计数器。requested
是当时的 Demend,每次发送值时都会削减它。source
是内部的 DispatchSourceTimer
,将生成 Timer 事情。以及 subscriber
,这表明,只要 Subscription 没有完结、失利或撤销,Subscription 就有职责保存 Subscriber。
在 DispatchTimerSubscription
界说中增加一个初始化办法:
init(subscriber: S,
configuration: DispatchTimerConfiguration) {
self.configuration = configuration
self.subscriber = subscriber
self.times = configuration.times
}
这很简略,将 times
设置为 Publisher 应接纳 Timer 事情的最大次数,每次 Publisher 宣布事情时,times
都会递减。当它到达零时,Timer 以完结事情完毕。
接着完结 cancel()
,Subscription 有必要供给的必需办法:
func cancel() {
source = nil
subscriber = nil
}
将 DispatchSourceTimer
即 source
设置为 nil 阻止它运转。将 Subscriber 特点设置为 nil 会其从Subscription 的范围中释放出来。
咱们现在能够开端编写 Subscription 的中心代码:request(_:)
,一旦 Subscriber 经过 Publisher 取得 Subscription,Subscriber 有必要从 Subscription 中恳求值:
func request(_ demand: Subscribers.Demand) {
guard times > .none else {
subscriber?.receive(completion: .finished)
return
}
requested += demand
if source == nil, requested > .none {
// todo
}
}
这个办法接纳来自 Subscriber 的 Demand。Demand 是累积的:它们加起来构成 Subscriber 恳求的值的总数。假如 Subscription 现已发送了最大数量的值,能够通知 Subscriber 已完结发送值。
接着经过增加新 Demand 来增加恳求值的总数。检查 Timer 是否现已存在。假如没有,并且恳求的值存在,那么是时分发动它了。
将此代码增加到最终一个的 if 内:
let source = DispatchSource.makeTimerSource(queue: configuration.queue)
source.schedule(deadline: .now() + configuration.interval,
repeating: configuration.interval,
leeway: configuration.leeway)
source.setEventHandler { [weak self] in
guard let self = self,
self.requested > .none else { return }
self.requested -= .max(1)
self.times -= .max(1)
_ = self.subscriber?.receive(.now())
if self.times == .none {
self.subscriber?.receive(completion: .finished)
}
}
self.source = source
source.activate()
从装备的行列中创立 DispatchSourceTimer
。安排 Timer 在每 configuration.interval
秒后触发。一旦 Timer 发动,将永久不会停止它,直到 Subscriber 撤销 Subscription。
然后咱们为 Timer 设置 EventHandler
。 这是 Timer 每次触发时调用的闭包。验证当时是否有恳求的值后,削减两个计数器,然后向 Subscriber 发送一个值。假如要发送的值的总数到达指定的最大值,能够认为 Publisher 已完结并宣布完结事情。最终,激活 source
。
最终增加此扩展,以界说一个 Operator,以便轻松链接此 Publisher:
extension Publishers {
static func timer(queue: DispatchQueue? = nil,
interval: DispatchTimeInterval,
leeway: DispatchTimeInterval = .nanoseconds(0),
times: Subscribers.Demand = .unlimited)
-> Publishers.DispatchTimer {
return Publishers.DispatchTimer(
configuration: .init(queue: queue,
interval: interval,
leeway: leeway,
times: times)
)
}
}
增加此代码以测验咱们的 Timer:
let publisher = Publishers.timer(interval: .seconds(1),
times: .max(6))
let subscription = publisher.sink { time in
print("Timer emits: \(time)")
}
运转 Playground:
Timer emits: DispatchTime(rawValue: 8517897314669)
Timer emits: DispatchTime(rawValue: 8517921315409)
Timer emits: DispatchTime(rawValue: 8517945315453)
Timer emits: DispatchTime(rawValue: 8517969296810)
Timer emits: DispatchTime(rawValue: 8517993313227)
Timer emits: DispatchTime(rawValue: 8518017296767)
虽然 Subscription 在 Combine API 中简直看不到,但正如咱们刚刚发现的那样,Subscription 完结了大部分作业。
完结 ShareReplay Operator
ShareReplaySubscription
要完结 shareReplay()
,咱们需求:
-
契合 Subscription 协议的类型。这是每个 Subscriber 将收到的 Subscription。为保证咱们能够应对每个 Subscriber 的 demand 和 cancel,每个 Subscriber 都将收到单独的 Subscription。
-
契合 Publisher 协议的类型。咱们将把它完结为一个类,由于一切 Subscriber 都期望同享同一个实例。
在 Playground 中增加此代码,创立 Subscription 类:
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
let capacity: Int
var subscriber: AnySubscriber<Output,Failure>? = nil
var demand: Subscribers.Demand = .none
var buffer: [Output]
var completion: Subscribers.Completion<Failure>? = nil
}
咱们运用 class
而不是 struct
来完结 Subscription:Publisher 和 Subscriber 都需求访问和改动 Subscription。重播缓冲区的最大容量 capacity
是咱们在初始化时设置的常数。在订阅期间保存对 Subscriber 的引证,运用类型擦除的 AnySubscriber 能够使咱们免于与纠结其类型。盯梢 Publisher 从 Subscriber 那里收到的累积 demand,以便咱们能够精确地交给恳求数量的值。将挂起的值存储在缓冲区 buffer
中,直到它们被传递给 Subscriber 或被丢掉。completion
保存潜在的完结事情,以便在新 Subscriber 开端恳求值时当即将其交给给他们。
留意,咱们保存了完结事情:Publisher 不知道恳求什么时分产生,所以它将完结事情交给 Subscription,以便在正确的时刻被交给。
接着,将初始化办法增加到 Subscription 界说中:
init<S>(subscriber: S,
replay: [Output],
capacity: Int,
completion: Subscribers.Completion<Failure>?)
where S: Subscriber,
Failure == S.Failure,
Output == S.Input {
self.subscriber = AnySubscriber(subscriber)
self.buffer = replay
self.capacity = capacity
self.completion = completion
}
从上游 Pulisher 接纳多个值并将它们设置在此 Subscription 实例上。具体来说:存储类型擦除的 Subscriber;存储上游 Pulisher 的当时缓冲区 buffer
、最大容量 capacity
和完结事情 completion
(假如已宣布)。
咱们需求一种将完结事情中继给 Subscriber 的办法:
private func complete(with completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else { return }
self.subscriber = nil
self.completion = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
将 Subscriber 设置为 nil;经过将完结事情设置为 nil 来保证只发送一次完结,然后清空缓冲区;将完结事情中继给 Subscriber。
咱们还需求一种能够向 Subscriber 宣布值的办法:
private func emitAsNeeded() {
guard let subscriber = subscriber else { return }
while self.demand > .none && !buffer.isEmpty {
self.demand -= .max(1)
let nextDemand = subscriber.receive(buffer.removeFirst())
if nextDemand != .none {
self.demand += nextDemand
}
}
if let completion = completion {
complete(with: completion)
}
}
仅当缓冲区中有值并且有未完结的 Demand 时才宣布值。将未完结的 Demand 减一。向 Subscriber 发送第一个值,并收到新的 Demand。将新 Demand 增加到未完结的总 Demand 中,但前提是它不是 .none。 否则将溃散,由于 Combine 不会将 Subscribers.Demand.none 视为零,并且增加或减去 .none 将触发反常。假如完结事情未发送,请当即发送。
完结 Subscription 最重要的要求:
func request(_ demand: Subscribers.Demand) {
if demand != .none {
self.demand += demand
}
emitAsNeeded()
}
撤销订阅的代码更加简单, 增加此代码:
func cancel() {
complete(with: .finished)
}
与 Subscriber 相同,咱们需求完结承受值的办法和完结事情,增加此办法以承受值:
func receive(_ input: Output) {
guard subscriber != nil else { return }
buffer.append(input)
if buffer.count > capacity {
buffer.removeFirst()
}
emitAsNeeded()
}
保证有 Subscriber 后,此办法将将值增加到 buffer
。保证缓冲的值不要超过恳求的容量将结果交给给 Subscriber。
增加以下办法来承受完结事情:
func receive(completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else { return }
self.subscriber = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
此办法删去 Subscriber,清空缓冲区,并将完结事情发送到下流。
ShareReplay Publisher
Publisher 一般是 Publishers 命名空间中的值类型(struct)。但有时将 Publisher 完结为 class 如 Publishers.Multicast 或 Publishers.Share 是有意义的。关于这个 Publisher,咱们需求一个类,类似于 share()
。不过这是例外,大多数情况下咱们会运用 struct。
在 ShareReplaySubscription
后增加此代码:
extension Publishers {
final class ShareReplay<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
}
}
咱们期望多个 Subscriber 能够同享此 Operator 的单个实例,因而咱们运用 class。它也是通用的,上游 Publisher 的最终类型作为参数。这个新的 Publisher 不会改动上游 Publisher 的输出或失利类型——它仅仅运用上游的类型。
将 Publisher 需求的特点增加到 ShareReplay
的界说中:
private let lock = NSRecursiveLock()
private let upstream: Upstream
private let capacity: Int
private var replay = [Output]()
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
private var completion: Subscribers.Completion<Failure>? = nil
由于咱们将一起供给多个 Subscriber,所以咱们需求一个锁 lock
来保证对可变变量的独占访问。保存对上游 Publisher upstream
的引证,咱们将在 Subscription 生命周期的各个阶段需求它。咱们能够在初始化期间指定重放缓冲区的最大记载容量 capacity
。当然,咱们还需求存储记载值 replay
。咱们供给多个 Subscriber,因而咱们需求将 subscriptions
留以通知他们事情。每个 Subscriber 都从一个专用的 ShareReplaySubscription
获取其值。 Operator 即使在完结后也能够重播值,因而咱们需求记住上游 Publisher 是否完结。
首要,将必要的初始化程序增加到你的 ShareReplay 发布者:
init(upstream: Upstream, capacity: Int) {
self.upstream = upstream
self.capacity = capacity
}
这儿仅仅存储上游 Publisher upstream
和容量 capacity
。
增加将来自上游传入值中继到 Subscriber 的办法:
private func relay(_ value: Output) {
lock.lock()
defer { lock.unlock() }
guard completion == nil else { return }
replay.append(value)
if replay.count > capacity {
replay.removeFirst()
}
subscriptions.forEach {
$0.receive(value)
}
}
由于多个 Subscriber 同享此 Publisher,因而咱们有必要运用锁维护对可变变量的访问。仅在上游尚未完结时才中继值。将值增加到缓冲区并仅保存所需容量值。这些是重播给新 Subscriber 的内容。将缓冲的值中继到每个衔接的 Subscriber。
增加这个办法来处理完结事情:
private func complete(_ completion: Subscribers.Completion<Failure>) {
lock.lock()
defer { lock.unlock() }
self.completion = completion
subscriptions.forEach {
$0.receive(completion: completion)
}
}
为未来的 Subscriber 保存完结事情。将其转发给每个衔接的 Subscriber。
咱们现在已准备好开端编写每个 Publisher 有必要完结的 receive
办法。此办法将接纳 Subscriber。它的职责是创立一个新 Subscription,然后将其交给 Subscriber。
增加此代码以开端界说此办法:
func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure,
Output == S.Input {
lock.lock()
defer { lock.unlock() }
let subscription = ShareReplaySubscription(
subscriber: subscriber,
replay: replay,
capacity: capacity,
completion: completion)
subscriptions.append(subscription)
subscriber.receive(subscription: subscription)
guard subscriptions.count == 1 else { return }
let sink = AnySubscriber(
receiveSubscription: { subscription in
subscription.request(.unlimited)
},
receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
self?.relay(value)
return .none
},
receiveCompletion: { [weak self] in
self?.complete($0)
}
)
upstream.subscribe(sink)
}
新 Subscription 引证 Subscriber 并接纳当时 replay
、capacity
和 completion
。咱们保存 Subscription 以将未来的事情传递给它。咱们将 Subscription 发送给 Subscriber,Subscriber 或许(现在或今后)开端恳求值。
只向上游 Publisher 订阅一次。运用方便的 AnySubscriber 类,它承受闭包,并在订阅时当即恳求 .unlimited 值以让 Publisher 运转完结。将咱们收到的值转发给下流 Subscriber。运用咱们从上游取得的完结事情来完结咱们的 Publisher。
留意:咱们开始能够恳求 .max(self.capacity) 并仅接纳它,但 Combine 是 Demand 驱动的!假如咱们恳求的值没有 Publisher 能够产生的那么多值,那么咱们或许永久不会收到完结事情!
最终咱们将 AnySubscriber 订阅到上游 Publisher。
咱们的 Publisher 已完结!咱们还需求一个便利的 Operator,帮助将这个新 Publisher 与其他 Publishe r联系起来。
将其作为扩展增加到 Playground 末尾的 Publishers 命名空间:
extension Publisher {
func shareReplay(capacity: Int = .max)
-> Publishers.ShareReplay<Self> {
return Publishers.ShareReplay(upstream: self,
capacity: capacity)
}
}
咱们现在拥有一个功用齐全的 shareReplay(capacity:)
Operator。
测验 shareReplay(capacity:)
在 Playground 中提那件以下代码:
let subject = PassthroughSubject<Int,Never>()
let publisher = subject.shareReplay(capacity: 2)
subject.send(0)
let subscription1 = publisher.sink(
receiveCompletion: {
print("subscription1 completed: \($0)")
},
receiveValue: {
print("subscription1 received \($0)")
}
)
subject.send(1)
subject.send(2)
subject.send(3)
subscription1
将收到:
subscription1 received 1
subscription1 received 2
subscription1 received 3
接下来,创立第二个 Subscription 并发送更多值和完结事情:
let subscription2 = publisher.sink(
receiveCompletion: {
print("subscription2 completed: \($0)")
},
receiveValue: {
print("subscription2 received \($0)")
}
)
subject.send(4)
subject.send(5)
subject.send(completion: .finished)
从头运转,subscription2
将收到回放,subscription1
subscription2
将收到新值:
subscription1 received 1
subscription1 received 2
subscription1 received 3
subscription2 received 2
subscription2 received 3
subscription1 received 4
subscription2 received 4
subscription1 received 5
subscription2 received 5
subscription1 completed: finished
subscription2 completed: finished
增加一个稍有推迟的 Subscription,以保证它在 Publisher 完结后产生:
var subscription3: Cancellable? = nil
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print("Subscribing to shareReplay after upstream completed")
subscription3 = publisher.sink(
receiveCompletion: {
print("subscription3 completed: \($0)")
},
receiveValue: {
print("subscription3 received \($0)")
}
)
}
subscription1 received 1
subscription1 received 2
subscription1 received 3
subscription2 received 2
subscription2 received 3
subscription1 received 4
subscription2 received 4
subscription1 received 5
subscription2 received 5
subscription1 completed: finished
subscription2 completed: finished
Subscribing to shareReplay after upstream completed
subscription3 received 4
subscription3 received 5
subscription3 completed: finished
一切契合预期,shareReplay(capacity:)
作业做得很好!
处理 Backpressure
在流体动力学中,背压是与经过管道的流体活动相反的阻力。在 Combine 中,它是对来自 Publisher 的值流的阻力。
一般, Subscriber 需求处理 Publisher 宣布的值:
-
处理高频数据,例如来自传感器的输入;
-
履行大文件传输;
-
在数据更新时渲染杂乱的 UI;
-
等候用户输入;
-
处理 Subscriber 无法以传入速度跟上的传入数据。
Combine 供给的 Publisher – Subscriber 机制是灵敏的。这是一种拉式规划,而不是推式规划。这意味着 Subscriber 要求Publisher 宣布值并指定他们想要接纳的数量。这种恳求机制是自适应的:每次订阅者收到一个新值时,需求都会更新。这答应 Subscriber 在他们不想接纳更多数据时经过“封闭水龙头”来处理背压,并在他们准备好接纳更多数据时“打开它”。
留意:请记住,咱们只能以累加的办法调整 Demand。能够在 Subscriber 每次收到新值时增加 Demand,办法是回来新的
.max(N)
或.unlimited
。或许能够回来.none
,表明需求不应增加。Subscriber 随后“挂机”以接纳至少到达新的最大 Demand 的值。例如,假如之前的最大 Demand 是接纳三个值,而 Subscriber 只接纳到一个,则在订阅者的receive(_:)
中回来.none
不会“封闭水龙头”。当 Publisher 准备好发送值时, Subscriber 仍将最少接纳两个值。
当更多值可用时会产生什么彻底取决于咱们的规划,咱们能够:
-
经过办理 Demand 来操控流量,以防止 Publisher 发送超出处理才能的值;
-
缓冲值,直到能够处理它们 – 存在耗尽可用内存的危险;
-
删去无法当即处理的值;
-
根据要求对以上的一些组合。
除了上述之外,处理背压还能够采取以下形式:
-
具有处理拥塞的自界说 Subscription 的 Publisher。
-
在 Publisher 链结尾供给值的 Subscriber。
咱们将专注于完结后者,创立一个 sink 函数的可暂停变体。
运用可暂停的 sink 来处理背压
在 Playground 创立一个协议,让咱们从暂停中康复:
protocol Pausable {
var paused: Bool { get }
func resume()
}
这儿不需求 pause()
办法,由于咱们将在收到每个值时确认是否暂停。
接下来开端界说可暂停的 Subscriber:
final class PausableSubscriber<Input, Failure: Error>:
Subscriber, Pausable, Cancellable {
let combineIdentifier = CombineIdentifier()
}
可暂停 Publisher 既能够暂停也能够撤销。这是 pausableSink
函数将回来的目标。这也是为什么咱们将它作为一个 class 而不是一个 struct 来完结的原因:咱们不期望一个目标被复制,并且咱们需求在其生命周期的某些点上具有可变性。Subscriber 有必要为 Combine 供给唯一标识符以办理和优化其 Publisher 流。
现在增加这些特点:
let receiveValue: (Input) -> Bool
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
private var subscription: Subscription? = nil
var paused = false
receiveValue
闭包回来一个 Bool
:true 表明它或许会收到更多的值,false 表明订阅应该暂停。完结闭包将在收到来自 Publisher 的完结事情时被调用。保存 Subscription,以便它能够在暂停后恳求更多值。当咱们不再需求它时,你需求将此特点设置为 nil 以防止循环。根据 Pausable
协议揭露 paused
特点。
接下来,将以下代码增加到 PausableSubscriber
以完结初始化办法并契合 Cancelable 协议:
init(receiveValue: @escaping (Input) -> Bool,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
self.receiveValue = receiveValue
self.receiveCompletion = receiveCompletion
}
func cancel() {
subscription?.cancel()
subscription = nil
}
初始化办法承受两个闭包,Subscriber 将在收到来自 Publisher 的新值和完结时调用它们。闭包类似于运用 sink
函数的闭包,但有一个例外:receiveValue
闭包回来一个布尔值以指示接纳器是否准备好承受更多值,或许是否需求暂停 Subscription。撤销 Subscription 时,不要忘掉之后将其设置为 nil 以防止循环。
现在增加此代码以满足 Subscriber 的要求:
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
paused = receiveValue(input) == false
return paused ? .none : .max(1)
}
func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion(completion)
subscription = nil
}
收到 Publisher 创立的 Subscription 后,将其存储以备后用,以便咱们能够从暂停中康复。当即恳求一个值。 Subscriber 能够暂停,咱们无法猜测何时需求暂停。这儿的策略是一个一个地恳求值。
当接纳到新值时,调用 receiveValue
并相应更新暂停状态。假如 Subscriber 被暂停,回来 .none
表明你现在不想要更多的值——记住,你开始只恳求了一个。
接纳到完结事情后,将其转发给 receiveCompletion
,然后将 Subscription 设置为 nil,由于你不再需求它。
最终,完结 Pausable 的其余部分:
func resume() {
guard paused else { return }
paused = false
subscription?.request(.max(1))
}
假如 Publisher paused
,则恳求一个值以从头开端。咱们先在能够在 Publishers 命名空间中揭露新的 pausableSink
。
在 Playground 的末尾增加以下代码:
extension Publisher {
func pausableSink(
receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
receiveValue: @escaping ((Output) -> Bool))
-> Pausable & Cancellable {
let pausable = PausableSubscriber(
receiveValue: receiveValue,
receiveCompletion: receiveCompletion)
self.subscribe(pausable)
return pausable
}
}
pausableSink
与 sink
非常接近。唯一的区别是 receiveValue
闭包的回来类型:Bool。实例化一个新的 PausableSubscriber
并将其 subscribe
。Subscriber 是咱们用来康复和撤销 Subscription 的目标。
测验 pausableSink
let subscription = [1, 2, 3, 4, 5, 6]
.publisher
.pausableSink(receiveCompletion: { completion in
print("Pausable subscription completed: \(completion)")
}) { value -> Bool in
print("Receive value: \(value)")
if value % 2 == 1 {
print("Pausing")
return false
}
return true
}
Publisher 一般按顺序宣布一切值。运用 pausableSink
,此发布者将在收到值 1、3 和 5 时暂停。运转 Playground:
Receive value: 1
Pausing
要康复 Publisher,需求异步调用 resume()
。运用 Timer 很简单做到这一点:
let timer = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.sink { _ in
guard subscription.paused else { return }
print("Subscription is paused, resuming")
subscription.resume()
}
再次运转 Playground:
Receive value: 1
Pausing
Subscription is paused, resuming
Receive value: 2
Receive value: 3
Pausing
Subscription is paused, resuming
Receive value: 4
Receive value: 5
Pausing
Subscription is paused, resuming
Receive value: 6
Pausable subscription completed: finished
留意:假如 Publisher 无法保存值并等候 Subscriber 恳求它们怎么办?在这种情况下,咱们需求运用
buffer(size:prefetch:whenFull:)
Operator 来缓冲值。此 Operator 能够将值缓冲到size
参数中指定的容量,并在 Subscriber 准备好接纳它们时传递它们,其他参数确认缓冲区怎么填满以及缓冲区满时会产生什么。