Stream是什么
Stream 是一系列异步事情的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事情时它会当即给你,可是 Stream 则不会当即给你而是在它准备好时告诉你。
| 单个 | 0个或多个 | |
|---|---|---|
| Sync | int |
Interable<int> |
| Async | Future<int> |
Stream<int> |
Stream目标
- StreamController:用于整个Stream过程的控制,供给各类接口用于创立各种事情流。
- StreamSink:事情的入口,
add,addStream等。 - Stream:事情源自身,一般可用于监听事情或者对事情进行转换,如
listen、where。 - StreamSubscription:事情订阅后的目标,表面上用于办理订阅过各类操作,如
cancel、pause,同时在内部也是事情的中转关键。
联系
有一个事情源叫Stream,为了方便控制Stream,官方供给了StreamController作为办理;同时它对外供给了
StreamSink目标作为事情入口;又为Stream特点供给了监听和变换,最终得到StreamSubscription能够办理事情的订阅。
Stream作业原理
-
Stream在listen的时分传入onData回调,这个回调会传入到StreamSubscription中,之后经过zone.registerUnaryCallback注册得到_onData目标 -
StreamSink在增加事情时,会履行到StreamSubscription中的_sendData办法,然后经过_zone.runUnaryGuarded(_onData, data)履行1中得到的_onData目标,触发listen时传入的回调办法
//1.listen传入onData回调到StreamSubscription中
StreamSubscription<T> listen(void onData(T data)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
cancelOnError ??= false;
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
//为节省篇幅,已省掉部分代码
//在此,现已获取到_onData函数目标
_onData = _registerDataHandler<T>(_zone, onData),
//把onData传入进行注册
static void Function(T) _registerDataHandler<T>(
Zone zone, void Function(T)? handleData) {
return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
仿制代码
//2.sink增加事情StreamSubscription._sendData,然后调用_zone.runUnaryGuarded(_onData, data),
/* _EventDispatch interface. */
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
关于zone和zone.registerUnaryCallback,zone.runUnaryGuarded,详细请看这儿
值得留意的是,Stream是在microtask中被调度的
Stream支撑同步、异步(默认异步),播送与非播送
Stream支撑以下办法:
-
map():将此stream的每个元素转换为一个新的stream事情。 -
where():经过当时stream创立一个新的stream并依据条件丢弃某些元素 -
distinct():假如数据事情与前一个数据事情相等,则越过数据事情。 - ….
Stream的使用
经过Stream结构器创立
-
Stream.fromFuture:经过
Future创立一个单一订阅stream -
Stream.fromIterable:经过
Iterable的数据创立一个单一订阅的stream -
Stream.fromFutures:经过一组
Future创立单一个单一订阅流的stream -
Stream.periodic:经过时段创立一个重复发出事情的
stream
经过StreamController创立
import 'dart:async';
void main() {
//1.创立一个恣意类型StreamController目标
StreamController streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.经过sink槽口增加恣意类型事情数据
streamController.sink.add(100);
streamController.sink.add(100.121212);
streamController.sink.add('THIS IS STRING');
streamController.sink.close();//只有手动调用close办法发送一个done事情,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
经过async*创立
留意:async*是Dart的关键字,表明该函数回来的是stream,yield是回来Iterable的单个数据,而yield*后边跟stream
asyncGet(31).listen((event) {
print(event);
});
Stream<String> asyncGet(int count) async* {
yield* asyncGetString(count).map((event) => event + 'C');
}
Stream<String> asyncGetString(int count) async* {
for (int i = 0; i < count; i++) {
yield await delayedGet(i);
}
}
Future<String> delayedGet(int i) async {
await Future.delayed(Duration(seconds: 1));
return i.toString() + 'B';
}
参考资料
/post/684490…
/post/694210…
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

