前言

AsyncThrowingStreamAsyncStreamSwift 5.5 中由 SE-314 引进的并发结构的一部分。异步流答应你替换根据闭包或 Combine 发布器的现有代码。

在深入研究围绕抛出流的细节之前,假如你还没有阅览我的文章,我主张你先阅览我的文章,内容包括async-await。本文解说的大部分代码将运用那里解说的API。

什么是 AsyncThrowingStream?

你能够把 AsyncThrowingStream 看作是一个有可能导致抛出过错的元素流。他的值随着时刻的推移而传递,流能够经过一个结束事件来关闭。一旦发生过错,结束事件既能够是成功,也能够是失败。

什么是 AsyncStream?

AsyncStream 相似于抛出的变体,但绝不会导致抛出过错。一个非抛出型的异步流会根据清晰的完结调用或流的撤销而完结。

留意: 在这篇文章中,咱们将解说如何运用AsyncThrowingStream。除了发生过错处理的部分,代码示例与AsyncStream相似。

AsyncThrowingStream

如何运用 AsyncThrowingStream

AsyncThrowingStream 能够很好地替代现有的根据闭包的代码,如进展和完结处理程序。为了更好地了解我的意思,我将向你介绍咱们在 WeTransfer 应用程序中遇到的一个场景。

在咱们的应用程序中,咱们有一个根据闭包的现有类,叫做 FileDownloader

struct FileDownloader {
    enum Status {
        case downloading(Float)
        case finished(Data)
    }
    func download(_ url: URL, progressHandler: (Float) -> Void, completion: (Result<Data, Error>) -> Void) throws {
        // .. Download implementation
    }
}

文件下载器承受一个URL,陈述进展状况,并完结一个包括下载数据的成果或在失败时显现一个过错。

文件下载器在文件下载过程中陈述一个数值流。在这种状况下,它陈述的是一个状况值流,以陈述正在运转的下载的当前状况。FileDownloader 是一个完美的比如,你能够重写一段代码来运用 AsyncThrowingStream。然而,重写需要你在完结层面上也重写你的代码,所以让咱们定义一个重载办法来替代:

extension FileDownloader {
    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        return AsyncThrowingStream { continuation in
            do {
                try self.download(url, progressHandler: { progress in
                    continuation.yield(.downloading(progress))
                }, completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.finished(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                })
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

正如你所看到的,咱们把下载办法包裹在一个 AsyncThrowingStream 里边。咱们将流的值 Status 的类型描绘为一个通用的类型,答应咱们用状况更新来延续流。

只需有过错发生,咱们就会经过抛出一个过错来完结流。在完结处理程序的状况下,咱们要么经过抛出一个过错来完结,要么用一个不抛出的完结回调来跟进数据的发生。

switch result {
case .success(let data):
    continuation.yield(.finished(data))
    continuation.finish()
case .failure(let error):
    continuation.finish(throwing: error)
}

在收到最终的状况更新后,不要忘掉 finish() 回调,这一点至关重要。不然,咱们将坚持流的存活,而完结层面的代码将永远不会持续。

咱们能够经过运用另一个 yield 办法来重写上述代码,承受一个 Result 枚举作为参数:

continuation.yield(with: result.map { .finished($0) })
continuation.finish()

重写后的代码简化了咱们的代码,并去掉了 switch-case 代码。咱们必须映射咱们的 Reslut 枚举以匹配预期的 Status 值。假如咱们发生一个失败的成果,咱们的流将在抛出包括的过错后结束。

AsyncThrowingStream 迭代

一旦你装备好你的异步抛出流,你就能够开始在数值流上进行迭代。在咱们的 FileDownloader 比如中,它将看起来如下所示:

do {
    for try await status in download(url) {
        switch status {
        case .downloading(let progress):
            print("Downloading progress: \(progress)")
        case .finished(let data):
            print("Downloading completed with data: \(data)")
        }
    }
    print("Download finished and stream closed")
} catch {
    print("Download failed with \(error)")
}

咱们处理任何状况的更新,而且咱们能够运用 catch 闭包来处理任何发生的过错。你能够运用根据 AsyncSequence 接口for ... in 循环进行迭代,这对 AsyncStream 来说是一样的。

假如你遇到了相似的编译过错:

‘async’ in a function that does not support concurrency

你可能想读一读我的文章,其间Swift 中的 async/await ——代码实例详解。

上述代码示例中的打印句子有助于你了解 AsyncThrowingStream 的生命周期。你能够替换打印句子来处理进展更新和处理数据,为你的用户完结可视化

调试 AsyncStream

假如一个流不能陈述数值,咱们能够经过放置断点来调试流发生的回调。尽管也可能是上面的 “Download finished and stream closed” 的打印句子不会调用,这意味着你在完结层的代码永远不会持续。后者可能是一个未完结的流的成果。

为了验证,咱们能够利用 onTermination 回调:

func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
    return AsyncThrowingStream { continuation in
        ///  装备一个停止回调,以了解你的流的生命周期。
        continuation.onTermination = { @Sendable status in
            print("Stream terminated with status \(status)")
        }
        // ..
    }
}

回调在流停止时被调用,它将告诉你你的流是否还活着。我引荐你阅览 Sendable 和 @Sendable 闭包代码实例详解 来了解 @Sendable 属性。

假如呈现了过错,输出成果可能如下:

Stream terminated with status finished(Optional(FileDownloader.FileDownloadingError.example))

上述输出只要在运用 AsyncThrowingStream 时才干完结。假如是一个普通的 AsyncStream,完结的输出看起来如下:

Stream terminated with status finished

而撤销的成果对这两种类型的流来说都是这样的:

Stream terminated with status cancelled

你也能够在流结束后运用这个停止回调进行任何整理。例如,删除任何观察者或在文件下载后整理磁盘空间。

撤销一个 AsyncStream

一个 AsyncStreamAsyncThrowingStream 能够因为一个关闭的使命被撤销而撤销。一个比如能够如下:

let task = Task.detached {
    do {
        for try await status in download(url) {
            switch status {
            case .downloading(let progress):
                print("Downloading progress: \(progress)")
            case .finished(let data):
                print("Downloading completed with data: \(data)")
            }
        }
    } catch {
        print("Download failed with \(error)")
    }
}
task.cancel()

一个流在超出范围或包围的使命撤销时就会撤销。如前所述,撤销将相应地触发 onTermination 回调。

定论

AsyncThrowingStreamAsyncStream 是重写根据闭包的现有代码到支持 async-awai t的替代品的好办法。你能够提供一个连续的值流,并在成功或失败时完结一个流。你能够运用根据 AsyncSequence APIs 的 for 循环在完结层面上迭代值。

本文正在参与「金石计划」