Future 界说

Future 是 Rust 异步编程的核心,Futuretrait 的界说:

#[must_use = "futures do nothing unless you `.await` or poll them"] #[lang = "future_trait"]
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future 有一个相关类型 Output;还有一个 poll() 办法,它回来Poll<Self::Output>。Poll 是个枚举,有 ReadyPending 两个状况。经过调用 poll() 办法能够推动 Future 的进一步履行,直到使命完结被切走停止。

在当时 poll 中,若 Future 完结了,则回来Poll::Ready(result),即得到 Future 的值并回来;若Future 还没完结,则回来Poll::Pending(),此时 Future 会被挂起,需求等某个事情将其唤醒(wake唤醒函数)

履行调度器 executor

executor 是一个 Future 的调度器。操作系统担任调度线程,但它不会去调度用户态的协程(比方 Future),所以任何运用了协程来处理并发的程序,都需求有一个 executor 来担任协程的调度。

Rust 的 Future 是慵懒的:只要在被 poll 轮询时才会运转。其中一个推动它的方法便是在 async 函数中运用.await来调用另一个 async 函数,可是这个只能处理 async 内部的问题,那些最外层的 async 函数,需求靠履行器 executor 来推动 。

executor 运转时

Rust 尽管供给 Future 这样的协程,但它在言语层面并不供给 executor,当不需求运用协程时,不需求引进任何运转时;而需求运用协程时,能够在生态系统中选择最适宜的 executor。

Rust 有如下4中常见的 executor :

  • futures:这个库自带了很简略的 executor
  • tokio:供给 executor,当运用 #[tokio::main] 时,就隐含引进了 tokio 的 executor
  • async-std:供给 executor,和 tokio 相似
  • smol:供给 async-executor,主要供给了 block_on

wake 告知机制

executor 会管理一批 Future (最外层的 async 函数),然后经过不停地 poll 推动它们直到完结。 最开端,履行器会先 poll 一次 Future ,后边就不会自动去 poll 了,假如 poll 办法回来Poll::Pending,就挂起 Future,直到收到某个事情后,经过 wake()函数去唤醒被挂起 Future,Future 就能够去自动告知履行器,它才会持续去 poll,履行器就能够履行该 Future。这种 wake 告知然后 poll 的方法会不断重复,直到 Future 完结。

Waker 供给了 wake() 办法:其作用是能够告知履行器,相关的使命能够被唤醒了,此时履行器就能够对相应的 Future 再次进行 poll 操作。

Context 是 Waker 的一个封装,先看下 poll 办法里的 Context

pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

Waker 的界说和相关的代码十分抽象,内部运用了一个 vtable 来允许各式各样的 waker 的行为:

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

Rust 本身不供给异步运转时,它只在规范库里规则了一些根本的接口,能够由各个运转时自行决定怎样完结。所以在规范库中,只能看到这些接口的界说,以及“高层”接口的完结,比方 Waker 下的 wake 办法,只是调用了 vtable 里的 wake() 而已 。

impl Waker {
    /// Wake up the task associated with this `Waker`.
    #[inline]
    pub fn wake(self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.
        let wake = self.waker.vtable.wake;
        let data = self.waker.data;
        // Don't call `drop` -- the waker will be consumed by `wake`.
        crate::mem::forget(self);
        // SAFETY: This is safe because `Waker::from_raw` is the only way
        // to initialize `wake` and `data` requiring the user to acknowledge
        // that the contract of `RawWaker` is upheld.
        unsafe { (wake)(data) };
    }
    ...
}

vtable 具体的完结并不在规范库中,而是在第三方的异步运转时里,比方 futures 库的 waker vtable界说。

构建一个计时器

用一个计时器例子,帮助了解 Future 调度机制,目标是: 在创立计时器时创立新线程,休眠特定时刻,然后过了时刻窗口时告知(signal) 计时器 future。

注:需求用到futures包的ArcWake特征,它能够供给一个方便的途径去构建一个Waker。修改Cargo.toml,添加下面依赖:

[dependencies]
futures = "0.3"

计时器 Future 完整代码:

// future_timer.rs
use futures;
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}
/// 在Future和等候的线程间同享状况
struct SharedState {
    /// 定时(睡觉)是否完毕
    completed: bool,
    /// 当睡觉完毕后,线程能够用`waker`告知`TimerFuture`来唤醒使命
    waker: Option<Waker>,
}
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 经过检查同享状况,来确认定时器是否现已完结
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            println!("future ready. execute poll to return.");
            Poll::Ready(())
        } else {
            println!("future not ready, tell the future task how to wakeup to executor");
            // 设置`waker`,这样新线程在睡觉(计时)完毕后能够唤醒当时的使命,接着再次对`Future`进行`poll`操作,
            // 下面的`clone`每次被`poll`时都会产生一次,实际上,应该是只`clone`一次更加合理。
            // 选择每次都`clone`的原因是: `TimerFuture`能够在履行器的不同使命间移动,假如只克隆一次,
            // 那么获取到的`waker`可能现已被篡改并指向了其它使命,终究导致履行器运转了过错的使命
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
impl TimerFuture {
    /// 创立一个新的`TimerFuture`,在指定的时刻完毕后,该`Future`能够完结
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        // 创立新线程
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            // 睡觉指定时刻完结计时功用
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 告知履行器定时器现已完结,能够持续`poll`对应的`Future`了
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                println!("detect future is ready, wakeup the future task to executor.");
                waker.wake()
            }
        });
        TimerFuture { shared_state }
    }
}
fn main() {
    // 咱们现在还没有完结调度器,所以要用一下futues库里的一个调度器。
    futures::executor::block_on(TimerFuture::new(Duration::new(10, 0)));    
}

履行成果如下:

future not ready, tell the future task how to wakeup to executor
detect future is ready, wakeup the future task to executor.
future ready. execute poll to return.

能够看到,刚开端的时分,定时10s事情还未完结,处在Pending状况,这时要告知这个使命后边安排妥当后怎样唤醒去调度履行。等10s后,定时事情完结了,经过前面的设置的Waker,唤醒这个Future使命去调度履行。

构建一个履行器

上面的代码,咱们并没有完结调度器,而是运用的futures库中供给的一个调度器去履行,下面自己完结一个调度器,看一下它的原理。而在Rust中,真正要用的话,还是要学习tokio库,这儿咱们只是为了讲述一下完结原理,以便于了解异步是怎样一回事。关键代码如下:

// future_executor.rs
use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::Context,
        time::Duration,
    },
};
mod future_timer;
 // 引进之前完结的定时器模块
use future_timer::TimerFuture;
/// 使命履行器,担任从通道中接收使命然后履行
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner`担任创立新的`Future`然后将它发送到使命通道中
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}
/// 一个 Future,它能够调度自己(将自己放入使命通道中),然后等候履行器去`poll`
struct Task {
    /// 进行中的Future,在未来的某个时刻点会被完结
    ///
    /// 按理来说`Mutex`在这儿是剩余的,由于咱们只要一个线程来履行使命。可是由于
    /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
    /// 咱们需求运用`Mutex`来满意这个笨笨的编译器对线程安全的执着。
    ///
    /// 假如是出产级的履行器完结,不会运用`Mutex`,由于会带来性能上的开支,取而代之的是运用`UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    /// 能够将该使命本身放回到使命通道中,等候履行器的poll
    task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 使命通道允许的最大缓冲数(使命行列的最大长度)
    // 当时的完结只是是为了简略,在实际的履行中,并不会这么运用
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("first dispatch the future task to executor.");
        self.task_sender.send(task).expect("too many tasks queued.");
    }
}
/// 完结ArcWake,标明怎样去唤醒使命去调度履行。
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 经过发送使命到使命管道的方法来完结`wake`,这样`wake`后,使命就能被履行器`poll`
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}
impl Executor {
     // 实际运转具体的Future使命,不断的接收Future task履行。
    fn run(&self) {
        let mut count = 0;
        while let Ok(task) = self.ready_queue.recv() {
            count = count + 1;
            println!("received task. {}", count);
            // 获取一个future,若它还没有完结(仍然是Some,不是None),则对它进行一次poll并测验完结它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 根据使命本身创立一个 `LocalWaker`
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别号
                // 经过调用`as_mut`办法,能够将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
                if future.as_mut().poll(context).is_pending() {
                    println!("executor run the future task, but is not ready, create a future again.");
                    // Future还没履行完,因此将它放回使命中,等候下次被poll
                    *future_slot = Some(future);
                } else {
                    println!("executor run the future task, is ready. the future task is done.");
                }
            }
        }
    }
}
fn main() {
    let (executor, spawner) = new_executor_and_spawner();
   // 将 TimerFuture 封装成一个使命,分发到调度器去履行
    spawner.spawn(async {
        println!("TimerFuture await");
        // 创立定时器Future,并等候它完结
        TimerFuture::new(Duration::new(10, 0)).await;
        println!("TimerFuture Done");
    });
    // drop掉使命,这样履行器就知道使命现已完结,不会再有新的使命进来
    drop(spawner);
    // 运转履行器直到使命行列为空
    // 使命运转后,会先打印`howdy!`, 暂停2秒,接着打印 `done!`
    executor.run();
}

运转成果如下:

first dispatch the future task to executor.
received task. 1
TimerFuture await
future not ready, tell the future task how to wakeup to executor
executor run the future task, but is not ready, create a future again.
detect future is ready, wakeup the future task to executor.
received task. 2
future ready. execute poll to return.
TimerFuture Done
executor run the future task, is ready. the future task is done.

第一次调度的时分,由于还没有安排妥当,在Pending状况,告知这个使命,后边安排妥当是怎样唤醒wake该使命。然后当事情安排妥当的时分,由于前面告知了怎么唤醒,按办法唤醒了该使命去调度履行。

异步处理流程

Reactor Pattern 是构建高性能事情驱动系统的一个很典型形式,executor 和 reactor 是 Reactor Pattern 的组成部分。Reactor pattern 包含三部分:

  • task:待处理的使命。使命能够被打断,并且把控制权交给 executor,等候之后的调度
  • executor:一个调度器。保护等候运转的使命(ready queue),以及被堵塞的使命(wait queue)
  • reactor:保护事情行列。当事情来暂时,告知 executor 唤醒某个使命等候运转

executor 会调度履行待处理的使命,当使命无法持续进行却又没有完结时,它会挂起使命,并设置好适宜的唤醒条件。之后,假如 reactor 得到了满意条件的事情,它会唤醒之前挂起的使命,然后 executor 就有机会持续履行这个使命。这样一直循环下去,直到使命履行完毕。

Rust 运用 Future 做异步处理便是一个典型的 Reactor Pattern 形式。

以 tokio 为例:async/await 供给语法层面的支撑,Future 是异步使命的数据结构,当 .await 时,executor 就会调度并履行它

下图为 tokio 上 Future 异步处理整个流程:

Rust 异步编程之 Future 执行器与任务调度

引证自《陈天 Rust 编程第一课》

注:tokio 的调度器会运转在多个线程上,运转线程上自己的 ready queue 上的使命(Future),假如没有,就去别的线程的调度器上偷一些过来运转(work-stealing 调度机制)。当某个使命无法再持续取得发展,此时 Future 运转的成果是Poll::Pending,那么调度器会挂起使命,并设置好适宜的唤醒条件(Waker),等候被 reactor 唤醒。而reactor 会利用操作系统供给的异步 I/O(如epoll / kqueue / IOCP),来监听操作系统供给的 IO 事情,当遇到满意条件的事情时,就会调用 Waker.wake() 唤醒被挂起的 Future,这个 Future 会回到 ready queue 等候履行。

总结

Future 是 Rust 异步编程的核心,代表一些将在未来完结的操作。 Rust 的Future 是慵懒的,需求履行器executor 调度履行,这种调度履行完结根据轮询,在当时轮询 poll 中,若 Future 完结了,则回来Poll::Ready(result),即得到 Future 的值并回来;若 Future 还没完结,则回来Poll::Pending(),此时 Future 会被挂起,需求等某个事情产生 Waker 将其唤醒,Waker 供给wake()办法来告知履行器哪个相关使命应该要唤醒。当wake()函数被调用时, 履行器知道 Waker 相关的使命现已准备好持续了,该 future 会被再轮询一遍。这种 wake 告知然后 poll 的方法会不断重复,直到 Future 完结。

每个异步使命分成三个阶段:

  1. 轮询阶段(The Poll): 调度履行器(executor)触发一个Future被轮询后,开端履行,遇堵塞(Pending)则挂起进入等候阶段。
  2. 等候阶段:事情源(一般称为reactor)注册 Waker 等候一个事情产生,当该事情准备好时唤醒 wake 相应的Future,进入唤醒阶段。
  3. 唤醒阶段:事情产生,相应的Future被 Waker 唤醒。 履行器(executor)调度Future再次被轮询,并向前走一步,直到它完结或达到一个Pending点,不能再向前走, 如此往复,直到终究完结。

参考

  • Rust 圣经 – 异步编程

  • Rust 异步编程

  • 200 行代码讲透 RUST FUTURES

  • Futures Explained in 200 Lines of Rust

  • 陈天 Rust 编程第一课 – 异步处理