你是否曾经想过futures中的block_on是怎么作业的呢?今日咱们就来完结你自己的 block_on 版别.

这篇博客创意应该来自两个 crate, 分别是 wakeful 和 extreme.wakeful供给了一种简略的直接从一个函数创立Waker的办法,extreme则供给了一种十分简练的block_on完结.

咱们的完结目标将与extreme略有不同。 咱们不再寻求零依赖性和最少的代码行数,而是寻求一个安全、高效但仍然相当简略的完结。

咱们将运用的依赖项有pin-utils、crossbeam和async-task。

签名

block_on的签名如下所示。 咱们以一个 future 作为参数,在当时线程上运转它(当它pending时堵塞) ,然后回来它的输出:

fn block_on<F: Future>(future: F) -> F::Output {
    todo!()
}

现在让咱们完结遗失的 todo! ()部分。

初度尝试

请注意,规范库的Future中的poll的参数是一个Pin<&mut Future>。 所以咱们需求先把它固定(Pin)住。 虽然有一种办法能够安全地运用Box::pin()来完结这一点,可是咱们更乐意把Future放在栈上而不是堆上(译者注 Box::Pin会把Future分配到堆上,然后Pin)。

不幸的是,安全地将Future固定在栈上的仅有办法是运用pin-utils:

pin_utils::pin_mut!(future);

pin_mut这个宏会将一个类型为Ffuture转换为Pin<&mut F>,并将其固定在栈上.

接下来咱们需求具体说明当这个future被唤醒时会发生什么。 在这种情况下,唤醒应该仅仅免除运转future的线程的堵塞。

结构一个唤醒器或许很丑恶ーー只要看一眼extreme的完结就能够了。 这是手工构建Waker最简略的办法! 处处都是原始指针和不安全的代码… … 让咱们暂时越过这一部分,今后再填空。

let waker = todo!();

终究,咱们从Waker创立一个使命上下文,并在循环中轮询future。 假如完结,回来输出。 假如它挂起,阻止当时线程:

let cx = &mut Context::from_waker(&waker);
loop {
    match future.as_mut().poll(cx) {
        Poll::Ready(output) => return output,
        Poll::Pending => thread::park(),
    }
}

请别对Contex类型感到困惑,它便是 Waker 的一个包装器ーー没有什么比这更好的了。 当在 Rust 中设计 async / await 时,咱们不确定除了传递 Waker给poll ()之外,传递其他任何东西是否有用,所以咱们想出了这个包装器,它或许在 Rust 的未来版别中包含更多的东西。

不管怎样… 咱们差不多完结了。让咱们回到方才的block_on,并将todo替换为上面的代码。

仔细想想,Waker 实际上是Arc<dyn Fn () + Send + Sync>的精心优化版别,wake()调用这个函数。 换句话说,Waker 是一个回调函数,当将来能够继续履行时,它就会被调用。

因为 Waker 是如此难以结构,所以sagebind供给了waker_fn(),这是一种将任何函数转换为 Waker 的直接办法。 不幸的是,wakeful好像此时被猛拉,所以我借用了wakerfn()并将其放入我的async-task中。

在咱们的代码块中,回调函数便是唤醒future所在线程:

let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());

简略多了! 比起处理RawWaker 和 RawWakerVTable 好多了。

在内部,waker_fn()结构函数实际上创立了Arc<impl Fn () + Send + Sync>,然后用不安全的代码将其转换为 Waker,这些代码看起来与咱们在extreme中看到的类似。

下面是对block_on的完好完结:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);
    let thread = thread::current();
    let waker = async_task::waker_fn(move || thread.unpark());
    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

完好代码见v1.rs

park问题

可是,现在还不是庆祝的时分。 有个问题。 假如future的用户代码也运用 park / unpark API,它或许会从回调函数中获取并“盗取” unpark 告诉。 阅读这个问题能够得到更具体的解说。

译者注 说白了便是因为都在用unpark,会让导致不该唤醒的时分被唤醒,或许收不到唤醒告诉.

一个或许的解决方案是运用一种不同于 std: : thread 模块中的线程的park/unpark办法。 这样,future的代码就不会搅扰唤醒。

在crossbeam中有一个十分类似的 park / unpark 机制,只不过它答应咱们创立恣意多的独立的 parkers,而不是每个线程都有一个。 让咱们为block_on的每一次调都独立创立一个parker:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = async_task::waker_fn(move || unparker.unpark());
    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => parker.park(),
        }
    }
}

就这样! 问题解决了。 完好的代码在v2.rs,你能够去运转他.

一个优化

创立一个 Parker 和 Waker 并不是免费的ーー这两者都会引起内存分配, 咱们能改进吗?

为什么不在线程本地存储器中缓存它们,而不是在每次调用block_on时结构 Parker 和 Waker 呢? 这样,线程就能够在 block on ()的所有调用中重用相同的实例:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);
    thread_local! {
        static CACHE: (Parker, Waker) = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            (parker, waker)
        };
    }
    CACHE.with(|(parker, waker)| {
        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

假如future能够快速履行,这个小小的更改将使 block on ()功率大大提升!

完好的代码在v3.rs中

怎么处理递归

是否还有其他问题?

假如future在block_on的内部块再次递归地调用block_on会怎样? 当然咱们能够答应也能够禁止递归。

假如咱们选择答应递归,那么咱们还需求确保 block on ()的递归调用不同享相同的 Parker 和 Waker 实例,否则就无法知道哪个 block on ()调用会被唤醒。

futures的block on 发生递归调用会panic。 对于答应递归仍是禁止递归,我没有激烈的意见—- 这两种行为都是明智的。 可是,已然咱们在仿照futures的版别,那么就不要运用递归。

为了检测递归调用,咱们能够引入另一个线程本地变量来指示咱们当时是否在 block on()中。 但这是一个很大的作业量。

这里有一个很帅的技巧,它只需对代码进行更少的更改。 让咱们把(Parker,Waker)包装到 RefCell 中,假如屡次发生可变的借用,程序就会panic:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);
    thread_local! {
        static CACHE: RefCell<(Parker, Waker)> = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            RefCell::new((parker, waker))
        };
    }
    CACHE.with(|cache| {
        let (parker, waker) = &mut *cache.try_borrow_mut().ok()
            .expect("recursive `block_on`");
        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

终于。 现在咱们真的结束了,我保证! 终究的完结是正确的,强健的,高效的。 差不多吧。 :)

完好的代码见v4.rs

Benchmarks

为了测验block_on的功率,让咱们将它与futures的进行基准测验比较。

可是首要,咱们将编写一个 helper future 类型,它能够反复被轮询,直到完结:

struct Yields(u32);
impl Future for Yields {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

举个例子,为了测验轮询十次的性能,能够这样写:

#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
    b.iter(|| block_on(Yields(10)));
}

让咱们设定一组三个基准,轮询次数分别为0、10和50。 咱们运用自定义的block_on,然后运用 futures 的。 您能够在yield.rs中找到完好的基准测验代码。

下面是我的机器上的结果:

test custom_block_on_0_yields   ... bench:           3 ns/iter (+/- 0)
test custom_block_on_10_yields  ... bench:         130 ns/iter (+/- 12)
test custom_block_on_50_yields  ... bench:         638 ns/iter (+/- 20)
test futures_block_on_0_yields  ... bench:          10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench:         236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench:       1,139 ns/iter (+/- 30)

结果显现,在这个特定的基准测验中,咱们的block_on比futures的大约快2到3倍,这一点都不差!

结论

Async Rust 之所以令人生畏,是因为它包含了太多的机制: Future trait、 pinning、 Context、 Waker 及其相关的RawWaker 和 RawWakerVTable、 Async 和 await 的语法糖背面的机制、不安全的代码、原始指针等等。

但问题是,许多丑恶的东西乃至不是那么重要ーー实际上它们仅仅无聊的样板文件,能够用像 pin-utils、async-task 和 crossbeam 这样的 crate 绕开。

实际上,今日咱们已经在几行安全代码中构建了一个高效的 block_on,而不需求理解大部分的样板文件。 在另一篇博文中,咱们将树立一个真正的履行器..。

原文:stjepang.github.io/2020/01/25/…

翻译:stevenbai.top/rust/build_…