【译】SE-0300 Continuation -- 执行同步代码的异步任务接口

原文链接:SE-0300 Continuations for interfacing async tasks with synchronous code

简介

异步 Swift代 码需要能够与现有的同步代码一起工作,这些代码使用回调和 delegate 等方式来响应事件。异步任务可以在 continuations 上暂停自己,然后同步代码可以捕获并调用它来恢复任务以响应事件。

Swift-evolution thread:

动机

Swift 的 API 经常会通过回调的方式提供异步执行的功能。这种情况可能是因为代码本身是在引入 async/await 之前编写的,或者(更有趣的是,从长远来看)是因为它与其它一些(主要是)事件驱动的系统联系在一起。在这种情况下,人们可能希望向客户端提供一个异步接口,同时在内部使用回调。在这些情况下,调用异步任务需要能够暂停自己,同时提供一个机制让事件驱动的同步系统在响应事件时恢复它。

解决方案

本提案将提供 API 来获取当前异步任务的 continuation。获取任务的 continuation 会暂停任务,并产生一个值,同步的代码可以使用 handle 来恢复任务。给定一个基于 completionHandler 的 API,例如:

func beginOperation(completion: (OperationResult) -> Void)

我们可以把它变成一个 async 接口,先将当前的任务暂停,然后把它的 continuation 传入闭包中用来恢复任务,把传入回调的参数作为 async 函数的返回值:

func operation() async -> OperationResult {
// 暂停当前任务,立即执行这个闭包,并且将它的 contunation 传进去
return await withUnsafeContinuation { continuation in
// 执行基于 callback 的同步 API
beginOperation(completion: { result in
// 当回调执行时恢复 contiuation
continuation.resume(returning: result)
})
}
}

具体设计

原始的 unsafe continuations

本提案提供了两个函数,withUnsafeContinuationwithUnsafeThrowingContinuation,允许用户在异步代码内部使用基于回调的 API。这两个函数都需要传入一个 operation 闭包,它通常会在基于回调的 API 里调用。闭包里接收一个必须由回调恢复的 continuation 实例,用于提供结果值或(在 throw 的版本中)抛出错误,当异步任务恢复时,成为 withUnsafeContinuation 调用的结果:

struct UnsafeContinuation<T, E: Error> {
func resume(returning: T)
func resume(throwing: E)
func resume(with result: Result<T, E>)
}

extension UnsafeContinuation where T == Void {
func resume() { resume(returning: ()) }
}

extension UnsafeContinuation where E == Error {
// 允许 Result 版本使用更严格的 Error 类型
func resume<ResultError: Error>(with result: Result<T, ResultError>)
}

func withUnsafeContinuation<T>(
_ operation: (UnsafeContinuation<T, Never>) -> ()
) async -> T

func withUnsafeThrowingContinuation<T>(
_ operation: (UnsafeContinuation<T, Error>) throws -> ()
) async throws -> T

withUnsafe*Continuation 将在任务的当前上下文中立即执行它的 operation 参数,传入一个可用于恢复任务的 continuation 值。operation 函数必须安排在未来的某个时刻恢复 continuation;在 operation 函数返回后,任务就会被暂停。然后,必须通过调用 continuation 任意一个 resume 方法使任务脱离暂停状态。注意,resume 在将任务从暂停状态过渡出来后,会立即将控制权返回给调用者;任务本身实际上并不会恢复执行,而是等到它的执行者再次调度它。当任务恢复执行时,resume(returning:) 的参数会成为 withUnsafe*Continuation 的返回值。resume(throwing:) 可以通过传入给定的 Error 使任务恢复。为了方便起见,可以传入一个 Resultresume(with:) 会根据 Result 的状态,通过正常返回或抛出错误来恢复任务。如果 operation 在返回前引发了一个未捕获的错误,就会跟调用了 resume(throwing:) 一样。

如果 withUnsafe*Continuation 的返回类型是 Void,那么在调用 resume(returning:) 时必须传入 ()。这样做会产生一些丑陋的代码,所以 Unsafe*Continuation<Void> 有一个额外的方法 resume(),使调用代码更容易阅读。

在调用 withUnsafeContinuation 后,每个分支上都必须调用一次且仅一次 resume 方法。Unsafe*Continuation 是一个不安全的接口,所以同一个 continuation 多次调用 resume 方法属于未定义的行为。在任务被恢复之前,它会一直处于暂停状态;如果 continuation 被释放掉了,并且从未被恢复,那么任务将一直处于暂停状态,直到进程结束,它所拥有的任何资源都会泄漏。我们可以提供一层封装捕获这些错误的使用,本提案也打算引入这样的一个 Wrapper,下面将详细讨论。

例如,使用 Unsafe*Continuation API,可以封装这样的函数(这里为了展示 continuation API 的灵活性而故意弄得很复杂):

func buyVegetables(
shoppingList: [String],
// a) if all veggies were in store, this is invoked *exactly-once*
onGotAllVegetables: ([Vegetable]) -> (),

// b) if not all veggies were in store, invoked one by one *one or more times*
onGotVegetable: (Vegetable) -> (),
// b) if at least one onGotVegetable was called *exactly-once*
// this is invoked once no more veggies will be emitted
onNoMoreVegetables: () -> (),

// c) if no veggies _at all_ were available, this is invoked *exactly once*
onNoVegetablesInStore: (Error) -> ()
)
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
try await withUnsafeThrowingContinuation { continuation in
var veggies: [Vegetable] = []

buyVegetables(
shoppingList: shoppingList,
onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
onGotVegetable: { v in veggies.append(v) },
onNoMoreVegetables: { continuation.resume(returning: veggies) },
onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
)
}
}

let veggies = try await buyVegetables(shoppingList: ["onion", "bell pepper"])

由于在 buyVegetables 函数的复杂回调里正确地对 continuation resume 进行了调用,我们能够为这个函数提供一个更好的重载,让异步代码以更自然的方式与这个函数交互。

Checked continuations

Unsafe*Continuation 为同步和异步代码的接口提供了一个轻量的机制,但它很容易用错,并且会以危险的方式破坏进程状态。为了提供额外的安全性和指导(在开发同步和异步代码交互的接口时),本提案还将提供一个 Wrapper,用于检查对 continuation 的非法使用:

struct CheckedContinuation<T, E: Error> {
func resume(returning: T)
func resume(throwing: E)
func resume(with result: Result<T, E>)
}

extension CheckedContinuation where T == Void {
func resume()
}

extension CheckedContinuation where E == Error {
// Allow covariant use of a `Result` with a stricter error type than
// the continuation:
func resume<ResultError: Error>(with result: Result<T, ResultError>)
}

func withCheckedContinuation<T>(
_ operation: (CheckedContinuation<T, Never>) -> ()
) async -> T

func withCheckedThrowingContinuation<T>(
_ operation: (CheckedContinuation<T, Error>) throws -> ()
) async throws -> T

这里的 API 特意与 Unsafe 的 API 保持一致,因此代码可以很容易地在 checked 和 unsafe 的版本之间切换。例如,上面的 buyVegetables 例子只需将 withUnsafeThrowingContinuation 的调用变成 withCheckedThrowingContinuation 的调用,就可以提供运行时的检查:

// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
try await withCheckedThrowingContinuation { continuation in
var veggies: [Vegetable] = []

buyVegetables(
shoppingList: shoppingList,
onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
onGotVegetable: { v in veggies.append(v) },
onNoMoreVegetables: { continuation.resume(returning: veggies) },
onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
)
}
}

CheckedContinuation 不会导致未定义的行为,相反,如果程序试图多次 resume continuation 的话,CheckedContinuation 就会捕获到这种情况并且触发 trap。如果在没有恢复任务的情况下释放了 continuation,这使任务停留在它的暂停状态,泄露它所拥有的任何资源,并且 CheckedContinuation 会打印一个警告。无论程序采用了哪种优化级别,这些检查都会执行。

更多例子

Continuation 也可以用来跟(比回调)更复杂的事件驱动接口对接。只要整个过程都符合要求(continuation 只被恢复一次),那么在哪里恢复 continuation 就没有其它限制。例如,一个 Operation 可以在操作完成时触发 continuation 的恢复:

class MyOperation: Operation {
let continuation: UnsafeContinuation<OperationResult, Never>
var result: OperationResult?

init(continuation: UnsafeContinuation<OperationResult, Never>) {
self.continuation = continuation
}

/* rest of operation populates `result`... */

override func finish() {
continuation.resume(returning: result!)
}
}

func doOperation() async -> OperationResult {
return await withUnsafeContinuation { continuation in
MyOperation(continuation: continuation).start()
}
}

使用结构化并发提案中的 API,可以将一个 URLSession 包裹在一个任务中,让任务的取消来控制 session 的取消,并使用 continuation 来响应网络请求接收的数据和抛出的错误:

func download(url: URL) async throws -> Data? {
var urlSessionTask: URLSessionTask?

return try Task.withCancellationHandler {
urlSessionTask?.cancel()
} operation: {
let result: Data? = try await withUnsafeThrowingContinuation { continuation in
urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
if case (let cancelled as NSURLErrorCancelled)? = error {
continuation.resume(returning: nil)
} else if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: data)
}
}
urlSessionTask?.resume()
}
if let result = result {
return result
} else {
Task.cancel()
return nil
}
}
}

围绕基于回调的 API 的 Wrapper 也可以跟随其父级/当前任务的取消,如下所示。

func fetch(items: Int) async throws -> [Items] {
let worker = ...
return try Task.withCancellationHandler(
handler: { worker?.cancel() }
) {
return try await withUnsafeThrowingContinuation { c in
worker.work(
onNext: { value in c.resume(returning: value) },
onCancelled: { value in c.resume(throwing: CancellationError()) },
)
}
}
}

如果允许任务拥有实例(这是在结构化并发提案里讨论的内容),那么也可以获得调用 fetch(items:) 函数的任务,并在 withUnsafeThrowingContinuation 内部认为需要时,调用它的 isCanceled

其它方案

命名缩短为 Continuation(而不是 CheckedContinuation

我们可以将 CheckedContinuation 定位为做同步/异步接口”默认”的 API,把 Checked 这个词从名字中去掉。这符合 Swift 的理念,首选安全的接口,而不安全的接口是性能成为首要考虑因素时才会选择性地使用。不过,这里有几个理由让我们很犹豫:

  • 虽然错误使用 CheckedContinuation 的后果没有 UnsafeContinuation 那么严重,但它仍然只是尽最大努力检查一些常见的错误模式,并不能掩盖掉错误使用 continuation 的后果:释放一个 continuation 而不恢复它仍然会泄露未恢复的任务,而试图多次恢复一个 continuation 仍然会导致通过 continuation 传递的信息丢失。如果 with*Continuation 的操作错误使用 continuation,仍然是一个严重的程序逻辑错误; CheckedContinuation 只能使错误变得更加明显。
  • 如果我们在未来的某个时候有了 move-only 类型,我们想引入一个静态的只恢复 exactly-once 的 continuation 类型,那么现在命名一个类型 Continuation 会占用掉这个”好”名字。

不暴露 UnsafeContinuation 接口

同样的,我们也可以得出另一个论点,即 UnsafeContinuation 根本不应该暴露给上层用户,因为用户总是会使用 Checked 的版本。但我们认为,在与性能敏感的 API 交互时,一旦用户验证了他们调用这些 API 的方式是正确的,避免掉 checked 的成本就是有价值的。

CheckedContinuation 在错误使用时全部触发 trap, 或者全部 log 出来

CheckedContinuation 的目的是当程序试图多次恢复同一个 continuation 时捕获这种情况,但如果一个 continuation 被释放而没有得到恢复,则只 log 一个警告。我们认为这是对不同情况的正确权衡,原因如下:

  • 对于 UnsafeContinuation,多次恢复会破坏进程,使其处于未定义状态。通过在任务多次恢复时触发 trap,CheckedContinuation 就会将未定义的行为变成了定义良好的 trap。这类似于标准库中其它的 checked/unchecked 配对,例如 !OptionalunsafelyUnwrapped
  • 相比之下,如果没有用 UnsafeContinuation 来恢复 continuation,除了泄露暂停的任务的资源之外,并不会破坏任务,程序的其它部分也可以继续正常执行。此外,目前我们能检测和报告这种泄漏的唯一方法是在其实现中使用类的 deinit。由于 ARC 优化的 refcounting 变化,这样 deinit 执行的准确时机是完全无法预测的。如果把 deinit 做成 trap,那么这个 trap 是否被执行以及何时执行可能会随着优化等级的变化而变化,我们认为这不会带来很好的体验。

*Continuation 提供更多 Task API,或者允许 continuation 恢复 Handle

完整的 TaskHandle API 为 Handle 的持有者提供了对任务状态的额外控制,特别是查询和设置取消状态,以及等待任务最终结果的能力,大家可能会问,为什么 *Continuation 类型没有这些功能?ContinuationHandle 的作用有很大的不同,Handle 代表和控制任务的整个生命周期,而 continuation 只代表任务生命期中的一个暂停点。此外,*Continuation API 的设计主要是为了与 Swift 结构化并发模型之外的代码进行对接,我们认为任务之间的交互最好还是尽量在这个模型里处理。

注意,*Continuation 其实也不需要自己去直接提供任何任务的 API。例如,如果有人想让一个任务在响应回调时自行取消,他们可以通过控制 continuation 的 resume 类型来实现(例如一个 Optional 的 nil):

let callbackResult: Result? = await withUnsafeContinuation { c in
someCallbackBasedAPI(
completion: { c.resume($0) },
cancellation: { c.resume(nil) })
}

if let result = callbackResult {
process(result)
} else {
cancel()
}

提供 API 立刻恢复任务,避免队列跳转

有些 API 除了接收一个 completionHandler 或 delegate,还允许客户端控制 completionHandler 或 delegate 的方法调用的位置;例如,Apple 平台上的一些 API 接收一个参数,作为 completionHandler 应该被使用的 DispatchQueue。在这些情况下,如果原始 API 能够直接在 DispatchQueue(或其它任何调度机制,如 Thread 或 Runloop)上恢复任务,那将是最理想的。为了实现这一点,我们可以提供一个 with*Continuation 的变体,除了提供一个 continuation 之外,还提供任务期望被恢复的 DispatchQueue。*Continuation 类型也可以提供一套 unsafeResumeImmediately 的 API,它将立即在当前线程上恢复任务的执行。这样就可以实现下面的功能:

// Given an API that takes a queue and completion handler:
func doThingAsynchronously(queue: DispatchQueue, completion: (ResultType) -> Void)

// We could wrap it in a Swift async function like:
func doThing() async -> ResultType {
await withUnsafeContinuationAndCurrentDispatchQueue { c, queue in
// Schedule to resume on the right queue, if we know it
doThingAsynchronously(queue: queue) {
c.unsafeResumeImmediately(returning: $0)
}
}
}

然而,这样的 API 必须非常谨慎地使用;程序员必须保证 unsafeResumeImmediately 在正确的上下文中被调用,并且在任意时间内,从调用者手中接管当前线程的行为必须保证是安全的。如果在错误的上下文中恢复任务,就会破坏代码逻辑以及编译器和运行时的假设,这将导致难以诊断的奇妙 bug。如果 continuation-based adapter 的”队列跳转”在实践中被证实是一个性能问题,我们可以将其作为核心提案的补充进行研究。

修订历史

Third revision:

  • Replaced separate *Continuation<T> and *ThrowingContinuation<T> types with a single Continuation<T, E: Error> type parameterized on the error type.
  • Added a convenience resume() equivalent to resume(returning: ()) for continuations with a Void return type.
  • Changed with*ThrowingContinuation to take an operation block that may throw, and to immediately resume the task throwing the error if an uncaught error propagates from the operation.

Second revision:

  • Clarified the execution behavior of with*Continuation and *Continuation.resume, namely that with*Continuation immediately executes its operation argument in the current context before suspending the task, and that resume immediately returns to its caller after un-suspending the task, leaving the task to be scheduled by its executor.
  • Removed an unnecessary invariant on when resume must be invoked; it is valid to invoke it exactly once at any point after the with*Continuation operation has started executing; it does not need to run exactly when the operation returns.
  • Added “future direction” discussion of a potential more advanced API that could allow continuations to directly resume their task when the correct dispatch queue to do so is known.
  • Added resume() on Void-returning Continuation types.