在咱们学习 Combine 的过程中,咱们或许会觉得结构中缺少许多 Operator。反应式结构一般供给丰厚的 Operator 生态系统,包含内置的和第三方的。Combine 答应咱们创立自己的 Publisher,咱们将了解怎么完结。此外,咱们将学习背压(Backpressure),咱们将了解什么是背压以及怎么创立处理它的 Publisher。

创立自界说 Publisher

创立自界说 Publisher 的办法有多种,难度从“简略”到“杂乱”不等。关于咱们完结的每个 Operator,咱们将寻求最简略的完结形式,咱们将了解创立自界说 Publisher 的三种不同办法:

  • 在 Publisher 命名空间中运用简略的扩展办法。

  • 在 Publishers 命名空间中运用产生值的 Subscription 完结一个类型。

  • 运用 Subscription 转化来自上游 Publisher 的值。

从技术上讲,能够在没有自界说 Subscription 的情况下创立自界说 Publisher。但假如这样做,将失去应对 Subscriber demand 的才能,这使该 Publisher 在 Combine 生态系统中是非法的。提前撤销也会成为一个问题,这不是引荐的办法。

Publisher 作为扩展办法

假如,咱们想创立一个新的 unwrap() Operator,它解开可选值并疏忽 nil 值。咱们能够重用现有的 compactMap(_:) Operator 来进行完结。咱们将在 Publisher 命名空间中增加这个 Operator。

在 Plaoground 中增加以下代码:

extension Publisher {
    func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
        compactMap { $0 }
    }
}

将自界说 Operator 编写为办法,最杂乱的部分是函数签名。Operator 完结很简略:仅仅在 self 上调用 compactMap(_:)

办法的签名的制作有些杂乱。第一步是使 Operator 通用,由于它的输出是上游 Publisher 的包装类型,运用单个 compactMap(_:),因而回来类型为 -> Publishers.CompactMap<Self, T>。咱们检查 Publishers.CompactMap,会发现它是一个泛型类型:public struct CompactMap<Upstream, Output>。在完结自界说 Operator 时,Upstream 是 Self(咱们正在扩展的 Publisher),Output 是包装的类型。最终,咱们将 Operator 限制为 Optional 类型 where Output == Optional<T> {

留意:当开发更杂乱的 Operator 作为办法时,函数签名会变得非常杂乱。一个好的技巧是让 Operator 回来一个 AnyPublisher<OutputType, FailureType>。回来一个以 eraseToAnyPublisher() 结尾的 Publisher,以对函数签名进行类型擦除。

现在咱们能够测验咱们的新 Operator 了,在下方增加代码:

let values: [Int?] = [1, 2, nil, 3, nil, 4]
values.publisher
    .unwrap()
    .sink {
        print("Received value: \($0)")
    }

运转 Playground,只有非 nil 值会打印到调试操控台:

Received value: 1
Received value: 2
Received value: 3
Received value: 4

Subscription 机制

Subscription 是 Combine 的重要部分:虽然咱们处处都能看到 Publisher,但它们大多是无生命的实体。当咱们订阅 Publisher 时,会实例化一个 Subscription,Subscription 负责接纳 Subscriber 的 demand 并产生事情。以下是 Subscription 生命周期的详细信息:

Combine | (VI) 自定义 Publisher & 处理 Backpressure

  1. Subscriber 订阅 Publisher。
  2. Publisher 创立一个 Subscription,然后将其交给Subscriber(调用 receive(subscription:)办法)。
  3. Subscriber 经过向 Subscription 发送所需数量的值(调用 Subscription 的 request(_:))从 Subscription 中恳求值。
  4. Subscription 开端作业并开端宣布值。它将它们逐个发送给 Subscriber(调用 Subscriber 的 receive(_:) 办法)。
  5. 收到值后,Subscriber 回来一个新的 Subscribers.Demand,它会增加到先前的总 Demand 中。
  6. Subscription 会一向发送值,直到发送的值数量到达恳求的总数。

假如 Subscription 发送的值与 Subscriber 恳求的值相同多,它应该在发送更多值之前等候新的 Demand 恳求。咱们能够绕过此机制并持续发送值,但这会损坏 Subscriber 和 Subscription 之间的协议,并或许导致未界说的行为。

最终,假如呈现错误或订阅的值源完结,Subscription 会调用 Subscriber 的 receive(completion:) 办法。

运用 DispatchSource 自界说的 DispatchTimer

DispatchTimerConfiguration

咱们之前了解了 Timer.publish(),咱们能够根据 Dispatch 的 DispatchSourceTimer 开发自界说的 Timer。

咱们将首要界说一个装备结构,这将使 Subscriber 及其 Subscription 之间共 Timer 装备变得简单。将此代码增加到 Playground:

struct DispatchTimerConfiguration {
    let queue: DispatchQueue?
    let interval: DispatchTimeInterval
    let leeway: DispatchTimeInterval
    let times: Subscribers.Demand
}

咱们期望 Timer 能够在某个行列上触发,但也能够不关心在哪个行列上,因而,能够使 queue 成为可选的。interval 是 Timer 触发的时刻距离。leeway 是能够推迟传递 Timer 事情的最大时刻距离。times 是想要接纳的 Timer 事情数。

DispatchTimer Publisher

咱们现在能够开端创立 DispatchTimer Publisher。 Publisher 的代码很简略,由于一切作业都产生在 Subscription 内。在 DispatchTimerConfiguration 下方增加代码:

extension Publishers {
    struct DispatchTimer: Publisher {
        typealias Output = DispatchTime
        typealias Failure = Never
        let configuration: DispatchTimerConfiguration
        init(configuration: DispatchTimerConfiguration) {
            self.configuration = configuration
        }
        func receive<S: Subscriber>(subscriber: S)
        where Failure == S.Failure, Output == S.Input {
            let subscription = DispatchTimerSubscription(
                subscriber: subscriber,
                configuration: configuration
            )
            subscriber.receive(subscription: subscription)
        }
    }
}

咱们的 Timer 将当时时刻作为 DispatchTime 值宣布。 当然,它永久不会失利,所以 Publisher 的 Failure 类型是 Never

此外,完结了 Publisher 协议所需的 receive(subscriber:) 办法,它需求一个编译时特化来匹配 Subscriber 类型 where Failure == S.Failure, Output == S.Input。大部分动作将产生在 DispatchTimerSubscription 中。Subscriber 会收到一个 Subscription,然后它能够向该 Subscription 发送值的恳求。

DispatchTimerSubscription

Subscription 的效果包含:

  • 承受 Subscriber 的初始 Demend。

  • 按需生成 Timer 事情。

  • 每次 Subscriber 收到一个值并回来一个 Demend 时,都增加到 Demend 计数。

  • 保证它不会供给比装备中要求的更多的值。

开端在 Publishers 的扩展下方界说 Subscription:

private final class DispatchTimerSubscription <S: Subscriber>: Subscription where S.Input == DispatchTime {
}

此 Subscription 在外部不行见,只能经过 Subscription 协议,因而咱们将其设为 private。这是一个类,咱们能够经过引证传递它,Subscriber 能够将它增加到 Cancellable set 中,也能够保存它并独立调用 cancel()。当然,它用于 Input 值类型为 DispatchTime 的 Subscriber,这是 Subscription 宣布的值的类型。

将这些特点增加到 Subscription 类的界说中:

private final class DispatchTimerSubscription <S: Subscriber>: Subscription where S.Input == DispatchTime {
    let configuration: DispatchTimerConfiguration
    var times: Subscribers.Demand
    var requested: Subscribers.Demand = .none
    var source: DispatchSourceTimer? = nil
    var subscriber: S?
}

包含:Subscriber 的 configuration。从 configuration 中获取的 Timer 将触发的最大次数 times,将运用它作为每次发送值时递减的计数器。requested 是当时的 Demend,每次发送值时都会削减它。source 是内部的 DispatchSourceTimer,将生成 Timer 事情。以及 subscriber ,这表明,只要 Subscription 没有完结、失利或撤销,Subscription 就有职责保存 Subscriber。

DispatchTimerSubscription 界说中增加一个初始化办法:

init(subscriber: S,
     configuration: DispatchTimerConfiguration) {
    self.configuration = configuration
    self.subscriber = subscriber
    self.times = configuration.times
}

这很简略,将 times 设置为 Publisher 应接纳 Timer 事情的最大次数,每次 Publisher 宣布事情时,times 都会递减。当它到达零时,Timer 以完结事情完毕。

接着完结 cancel(),Subscription 有必要供给的必需办法:

func cancel() {
    source = nil
    subscriber = nil
}

DispatchSourceTimersource 设置为 nil 阻止它运转。将 Subscriber 特点设置为 nil 会其从Subscription 的范围中释放出来。

咱们现在能够开端编写 Subscription 的中心代码:request(_:),一旦 Subscriber 经过 Publisher 取得 Subscription,Subscriber 有必要从 Subscription 中恳求值:

func request(_ demand: Subscribers.Demand) {
    guard times > .none else {
        subscriber?.receive(completion: .finished)
        return
    }
    requested += demand
    if source == nil, requested > .none {
    		// todo
    }
}

这个办法接纳来自 Subscriber 的 Demand。Demand 是累积的:它们加起来构成 Subscriber 恳求的值的总数。假如 Subscription 现已发送了最大数量的值,能够通知 Subscriber 已完结发送值。

接着经过增加新 Demand 来增加恳求值的总数。检查 Timer 是否现已存在。假如没有,并且恳求的值存在,那么是时分发动它了。

将此代码增加到最终一个的 if 内:

let source = DispatchSource.makeTimerSource(queue: configuration.queue)
source.schedule(deadline: .now() + configuration.interval,
                repeating: configuration.interval,
                leeway: configuration.leeway)
source.setEventHandler { [weak self] in
    guard let self = self,
          self.requested > .none else { return }
    self.requested -= .max(1)
    self.times -= .max(1)
    _ = self.subscriber?.receive(.now())
    if self.times == .none {
        self.subscriber?.receive(completion: .finished)
    }
}
self.source = source
source.activate()

从装备的行列中创立 DispatchSourceTimer。安排 Timer 在每 configuration.interval 秒后触发。一旦 Timer 发动,将永久不会停止它,直到 Subscriber 撤销 Subscription。

然后咱们为 Timer 设置 EventHandler。 这是 Timer 每次触发时调用的闭包。验证当时是否有恳求的值后,削减两个计数器,然后向 Subscriber 发送一个值。假如要发送的值的总数到达指定的最大值,能够认为 Publisher 已完结并宣布完结事情。最终,激活 source

最终增加此扩展,以界说一个 Operator,以便轻松链接此 Publisher:

extension Publishers {
    static func timer(queue: DispatchQueue? = nil,
                      interval: DispatchTimeInterval,
                      leeway: DispatchTimeInterval = .nanoseconds(0),
                      times: Subscribers.Demand = .unlimited)
    -> Publishers.DispatchTimer {
        return Publishers.DispatchTimer(
            configuration: .init(queue: queue,
                                 interval: interval,
                                 leeway: leeway,
                                 times: times)
        )
    }
}

增加此代码以测验咱们的 Timer:

let publisher = Publishers.timer(interval: .seconds(1),
                                 times: .max(6))
let subscription = publisher.sink { time in
    print("Timer emits: \(time)")
}

运转 Playground:

Timer emits: DispatchTime(rawValue: 8517897314669)
Timer emits: DispatchTime(rawValue: 8517921315409)
Timer emits: DispatchTime(rawValue: 8517945315453)
Timer emits: DispatchTime(rawValue: 8517969296810)
Timer emits: DispatchTime(rawValue: 8517993313227)
Timer emits: DispatchTime(rawValue: 8518017296767)

虽然 Subscription 在 Combine API 中简直看不到,但正如咱们刚刚发现的那样,Subscription 完结了大部分作业。

完结 ShareReplay Operator

ShareReplaySubscription

要完结 shareReplay(),咱们需求:

  1. 契合 Subscription 协议的类型。这是每个 Subscriber 将收到的 Subscription。为保证咱们能够应对每个 Subscriber 的 demand 和 cancel,每个 Subscriber 都将收到单独的 Subscription。

  2. 契合 Publisher 协议的类型。咱们将把它完结为一个类,由于一切 Subscriber 都期望同享同一个实例。

在 Playground 中增加此代码,创立 Subscription 类:

fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
  let capacity: Int
  var subscriber: AnySubscriber<Output,Failure>? = nil
  var demand: Subscribers.Demand = .none
  var buffer: [Output]
  var completion: Subscribers.Completion<Failure>? = nil
}

咱们运用 class 而不是 struct 来完结 Subscription:Publisher 和 Subscriber 都需求访问和改动 Subscription。重播缓冲区的最大容量 capacity 是咱们在初始化时设置的常数。在订阅期间保存对 Subscriber 的引证,运用类型擦除的 AnySubscriber 能够使咱们免于与纠结其类型。盯梢 Publisher 从 Subscriber 那里收到的累积 demand,以便咱们能够精确地交给恳求数量的值。将挂起的值存储在缓冲区 buffer 中,直到它们被传递给 Subscriber 或被丢掉。completion 保存潜在的完结事情,以便在新 Subscriber 开端恳求值时当即将其交给给他们。

留意,咱们保存了完结事情:Publisher 不知道恳求什么时分产生,所以它将完结事情交给 Subscription,以便在正确的时刻被交给。

接着,将初始化办法增加到 Subscription 界说中:

init<S>(subscriber: S,
        replay: [Output],
        capacity: Int,
        completion: Subscribers.Completion<Failure>?)
where S: Subscriber,
      Failure == S.Failure,
      Output == S.Input {
    self.subscriber = AnySubscriber(subscriber)
    self.buffer = replay
    self.capacity = capacity
    self.completion = completion
}

从上游 Pulisher 接纳多个值并将它们设置在此 Subscription 实例上。具体来说:存储类型擦除的 Subscriber;存储上游 Pulisher 的当时缓冲区 buffer 、最大容量 capacity 和完结事情 completion(假如已宣布)。

咱们需求一种将完结事情中继给 Subscriber 的办法:

private func complete(with completion: Subscribers.Completion<Failure>) {
    guard let subscriber = subscriber else { return }
    self.subscriber = nil
    self.completion = nil
    self.buffer.removeAll()
    subscriber.receive(completion: completion)
}

将 Subscriber 设置为 nil;经过将完结事情设置为 nil 来保证只发送一次完结,然后清空缓冲区;将完结事情中继给 Subscriber。

咱们还需求一种能够向 Subscriber 宣布值的办法:

private func emitAsNeeded() {
    guard let subscriber = subscriber else { return }
    while self.demand > .none && !buffer.isEmpty {
        self.demand -= .max(1)
        let nextDemand = subscriber.receive(buffer.removeFirst())
        if nextDemand != .none {
            self.demand += nextDemand
        }
    }
    if let completion = completion {
        complete(with: completion)
    }
}

仅当缓冲区中有值并且有未完结的 Demand 时才宣布值。将未完结的 Demand 减一。向 Subscriber 发送第一个值,并收到新的 Demand。将新 Demand 增加到未完结的总 Demand 中,但前提是它不是 .none。 否则将溃散,由于 Combine 不会将 Subscribers.Demand.none 视为零,并且增加或减去 .none 将触发反常。假如完结事情未发送,请当即发送。

完结 Subscription 最重要的要求:

func request(_ demand: Subscribers.Demand) {
    if demand != .none {
        self.demand += demand
    }
    emitAsNeeded()
}

撤销订阅的代码更加简单, 增加此代码:

func cancel() {
  complete(with: .finished)
}

与 Subscriber 相同,咱们需求完结承受值的办法和完结事情,增加此办法以承受值:

func receive(_ input: Output) {
    guard subscriber != nil else { return }
    buffer.append(input)
    if buffer.count > capacity {
        buffer.removeFirst()
    }
    emitAsNeeded()
}

保证有 Subscriber 后,此办法将将值增加到 buffer。保证缓冲的值不要超过恳求的容量将结果交给给 Subscriber。

增加以下办法来承受完结事情:

func receive(completion: Subscribers.Completion<Failure>) {
    guard let subscriber = subscriber else { return }
    self.subscriber = nil
    self.buffer.removeAll()
    subscriber.receive(completion: completion)
}

此办法删去 Subscriber,清空缓冲区,并将完结事情发送到下流。

ShareReplay Publisher

Publisher 一般是 Publishers 命名空间中的值类型(struct)。但有时将 Publisher 完结为 class 如 Publishers.Multicast 或 Publishers.Share 是有意义的。关于这个 Publisher,咱们需求一个类,类似于 share()。不过这是例外,大多数情况下咱们会运用 struct。

ShareReplaySubscription 后增加此代码:

extension Publishers {
    final class ShareReplay<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure
    }
}

咱们期望多个 Subscriber 能够同享此 Operator 的单个实例,因而咱们运用 class。它也是通用的,上游 Publisher 的最终类型作为参数。这个新的 Publisher 不会改动上游 Publisher 的输出或失利类型——它仅仅运用上游的类型。

将 Publisher 需求的特点增加到 ShareReplay 的界说中:

private let lock = NSRecursiveLock()
private let upstream: Upstream
private let capacity: Int
private var replay = [Output]()
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
private var completion: Subscribers.Completion<Failure>? = nil

由于咱们将一起供给多个 Subscriber,所以咱们需求一个锁 lock 来保证对可变变量的独占访问。保存对上游 Publisher upstream 的引证,咱们将在 Subscription 生命周期的各个阶段需求它。咱们能够在初始化期间指定重放缓冲区的最大记载容量 capacity。当然,咱们还需求存储记载值 replay。咱们供给多个 Subscriber,因而咱们需求将 subscriptions 留以通知他们事情。每个 Subscriber 都从一个专用的 ShareReplaySubscription 获取其值。 Operator 即使在完结后也能够重播值,因而咱们需求记住上游 Publisher 是否完结。

首要,将必要的初始化程序增加到你的 ShareReplay 发布者:

init(upstream: Upstream, capacity: Int) {
  self.upstream = upstream
  self.capacity = capacity
}

这儿仅仅存储上游 Publisher upstream 和容量 capacity

增加将来自上游传入值中继到 Subscriber 的办法:

private func relay(_ value: Output) {
    lock.lock()
    defer { lock.unlock() }
    guard completion == nil else { return }
    replay.append(value)
    if replay.count > capacity {
        replay.removeFirst()
    }
    subscriptions.forEach {
        $0.receive(value)
    }
}

由于多个 Subscriber 同享此 Publisher,因而咱们有必要运用锁维护对可变变量的访问。仅在上游尚未完结时才中继值。将值增加到缓冲区并仅保存所需容量值。这些是重播给新 Subscriber 的内容。将缓冲的值中继到每个衔接的 Subscriber。

增加这个办法来处理完结事情:

private func complete(_ completion: Subscribers.Completion<Failure>) {
    lock.lock()
    defer { lock.unlock() }
    self.completion = completion
    subscriptions.forEach {
        $0.receive(completion: completion)
    }
}

为未来的 Subscriber 保存完结事情。将其转发给每个衔接的 Subscriber。

咱们现在已准备好开端编写每个 Publisher 有必要完结的 receive 办法。此办法将接纳 Subscriber。它的职责是创立一个新 Subscription,然后将其交给 Subscriber。

增加此代码以开端界说此办法:

func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure,
      Output == S.Input {
    lock.lock()
    defer { lock.unlock() }
    let subscription = ShareReplaySubscription(
        subscriber: subscriber,
        replay: replay,
        capacity: capacity,
        completion: completion)
    subscriptions.append(subscription)
    subscriber.receive(subscription: subscription)
    guard subscriptions.count == 1 else { return }
    let sink = AnySubscriber(
        receiveSubscription: { subscription in
            subscription.request(.unlimited)
        },
        receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
            self?.relay(value)
            return .none
        },
        receiveCompletion: { [weak self] in
            self?.complete($0)
        }
    )
    upstream.subscribe(sink)
}

新 Subscription 引证 Subscriber 并接纳当时 replaycapacitycompletion。咱们保存 Subscription 以将未来的事情传递给它。咱们将 Subscription 发送给 Subscriber,Subscriber 或许(现在或今后)开端恳求值。

只向上游 Publisher 订阅一次。运用方便的 AnySubscriber 类,它承受闭包,并在订阅时当即恳求 .unlimited 值以让 Publisher 运转完结。将咱们收到的值转发给下流 Subscriber。运用咱们从上游取得的完结事情来完结咱们的 Publisher。

留意:咱们开始能够恳求 .max(self.capacity) 并仅接纳它,但 Combine 是 Demand 驱动的!假如咱们恳求的值没有 Publisher 能够产生的那么多值,那么咱们或许永久不会收到完结事情!

最终咱们将 AnySubscriber 订阅到上游 Publisher。

咱们的 Publisher 已完结!咱们还需求一个便利的 Operator,帮助将这个新 Publisher 与其他 Publishe r联系起来。

将其作为扩展增加到 Playground 末尾的 Publishers 命名空间:

extension Publisher {
    func shareReplay(capacity: Int = .max)
    -> Publishers.ShareReplay<Self> {
        return Publishers.ShareReplay(upstream: self,
                                      capacity: capacity)
    }
}

咱们现在拥有一个功用齐全的 shareReplay(capacity:) Operator。

测验 shareReplay(capacity:)

在 Playground 中提那件以下代码:

let subject = PassthroughSubject<Int,Never>()
let publisher = subject.shareReplay(capacity: 2)
subject.send(0)
let subscription1 = publisher.sink(
    receiveCompletion: {
        print("subscription1 completed: \($0)")
    },
    receiveValue: {
        print("subscription1 received \($0)")
    }
)
subject.send(1)
subject.send(2)
subject.send(3)

subscription1 将收到:

subscription1 received 1
subscription1 received 2
subscription1 received 3

接下来,创立第二个 Subscription 并发送更多值和完结事情:

let subscription2 = publisher.sink(
    receiveCompletion: {
        print("subscription2 completed: \($0)")
    },
    receiveValue: {
        print("subscription2 received \($0)")
    }
)
subject.send(4)
subject.send(5)
subject.send(completion: .finished)

从头运转,subscription2 将收到回放,subscription1 subscription2 将收到新值:

subscription1 received 1
subscription1 received 2
subscription1 received 3
subscription2 received 2
subscription2 received 3
subscription1 received 4
subscription2 received 4
subscription1 received 5
subscription2 received 5
subscription1 completed: finished
subscription2 completed: finished

增加一个稍有推迟的 Subscription,以保证它在 Publisher 完结后产生:

var subscription3: Cancellable? = nil
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    print("Subscribing to shareReplay after upstream completed")
    subscription3 = publisher.sink(
        receiveCompletion: {
            print("subscription3 completed: \($0)")
        },
        receiveValue: {
            print("subscription3 received \($0)")
        }
    )
}
subscription1 received 1
subscription1 received 2
subscription1 received 3
subscription2 received 2
subscription2 received 3
subscription1 received 4
subscription2 received 4
subscription1 received 5
subscription2 received 5
subscription1 completed: finished
subscription2 completed: finished
Subscribing to shareReplay after upstream completed
subscription3 received 4
subscription3 received 5
subscription3 completed: finished

一切契合预期,shareReplay(capacity:) 作业做得很好!

处理 Backpressure

在流体动力学中,背压是与经过管道的流体活动相反的阻力。在 Combine 中,它是对来自 Publisher 的值流的阻力。

一般, Subscriber 需求处理 Publisher 宣布的值:

  • 处理高频数据,例如来自传感器的输入;

  • 履行大文件传输;

  • 在数据更新时渲染杂乱的 UI;

  • 等候用户输入;

  • 处理 Subscriber 无法以传入速度跟上的传入数据。

Combine 供给的 Publisher – Subscriber 机制是灵敏的。这是一种拉式规划,而不是推式规划。这意味着 Subscriber 要求Publisher 宣布值并指定他们想要接纳的数量。这种恳求机制是自适应的:每次订阅者收到一个新值时,需求都会更新。这答应 Subscriber 在他们不想接纳更多数据时经过“封闭水龙头”来处理背压,并在他们准备好接纳更多数据时“打开它”。

留意:请记住,咱们只能以累加的办法调整 Demand。能够在 Subscriber 每次收到新值时增加 Demand,办法是回来新的 .max(N).unlimited。或许能够回来 .none,表明需求不应增加。Subscriber 随后“挂机”以接纳至少到达新的最大 Demand 的值。例如,假如之前的最大 Demand 是接纳三个值,而 Subscriber 只接纳到一个,则在订阅者的 receive(_:) 中回来 .none 不会“封闭水龙头”。当 Publisher 准备好发送值时, Subscriber 仍将最少接纳两个值。

当更多值可用时会产生什么彻底取决于咱们的规划,咱们能够:

  • 经过办理 Demand 来操控流量,以防止 Publisher 发送超出处理才能的值;

  • 缓冲值,直到能够处理它们 – 存在耗尽可用内存的危险;

  • 删去无法当即处理的值;

  • 根据要求对以上的一些组合。

除了上述之外,处理背压还能够采取以下形式:

  • 具有处理拥塞的自界说 Subscription 的 Publisher。

  • 在 Publisher 链结尾供给值的 Subscriber。

咱们将专注于完结后者,创立一个 sink 函数的可暂停变体。

运用可暂停的 sink 来处理背压

在 Playground 创立一个协议,让咱们从暂停中康复:

protocol Pausable {
    var paused: Bool { get }
    func resume()
}

这儿不需求 pause() 办法,由于咱们将在收到每个值时确认是否暂停。

接下来开端界说可暂停的 Subscriber:

final class PausableSubscriber<Input, Failure: Error>:
    Subscriber, Pausable, Cancellable {
    let combineIdentifier = CombineIdentifier()
}

可暂停 Publisher 既能够暂停也能够撤销。这是 pausableSink 函数将回来的目标。这也是为什么咱们将它作为一个 class 而不是一个 struct 来完结的原因:咱们不期望一个目标被复制,并且咱们需求在其生命周期的某些点上具有可变性。Subscriber 有必要为 Combine 供给唯一标识符以办理和优化其 Publisher 流。

现在增加这些特点:

let receiveValue: (Input) -> Bool
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
private var subscription: Subscription? = nil
var paused = false

receiveValue 闭包回来一个 Bool:true 表明它或许会收到更多的值,false 表明订阅应该暂停。完结闭包将在收到来自 Publisher 的完结事情时被调用。保存 Subscription,以便它能够在暂停后恳求更多值。当咱们不再需求它时,你需求将此特点设置为 nil 以防止循环。根据 Pausable 协议揭露 paused 特点。

接下来,将以下代码增加到 PausableSubscriber 以完结初始化办法并契合 Cancelable 协议:

init(receiveValue: @escaping (Input) -> Bool,
     receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
    self.receiveValue = receiveValue
    self.receiveCompletion = receiveCompletion
}
func cancel() {
    subscription?.cancel()
    subscription = nil
}

初始化办法承受两个闭包,Subscriber 将在收到来自 Publisher 的新值和完结时调用它们。闭包类似于运用 sink 函数的闭包,但有一个例外:receiveValue 闭包回来一个布尔值以指示接纳器是否准备好承受更多值,或许是否需求暂停 Subscription。撤销 Subscription 时,不要忘掉之后将其设置为 nil 以防止循环。

现在增加此代码以满足 Subscriber 的要求:

func receive(subscription: Subscription) {
    self.subscription = subscription
    subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
    paused = receiveValue(input) == false
    return paused ? .none : .max(1)
}
func receive(completion: Subscribers.Completion<Failure>) {
    receiveCompletion(completion)
    subscription = nil
}

收到 Publisher 创立的 Subscription 后,将其存储以备后用,以便咱们能够从暂停中康复。当即恳求一个值。 Subscriber 能够暂停,咱们无法猜测何时需求暂停。这儿的策略是一个一个地恳求值。

当接纳到新值时,调用 receiveValue 并相应更新暂停状态。假如 Subscriber 被暂停,回来 .none 表明你现在不想要更多的值——记住,你开始只恳求了一个。

接纳到完结事情后,将其转发给 receiveCompletion,然后将 Subscription 设置为 nil,由于你不再需求它。

最终,完结 Pausable 的其余部分:

func resume() {
    guard paused else { return }
    paused = false
    subscription?.request(.max(1))
}

假如 Publisher paused,则恳求一个值以从头开端。咱们先在能够在 Publishers 命名空间中揭露新的 pausableSink

在 Playground 的末尾增加以下代码:

extension Publisher {
    func pausableSink(
        receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
        receiveValue: @escaping ((Output) -> Bool))
    -> Pausable & Cancellable {
        let pausable = PausableSubscriber(
            receiveValue: receiveValue,
            receiveCompletion: receiveCompletion)
        self.subscribe(pausable)
        return pausable
    }
}

pausableSinksink 非常接近。唯一的区别是 receiveValue 闭包的回来类型:Bool。实例化一个新的 PausableSubscriber 并将其 subscribe。Subscriber 是咱们用来康复和撤销 Subscription 的目标。

测验 pausableSink

let subscription = [1, 2, 3, 4, 5, 6]
    .publisher
    .pausableSink(receiveCompletion: { completion in
        print("Pausable subscription completed: \(completion)")
    }) { value -> Bool in
        print("Receive value: \(value)")
        if value % 2 == 1 {
            print("Pausing")
            return false
        }
        return true
    }

Publisher 一般按顺序宣布一切值。运用 pausableSink,此发布者将在收到值 1、3 和 5 时暂停。运转 Playground:

Receive value: 1
Pausing

要康复 Publisher,需求异步调用 resume()。运用 Timer 很简单做到这一点:

let timer = Timer.publish(every: 1, on: .main, in: .common)
    .autoconnect()
    .sink { _ in
        guard subscription.paused else { return }
        print("Subscription is paused, resuming")
        subscription.resume()
    }

再次运转 Playground:

Receive value: 1
Pausing
Subscription is paused, resuming
Receive value: 2
Receive value: 3
Pausing
Subscription is paused, resuming
Receive value: 4
Receive value: 5
Pausing
Subscription is paused, resuming
Receive value: 6
Pausable subscription completed: finished

留意:假如 Publisher 无法保存值并等候 Subscriber 恳求它们怎么办?在这种情况下,咱们需求运用 buffer(size:prefetch:whenFull:) Operator 来缓冲值。此 Operator 能够将值缓冲到 size 参数中指定的容量,并在 Subscriber 准备好接纳它们时传递它们,其他参数确认缓冲区怎么填满以及缓冲区满时会产生什么。

内容参阅

  • Combine | Apple Developer Documentation;
  • 来自Kodeco的书本《Combine: Asynchronous Programming with Swift》;
  • 对上述Kodeco书本的汉语自译版《Combine: Asynchronous Programming with Swift》整理与弥补。