Stream是什么

Stream 是一系列异步事情的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事情时它会当即给你,可是 Stream 则不会当即给你而是在它准备好时告诉你。

单个 0个或多个
Sync int Interable<int>
Async Future<int> Stream<int>

Stream目标

  • StreamController:用于整个Stream过程的控制,供给各类接口用于创立各种事情流。
  • StreamSink:事情的入口,add,addStream等。
  • Stream:事情源自身,一般可用于监听事情或者对事情进行转换,如listenwhere
  • StreamSubscription:事情订阅后的目标,表面上用于办理订阅过各类操作,如cancelpause,同时在内部也是事情的中转关键。

联系

有一个事情源叫Stream,为了方便控制Stream,官方供给了StreamController作为办理;同时它对外供给了StreamSink目标作为事情入口;又为Stream特点供给了监听和变换,最终得到StreamSubscription能够办理事情的订阅。

Flutter Stream

Stream作业原理

  1. Streamlisten的时分传入onData回调,这个回调会传入到StreamSubscription中,之后经过zone.registerUnaryCallback注册得到_onData目标
  2. StreamSink在增加事情时,会履行到StreamSubscription中的_sendData办法,然后经过_zone.runUnaryGuarded(_onData, data)履行1中得到的_onData目标,触发listen时传入的回调办法
//1.listen传入onData回调到StreamSubscription中
StreamSubscription<T> listen(void onData(T data)?,
    {Function? onError, void onDone()?, bool? cancelOnError}) {
  cancelOnError ??= false;
  StreamSubscription<T> subscription =
      _createSubscription(onData, onError, onDone, cancelOnError);
  _onListen(subscription);
  return subscription;
}
//为节省篇幅,已省掉部分代码
//在此,现已获取到_onData函数目标
_onData = _registerDataHandler<T>(_zone, onData),
//把onData传入进行注册
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
仿制代码
//2.sink增加事情StreamSubscription._sendData,然后调用_zone.runUnaryGuarded(_onData, data),
/* _EventDispatch interface. */
void _sendData(T data) {
  assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}

关于zonezone.registerUnaryCallback,zone.runUnaryGuarded,详细请看这儿

值得留意的是,Stream是在microtask中被调度的

Stream支撑同步、异步(默认异步),播送与非播送

Stream支撑以下办法:

  • map():将此stream的每个元素转换为一个新的stream事情。
  • where():经过当时stream创立一个新的stream并依据条件丢弃某些元素
  • distinct():假如数据事情与前一个数据事情相等,则越过数据事情。
  • ….

Stream的使用

经过Stream结构器创立

  • Stream.fromFuture:经过Future创立一个单一订阅stream
  • Stream.fromIterable:经过Iterable的数据创立一个单一订阅的stream
  • Stream.fromFutures:经过一组Future创立单一个单一订阅流的stream
  • Stream.periodic:经过时段创立一个重复发出事情的stream

经过StreamController创立

import 'dart:async';
void main() {
  //1.创立一个恣意类型StreamController目标
  StreamController streamController = StreamController(
      onListen: () => print('listen'),
      onCancel: () => print('cancel'),
      onPause: () => print('pause'),
      onResume: () => print('resumr'));
  //2.经过sink槽口增加恣意类型事情数据
  streamController.sink.add(100);
  streamController.sink.add(100.121212);
  streamController.sink.add('THIS IS STRING');
  streamController.sink.close();//只有手动调用close办法发送一个done事情,onDone才会被回调
  //3.注册监听
  streamController.stream.listen((event) => print(event),
      onDone: () => print('is done'),
      onError: (error, stacktrace) => print('is error, errMsg: $error'),
      cancelOnError: true);
}

经过async*创立

留意:async*是Dart的关键字,表明该函数回来的是stream,yield是回来Iterable的单个数据,而yield*后边跟stream


asyncGet(31).listen((event) {
  print(event);
});
Stream<String> asyncGet(int count) async* {
  yield* asyncGetString(count).map((event) => event + 'C');
}
Stream<String> asyncGetString(int count) async* {
  for (int i = 0; i < count; i++) {
    yield await delayedGet(i);
  }
}
Future<String> delayedGet(int i) async {
  await Future.delayed(Duration(seconds: 1));
  return i.toString() + 'B';
}

参考资料

/post/684490…

/post/694210…