Stream trait 相似于 Future trait, Future 对应的是一个 item 的状况的改变,但 Stream 与标准库中的 Iterator trait 相似,在完毕之前前能够生成多个值。或许咱们能够简略的理解为,Stream 是由一系列的 Future 组成,咱们能够从 Stream 读取各个 Future 的成果,直到 Stream 完毕。

Rust 异步编程之 Stream 流处理

Stream 的界说

Future 是异步开发中最根底的概念了,假如说 Future 代表了一次性的异步值,那么 Stream 则代表了一系列的异步值。Future 是1,Stream是0,1或许N。 Stream 签名如下:

pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream 对应了同步原语中的 Iterator 的概念,回忆一下,是不是连签名都是如此的相像呢!

pub trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;
}

Stream 用来笼统源源不断的数据源,当然也能够断(当 pollNone 的时分)

比方关于Stream的一个常见例子是音讯通道(futures包中的)的消费者Receiver。每次有音讯从Send端发送后,它都能够接收到一个Some(val)值, 一旦Send端关闭(drop),且音讯通道中没有音讯后,它会接收到一个None值。

use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
    println!("tx: Send 1, 2");
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);
    // `StreamExt::next` 相似于 `Iterator::next`, 可是前者回来的不是值,
    // 而是一个 `Future<Output = Option<T>>`,因此还需求运用`.await`来获取详细的值
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
fn main() {
    block_on(send_recv());   
}

Iterator 和 Stream 的区别:

  • Iterator 能够不断调用next()办法,取得新的值,直到 Iterator 回来None。Iterator 是阻塞式回来数据的,每次调用 next(),必定独占 CPU 直到得到一个成果,而异步的 Stream 是非阻塞的,在等候的过程中会空出 CPU 做其他事情。
  • Streampoll_next()办法,它跟 Future 的poll()办法很像,和 Iterator 版本的 next() 的效果相似。可是poll_next()调用起来不方便,需求自己处理 Poll 状况,这不是很友好,所以 Rust 供给了 StreamExt,作为 Stream 的扩展,供给了next()办法,回来一个完成了 Future trait 的Next结构体,这样就能够直接经过stream.next().await来迭代一个值了。

注:StreamExt 是 StreamExtension 的简写。在 Rust 中,通常的做法是只在一个文件中放入最小界说(比方 Stream),且在另一个扩展的相关文件中放入额外的 api(比方 StreamExt)。

注:Stream trait 还没有像 future 相同在 Rust 的中心库(std::core)中,它在 future_utils crate 中,而 StreamExtensions 也不在标准库中。这意味着,由于不同的库供给不同的导入,你或许会得到冲突的导入。例如,tokio 供给不同的 StreamExt 与 futures_utils。假如能够的话,尽量运用 futures_utils,由于它是 async/await 最常用的 crate

StreamExt 的 next() 办法以及 Next 结构的完成:

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}
// next 回来了 Next 结构
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}
// 假如 Stream Unpin 那么 Next 也是 Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}
// Next 完成了 Future,每次 poll() 实际上就是从 stream 中 poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

Stream 的创立

futures 库供给了一些有用的办法来创立一些根本 Stream 流,例如:

  • empty():生成一个空的 Stream
  • once():生成一个只包括单个值的 Stream
  • pending():生成一个不包括任何值,只回来 Poll::Pending 的 Stream
  • repeat():生成一个一向回来相同值的 Stream
  • repeat_with():经过闭包函数无穷尽地回来数据的 Stream
  • poll_fn():经过一个回来 Poll 的闭包来产生 Stream
  • unfold():经过初始值和回来 Future 的闭包来产生 Stream
use futures::prelude::*;
#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);
    // 迭代
    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

上面代码运用stream::iter生成了一个 Stream,并对其进行 filter / map 的操作。最后,遍历整个 stream,把取得的数据打印出来。

当你不关心 async/await 的东西,而只对流感爱好时,Stream::iter 关于测验很有用。另一个风趣的是 repeat_with,在这里你能够传递一个闭包,来按需惰性生成流,比方:

use futures::stream::{self, StreamExt};
// From the zeroth to the third power of two:
async fn stream_repeat_with(){
    let mut curr = 1;
    let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });
    assert_eq!(Some(1), pow2.next().await);
    assert_eq!(Some(2), pow2.next().await);
    assert_eq!(Some(4), pow2.next().await);
    assert_eq!(Some(8), pow2.next().await);
}

Stream 的完成

创立自己的 Stream 流涉及两个步骤:

  1. 首先创立一个结构体struct来保存流的状况
  2. 然后为该结构体struct完成Stream

让咱们创立一个名为Counter的流,它从15计数:

#![feature(async_stream)]
// 首先,结构体:
/// 从一数到五的流
struct Counter {
    count: usize,
}
// 咱们希望计数从一开始,所以让咱们添加一个 new() 办法来供给协助。
// 这不是严格必要的,但很方便。
// 请注意,咱们将 `count` 从零开始,咱们将在下面的 `poll_next () ` 的完成中看到其原因。
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}
// 然后,咱们为 `Counter` 完成 `Stream`:
impl Stream for Counter {
    // 咱们将运用 usize 进行计数
    type Item = usize;
    // poll_next() 是仅有需求的办法
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // 添加咱们的数量。这就是为什么咱们从零开始。
        self.count += 1;
        // 检查咱们是否已经完成计数。
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}

Stream Traits

Rust 中流供给的 trait 有多个,比方 StreamTryStreamFusedStream

  • Stream 与它对应的 Iterator 十分相似,只是当它回来 None 表示流耗尽时,此时不应该持续轮询流。假如这样做,就会进入未界说行为的规模,而且或许会出现一些杂乱无章的成果。
  • TryStream 是一个针对回来 Result<value, error> 流定制的特别 trait。TryStream 提出了能够轻松匹配和转换内部成果的函数。你能够将它们视为产生 Result 项的流的 API,而且这个 API 愈加方便。
  • FusedStream 和流是相同的,不过它能够让用户知道流在回来 None之后是否真的耗尽,或许是否能够再次轮询它。例如,假设你想创立一个由循环缓冲区支撑的流。在第一次迭代之后,FusedStream 将回来 None,可是在此之后重新轮询 FusedStream 是安全的,以便重新恢复该缓冲区新一轮的迭代。

迭代和并发

迭代器 Iterator 相似,Stream也能够迭代。 例如运用mapfilterfoldfor_eachskip等办法,以及它们的遇到错误提前回来的版本:try_maptry_filtertry_foldtry_for_each等等。

跟迭代器 Iterator 又有所不同的是for循环无法迭代Stream,可是命令式风格的循环while letloop,并不断显式地调用nexttry_next办法,比方能够运用下面两种循环读取的方法。

// 迭代方法 1
while let Some(value) = s.next().await {}
// 迭代方法 2
loop {
  match s.next().await {
    Some(value) => {}
    None => break;
  }
}

一个对 stream 流迭代核算(sum)的例子:

use futures_util::{pin_mut, Stream, stream, StreamExt};
async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // 不要忘记在迭代流之前固定(pin)它
    pin_mut!(stream);
    let mut sum: usize = 0;
    // 迭代 stream
    while let Some(item) = stream.next().await {
        sum = sum + item;
    }
    sum
}

假如你选择一次只处理一个值,或许会形成无法并发,这就失去了异步编程的意义。假如要让一个Stream并发处理多个值,能够运用for_each_concurrenttry_for_each_concurrent办法:

use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};
async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
    // 引入 `try_for_each_concurrent`
    stream.try_for_each_concurrent(100, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;
    Ok(())
}
async fn jump_n_times(num: i32)-> Result<(), io::Error> {
    println!("jump_n_times :{}", num+1);
    Ok(())
}
async fn report_n_jumps(num: i32)-> Result<(), io::Error>{
    println!("report_n_jumps : {}", num);
    Ok(()) 
}

总结

Stream 和 Future 相似,可是 Future 对应的是一个 item 的状况的改变,而 Stream 则是相似于 iterator,在完毕之前能够得到多个值。或许咱们能够简略的理解为,Stream 是由一系列的 Future 组成,咱们能够从 Stream 读取各个 Future 的成果,直到 Stream 完毕,是异步迭代器。

Stream 的 poll_next 函数有三种或许的回来值,分别如下:

  • Poll::Pending:阐明下一个值还没有安排妥当,依然需求等候
  • Poll::Ready(Some(val)):表示已经安排妥当,成功回来一个值,能够经过调用poll_next再获取下一个值
  • Poll::Ready(None): 表示 Stream 已经完毕,不应该调用poll_next

参考

  • course.rs/advance/asy…
  • huangjj27.github.io/async-book/…
  • zhuanlan.zhihu.com/p/611587154
  • rustwiki.org/zh-CN/core/…
  • blog.csdn.net/suhanyujie/…
  • docs.rs/futures-uti…