Stream 是 Flutter 处理数据呼应的一个重要手法,它供给了一种处理数据流的办法,其效果相似于 Kotlin 中的 Flow,依据发布订阅模式的规划,经过监听Stream,订阅者能不断接收到数据流的最新改变。
Stream 的基本用法
Stream能经过async*和StreamController发生,也能经过其它Stream转化而来。相较于async*,StreamController由于灵活度更高,因而更为常见,两者在运用场景上也有必定不同。
async*
信任大家必定知道async,但async*就未必,同样作为 Flutter 里异步处理的一环,async主要跟Future打交道,而async*处理的目标是Stream,async*在运用上需求搭配yield。下面这段代码演示了怎么运用async*进行 1 到 10 的相加。
Stream<int> countStream(int to) async* {
print("stream 被监听");
for (int i = 1; i <= to; i++) {
yield i;
}
}
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台不会打印出 "stream 被监听",也就表明 async* 办法体没被履行
var sum = await sumStream(stream);
print(sum); // 55
}
在上面的示例中,async*办法体里yield在每次的遍历中,都往Stream回来一个数据,经过 await for的监听拿到每次回来的值,接着履行sum操作。值得注意的是,async*这种办法发生的stream,当stream没有被监听时,async* 办法体是不会被履行的。
假如你看着async*还有点别扭,请记住:async回来的是一个Future,而async*回来的是一个Stream。
StreamController
日常开发中,通常会经过StreamController创立Stream。只需求构造出StreamController目标,经过这个目标的.stream就能够得到Stream。
Stream<int> countStream(int to) {
// 先创立 StreamController
late StreamController<int> controller;
controller = StreamController<int>(onListen: () {
// 当 Stream 被监听时会触发 onListen 回调
for (var i = 0; i < to; i++) {
controller.add(i);
}
controller.close();
});
return controller.stream;
}
Future<int> listenOn(Stream<int> stream) async {
var completer = Completer<int>();
var sum = 0;
// 监听 stream
stream.listen(
(event) {
sum += event;
},
onDone: () => completer.complete(sum),
);
return completer.future;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台也不会打印出 "stream 被监听"
var sum = await listenOn(stream);
print(sum); // 55
}
在创立StreamController的时候传入了一个onListen回调,当流第一次被监听的时候,会触发这个回调,此刻会往流里边依次增加多个数据,listenOn办法里拿到这些数据履行相加操作。这儿运用了stream的listen的办法进行监听。
Stream 左右护法
Flutter 中的 Stream 处理,涉及到三类目标,以发布订阅模式的视点去看的话,能够分为发布者 StreamController、数据通道 Stream、订阅者 StreamSubscription。
class Example {
var controller = StreamController<int>();
Stream<int> get stream => controller.stream;
StreamSubscription<int>? _subscription;
void initState() {
_subscription = stream.listen((event) {
print(event);
});
for(var i = 0; i <= 10; i++) {
controller.add(i);
}
}
void dispose() {
_subscription?.cancel();
_subscription = null;
}
}
每一个StreamController都对应着一个Stream,当Stream被订阅时,会得到一个StreamSubscription目标。
上面的比方中,接口运用是简单的,可是他们内部的工作原理是怎么?一个事情从发布到消费中心经过了哪些流程?
数据流向图

在事情处理上:Stream在被订阅时,会创立StreamSubscription,并将其间的onData等事情处理的回调传给StreamSubscription。
在事情输入输出上:StreamController经过add办法输入事情后,先判别此刻是否存在订阅者StreamSubscription,假如存在则调用StreamSubscription的onData处理,不存在就先存到_pendingEvents里,等到下次StreamSubscription呈现了再向它输出事情。
能够看到,StreamController 在整个事情流向的处理中肩负着最重要的使命,它控制着事情怎么输入和输出,StreamSubscription担任处理输出到这儿的事情,Stream在得到StreamSubscription后就完成了它的使命挑选“退隐山林”。
这么讲或许还有点“干”,为了更直观的介绍他们各自的责任,接下来咱们从他们界说的接口出发,去考虑他们都能做哪些事情。为了方便呈现,我只取其间最关键的部分。
StreamController
abstract interface class StreamController<T> implements StreamSink<T> {
// stream 流
Stream<T> get stream;
// 流状况的回调
abstract void Function()? onListen; // 被监听
abstract void Function()? onPause; // 流暂停
abstract void Function()? onResume; // 流康复
abstract void Function()? onCancel; // 流撤销/封闭
// 流状况
bool get isClosed;
bool get isPaused;
bool get hasLitener; // 当时流是否有订阅者
// 监听 source,转发给 stream
Future addStream(Stream<T> source, {bool? cancelOnError});
// 往流里边增加事情
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
Future close(); // 封闭流
// 输出事情
// 以下这三个接口在 _StreamControllerBase 中
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
-
StreamController担任办理事情流的状况,当状况改变时,会触发到相应的回调(onListen/onPause等)。 -
StreamController担任事情的输入,输入的办法有两种,一种是事情接口add、addError;别的一种是经过监听其它的Stream;一起事情也分为两种,一种是正常事情,一种是错误事情。 -
StreamController能封闭这个事情流通道,会发生一个onDone事情。 -
StreamController担任事情的输出,不同的输入对应不同的输出。
Stream
abstract mixin class Stream<T> {
// 是否地播送流,播送流允许多订阅
bool get isBroadcast => false;
// 监听流改变,回来订阅者
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
// 一系列 Stream 处理和改换操作
Stream<T> where(bool test(T event)) { ... }
Stream<S> map<S>(S convert(T event)) { ... }
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...}
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... }
Stream<T> handleError(Function onError, {bool test(error)?}) { ... }
...
}
-
Stream露出事情流的订阅办法listen,回来当时订阅者,并把listen办法中的onData等参数注册到当时订阅者里边。 -
Stream有许多过滤转化等语法糖办法。
StreamSubscription
abstract interface class StreamSubscription<T> {
// 监听数据改变
void onData(void handleData(T data)?);
void onError(Function? handleError);
void onDone(void handleDone()?);
// 暂停/康复 监听
void pause([Future<void>? resumeSignal]);
void resume();
bool get isPaused;
// 撤销监听
Future<void> cancel();
// 转成 Future 目标,监听流完毕事情
Future<E> asFuture<E>([E? futureValue]);
}
-
StreamSubscription作为事情输出端,担任事情的输出处理。 -
StreamSubscription也能对自己的订阅行为进行暂停、康复或撤销等动作。
Stream 的分类
Stream有许多子类,对应不同场景的完成。比方关于输入端而言,能够分为同步流和异步流;在输出端上,又可分为播送流和非播送流。
同步和异步
StreamController的工厂办法中,经过sync能够指定同步或许异步。同步和异步的区别是:事情输入后是否会立即输出。 同步流在事情输入后会立刻履行onData,异步流在事情输入后会注册一个异步事情,等到当时EventLoop中的同步事情处理后触发onData。
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
在完成上看,_SyncStreamController终究输出时运用的是_SyncStreamControllerDispatch,_AsyncStreamController运用的是_AsyncStreamControllerDispatch。
两者在输出处理不同,_SyncStreamControllerDispatch调用的是subscription的_add办法,_AsyncStreamControllerDispatch调用的是subscription的_addPending办法。_addPending会先将事情存到行列里,一起假如行列没有在跑就敞开行列的处理,经过scheduleMicrotask对事情进行异步处理,处理完当时事情持续处理行列时的其它事情,直到行列清空。
播送和非播送
上述代码中出产的对错播送流,播送流经过StreamController.broadcast办法创立。播送和非播送的区别是是否允许多次订阅。
factory StreamController.broadcast(
{void onListen()?, void onCancel()?, bool sync = false}) {
return sync
? _SyncBroadcastStreamController<T>(onListen, onCancel)
: _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
非播送StreamController承继自_StreamController,播送StreamController承继自_BroadcastStreamController,两者的区别能够经过_subscribe的完成表现。_StreamController的完成如下,当重复订阅后会直接抛出 StateError 反常。
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
// ...
return subscription;
}
_BroadcastStreamController里边有两个目标_firstSubscription、_lastSubscription,_BroadcastSubscription是双向链表结构。当需求输出事情时,经过整个链表,告诉一切的订阅进行音讯的处理。
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;
开发实战
经过前面接口和分类的分析,咱们对这 Stream 有了更深入的知道。刀已磨好,接下来便经过两个比方来试试这把刀究竟锋不尖利。
利用 Stream 完成事情的播送
事情的播送,在开发时总会遇到,尤其是在跨组件或跨页面的场景,信任大部分开发者的项目里也都会引进相似EventBus的三方或自研框架。比方:当我在修改个人资料时,Save之后需求告诉其它页面进行改写以展现最新的个人信息。
// user entity
class UserInfo {
int uid;
String name;
UserInfo(this.uid, this.name);
}
// userinfo update
class UserInfoChangeEvent {
static final _controller = StreamController<UserInfo>.broadcast();
static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) {
return _controller.stream.listen(callback);
}
static void broadcast(UserInfo userInfo) {
_controller.add(userInfo);
}
}
// 用户修改页面
class UserProfileViewModel {
...
// 点击 save 时,会调用到 broadcast 办法向外发送事情
void onSave(int uid, String name) {
UserInfoChangeEvent.broadcast(UserInfo(uid, name));
}
}
// 其它页面状况改写
class ViewState extends State<ViewWidget> {
StreamSubscription<UserInfo>? _subscription;
UserInfo? _curUserInfo;
@override
void initState() {
super.initState();
// 初始化时,监听 UserInfoChangeEvent
_subscription = UserInfoChangeEvent.subscribe((userInfo) {
setState(() {
_curUserInfo = userInfo;
})
});
}
@override
void dispose() {
super.dispose();
// 退出时,要撤销监听。不然会有内存泄漏
_subscription?.cancel();
_subscription = null;
}
}
这儿,UserInfoChangeEvent 界说了播送类型的StreamController,而且向外露出了subscribe和boradcast接口,用户修改页面在点击save时走到onSave办法,这个办法里调用了UserInfoChangeEvent的broadcast办法向外发送了一个更改信息的事情;
接着ViewState这儿在initState时经过UserInfoChangeEvent的subscribe办法注册了监听,接收到了事情赋值到当时_curUserInfo,调用setState改写页面。
StreamBuilder 完成 Widget 主动改写
Flutter 供给了一个组件StreamBuilder,能协助咱们方便的监听Stream并改写 Widget。例如进入一个页面时,通常会有一个数据加载的过程,此刻页面会阅历 Loading -> Loaded/LoadError 的状况变更,不同的状况会呈现不同的页面 UI,这时咱们就需求界说一个 LoadingState 的枚举类型,在数据加载后时经过 StreamController 发布 LoadState 状况,StreamBuilder监听到更新然后会主动触发 Widget 的改写。AsyncSnapShot是快照的意思,保存着此刻最新的事情。
StreamBuilder<LoadingState>(
stream: viewModel.loadingStateStream,
initialData: LoadingState.loading,
builder: (BuildContext context, AsyncSnapshot snapshot) {
// 依据 snapshot 的数据处理回来
var data = snapshot.data;
if (data == LoadingState.Loaded) {
reutrn Container(child: Text("Loaded Success"));
} else if (data == LoadingStat.LoadError) {
reutrn Container(child: Text("Loading Error"));
} else {
return LoadingView()
}
},
)
不过,StreamBuilder也有坑。咱们知道,关于Stream来说,事情被消费了就会丢掉,无论是StreamController仍是Stream都不会保存上次的值,以页面加载为例,页面进来后ViewModel履行数据加载完成后,向loadingStateStream里发布了Loaded的状况,假如此刻页面还没有布局StreamBuilder,StreamBuilder就无法收到这次监听,等到后边StreamBuilder真实增加到界面上时现已错过了上次的事情,AsyncSnapshot拿到的仍是initialData时设置的数据,也就是 loading 态,这样状况就会展现反常。
你或许会有疑问,为什么StreamBuilder不能一开始就增加到页面的build方面里?当然能够,但即便如此也无法保证StreamBuilder的监听必定会比viewModel的状况更新早,由于假如页面的内容较长,一开始StreamBuilder还不在可视区内,它的initState办法就不会履行,也就不会监听loadingStateStream。
StreamBuilder会面对这种囧境,归根究竟是由于Stream的规划。
Stream 的改换和处理
前面在介绍Stream的接口时,咱们提到过Stream里边有许多操作办法。在这part,侧重挑几个从姓名上不太好了解的打开讲讲。
Future<E> drain<E>([E? futureValue]);
drain意为“排出、消耗“。这儿指”排掉”这条流中心一切的事情,只呼应完毕信号,当流封闭时回来 futureValue。
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)
事情迭代。依据combine兼并流里边的事情,该办法能够指定回来的类型S,一起能够指定初始值 initialValue。
final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10])
.fold<String>("0", (previous, element) => "$previous - $element");
print(result); // 0 - 2 - 4 - 6 - 8 - 10
Future<T> reduce(T Function(T previous, T element) combine);
也是事情迭代。与fold不同的是,reduce无法指定初始值且它只能回来与原流相同的类型T。
final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2])
.fold<int>(10, (previous, element) => previous + element);
print(result); // 38
Future pipe(StreamConsumer<T> streamConsumer);
流管道拼接。将当时流的事情流向streamConsumer中,streamConsumer的子类完成通常是一个StreamController,拿到事情后告诉给它的订阅者。
var controller = StreamController<int>();
var stream = controller.stream;
stream.listen((event) {
print(event); // 2 4 6 8 10
});
var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller);
print("result: $result"); // null
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
异步打开。将原流中的事情做一次打开操作,得到一个E类型的新流。
var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]);
var newStream = stream.asyncExpand((event) {
return Stream<int>.fromIterable([event, event + 1]);
});
newStream.listen((event){
print(event); // 2 3 4 5 6 7 8 9 10
});
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
异步映射。跟asyncExpand相似,只是转化操作回来的是FutureOr目标,为那些转化过程中涉及到异步处理的场景供给便当。
var newStream = stream.asyncMap((event) async {
await Future.delayed(const Duration(seconds: 1));
return event + 1;
});
newStream.listen((event){
print(event); // 3 5 7 9 11
});
真实忍不住想吐槽一下,有些办法的姓名起的诚心不昨滴,其间部分都有点“挂羊头卖狗肉”的感觉了。。。
你真的懂了吗?
讲了许多,现在来查验一下。假设有一段逻辑,controller会增加三个事情,分别是add(1) add(2) add()3,subscription会在每次收到事情时打印output: $event,中心会有一次暂停,3 秒后康复,猜一下在以下几种场景下最终输出的顺序是什么?
1. 同步流
void main() async {
// 同步流:sync 为 true
var controller = StreamController<int>(sync: true);
var subscription = controller.stream.listen((event) {
print('output: $event');
});
controller.add(1);
controller.add(2);
controller.add(3);
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
}
// will print:
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复
2. 异步流
保持 1 中其它代码不变,将sync的值设置成false。
// will print:
// 暂停
// 3秒后 -> 康复
// output: 1
// output: 2
// output: 3
3. 异步流:运用 Future.delayed 推迟暂停
保持 2 中其它代码不变,将暂停康复推迟Duraiton.zero。
Future.delayed(Duration.zero, (){
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
});
// will print
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 康复
4. 异步流:运用 scheduleMicrotask 推迟暂停
保持 3 中其它代码不变,用scheduleMicrotask代替Future.delayed。
scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 康复');
subscription.resume();
});
});
// will print
// output: 1
// 暂停
// 3秒后 -> 康复
// output: 2
// output: 3
上面的输出是否如你所料?信任假如你了解了之前的介绍,对1 2 3点的输出结果是没有问题的。可是关于第 4 点:虽然同样为推迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1后才会触发暂停。这又是为什么呢?要解说这个输出,就要从源码出发了。
结语
所以,咱们首先要从概念上了解他们,其次咱们还要从代码上知道详细的完成。当程度的履行不及预期,缺乏代码完成层面的了解,咱们便会显得手忙脚乱。像前面呈现的StreamBuilder处理中的坑和输出顺序的问题,只有阅读底层源码,才干发现原因并精确修复。下一篇文章,将从源码完成上深入分析Stream。
