Isolate是Dart中重要的异步通讯方式,它的效果和用法不再赘述,关键是怎样高雅的与它进行数据通讯。这儿实践了一种写法,能够按这种写法结合自己的工程比较高雅地完成数据通讯。

一般写法

在Dart中与ioslate通讯的一般用这种形式,也可看官方的示例代码

import 'dart:isolate';
void main() async {
  final recv = ReceivePort('main.incoming');
  final isolate = await Isolate.spawn<SendPort>(
    IsolateObj.setupIsolate,
    recv.sendPort,
  );
  final main = MainObj();
  recv.listen(main.handleResponsesFromIsolate);
  // call it somewhere
  main.create();
  ...
  ...
  recv.close();
  isolate.kill();
}
class MainObj {
  void handleResponsesFromIsolate(dynamic msg) {
  }
  void create() {
    _send?.send({
      'type': 'create',
    });
  }
}
class IsolateObj {
  static void setupIsolate(SendPort outgoing) async {
    final incoming = ReceivePort('_isolate.incoming');
    outgoing.send(incoming.sendPort);
  }
}

在Isolate中,响应每个音讯的动作或许是不同的,于是不得不有一个switch case,如果音讯的类型非常多则分支非常巨大。不管这个分支是针对数据类型的还是数据字段的,针对每个分支不得不新增许多处理办法:

class IsolateObj {
  final SendPort outgoing;
  IsolateObj(this.outgoing);
  void _create(Map<String, dynamic> msg) {
  }
  void _dispose(Map<String, dynamic> msg) {
  }
  void _handleMessageFromMain(Map<String, dynamic> msg) {
    final type = msg['type'];
    switch (type) {
      case 'create':
        _create(msg);
        break;
      case 'dispose':
        _dispose(msg);
        break;
    }
  }
  static void setupIsolate(SendPort outgoing) async {
    final incoming = ReceivePort('_isolate.incoming');
    outgoing.send(incoming.sendPort);
    final messages = incoming.cast<Map<String, dynamic>>();
    final obj = IsolateObj(outgoing);
    messages.listen(obj._handleMessageFromMain);
  }
}

另一个别扭的地方是ReceivePort中接纳的数据类型是不同的,第一个数据元素类型是SendPort,要让接纳方能够发送数据,其后的元素才干是真正通讯的数据类型。

void handleResponsesFromIsolate(dynamic msg) {
  if (msg is SendPort) {
  } else if (msg is Map<String, dynamic>) {
  }
}

在Isolate中处理完音讯,或许需求再给主线程一些反应,也便是说主线程除了“发送”数据外也要“接纳”数据,来完成“双向通讯”。Dart中发送和接纳这两个操作是分开的,需求分别用SendPortReceivePort。而一旦接纳数据,主线程中也不得不树立一个巨大的switch case

class IsolateObj {
  final SendPort outgoing;
  IsolateObj(this.outgoing);
  void _create(Map<String, dynamic> msg) {
    // do some time consuming creation operation
    outgoing.send({
      'type': 'created',
    });
  }
  void _dispose(Map<String, dynamic> msg) {
    // do some dipose
    outgoing.send({
      'type': 'created',
    });
  }
  void _handleMessageFromMain(Map<String, dynamic> msg) {
    final type = msg['type'];
    switch (type) {
      case 'create':
        _create(msg);
        break;
      case 'dispose':
        _dispose(msg);
        break;
    }
  }
}
class MainObj {
  void handleResponsesFromIsolate(dynamic msg) {
    if (msg is SendPort) {
    } else if (msg is Map<String, dynamic>) {
      switch (msg['type']) {
        case 'created':
          _onCreated(msg);
          break;
        case 'disposed':
          break;
      }
    }
  }
  void create() {
    _send?.send({
      'type': 'create',
    });
  }
  void _onCreated(msg) {
    final data = msg['data'];
  }
}

显然这种写法的第三个缺陷是上下文割裂。发送处和接纳处位于不同的办法体内,许多时候不得不再存储额外的上下文信息。由于时序的原因,一些类成员也不得不声明成可空类型,比如SendPort?

class MainObj {
  SendPort? _send;
  void handleResponsesFromIsolate(dynamic msg) {
    if (msg is SendPort) {
      _send = msg;
    } else if (msg is Map<String, dynamic>) {
    }
  }
}

上下文关联

已然Isolate通讯是用一种”通道“,咱们能不能像http恳求那样发送完恳求之后直接“等候”当次恳求的回来?就像这样:

class MainObj {
  Future<void> create() async {
    _send?.send({
      'type': 'create',
    });
    final msg = await getNextElementFromIsolate();
    final data = msg['data'];
  }
}

这样主线程一侧就能够移除switch case这一坨,并且createonCreated能够兼并起来。咱们能够发现ReceivePort其实便是一个Stream。能够满足获取下一个元素的办法只要Stream.first,并且只要能获取下一个Stream元素,有了Stream.first的另一个妙处是,SendPort能够防止成为可空类型:

final isolate = await Isolate.spawn<SendPort>(
  IsolateObj.setupIsolate,
  recv.sendPort,
);
final port = await recv.first;
final data = recv.cast<Map<String, dynamic>>();
final main = MainObj(data, port);
class MainObj {
  final Stream<Map<String, dynamic>> _data;
  final SendPort _send;
  MainObj(this._data, this._send);
  Future<void> create() async {
    _send.send({
      'type': 'create',
    });
    final msg = await _data.first;
    final data = msg['data'];
  }
}

太棒了,一下子大大简化了主线程这一侧的操作逻辑!

可是!别高兴太早。这个完成有个很大的问题。

监听时序

咱们的意图其实很清晰,发送一次恳求,接纳这次恳求的成果,也便是恳求和成果1对1。Stream中的元素应该一个接一个的接纳,上一个元素被消费完,下一个元素再接纳,不然成果就会紊乱。然而问题就在Stream.first中的完成仅仅是加了一个linstener,等同于Stream.elementAt(0)。屡次调用Stream.first,仅仅注册屡次监听,当一个元素数据来了成果仅仅触发屡次告诉,这并不是咱们期望的“1对1”。也便是说:

final results = [
  await _data.first,
  await _data.first,
]; // [element1, element2]

这样写是成果是对的,在接纳了一个元素之后再去等候下一个元素,但如果同时等候并行的操作成果,那数据则是错误的:

final results = await Future.wait([
  _data.first,
  _data.first,
]); // [element1, element1]

实际开发中,第二种情况才是最多的,比如在main.create()调用之后再一次调用了main.create(),这时候数据还没有在isolate中处理,等处理完之后内部接纳的其实是相同的数据。

流式队列

要处理这个问题,有必要得用新的办法。目的很清晰:在流中一个一个的获取元素对象。多亏了美好的StreamQueue,已经帮咱们达到这个目的。StreamQueue放在package:async/async.dart中。只需求一点点更改:

final port = await recv.first;
final data = recv.cast<Map<String, dynamic>>();
final main = MainObj(StreamQueue(data), port);
class MainObj {
  final StreamQueue<Map<String, dynamic>> _data;
  final SendPort _send;
  MainObj(this._data, this._send);
  Future<void> create() async {
    _send.send({
      'type': 'create',
    });
    final msg = await _data.next;
    final data = msg['data'];
  }
}

轻松完成咱们的效果!其有用一个简略的List<Completer<Map<String, dynamic>>就能够完成以上的效果,有爱好的朋友能够尝试一下!

被迫告诉

现实情况永远是更杂乱的,在异步通讯中,可不是只要主动恳求的情况,还有一种被迫告诉的情况,也便是不需求主线程发送恳求,不守时接纳isolate的告诉。这和网络恳求是类似的,ReceivePort就像一个长衔接,在长衔接树立起来之后,服务端很或许会主动推送一些事件,客户端然后被迫接纳告诉。这样的情况或许会构成乱序,本来流程是这样:

main                      isolate
create -----------------> _create
create -----------------> _create
onCreated <-------------- data1
onCreated <-------------- data2

但如果稠浊了被迫告诉:

main                      isolate
create -----------------> _create
create -----------------> _create
onCreated <-----notify--- data3
onCreated <-------------- data1
?         <-------------- data2

所以问题的关键是咱们预设了接纳的响应对应最近一次发送的恳求,典型的客户端服务端形式。但实际情况不是这样,Isolate虽然是执行主线程发出的指令,但在执行的过程中或许会触发某个条件去告诉主线程,在这种情况下,主线程接纳的或许都是错误的成果。

要处理这个问题其实也非常简略。已然被迫告诉的音讯会构成时序紊乱,那把它从流中过滤出来单独构成一个流就能够了。

final port = await recv.first;
final receiving = recv.cast<Map<String, dynamic>>().asBroadcastStream();
final data = receiving.where((e) => e['type'] != 'notify');
final notify = receiving.where((e) => e['type'] == 'notify');
final main = MainObj(StreamQueue(data), port);
notify.listen((e) {
  // do something on notified.
});

到这儿,乱序问题其实已经处理了。但这儿给出另一种办法,已然被迫告诉型音讯需求从流中除掉,那干脆就不要放进这个流里——谁说只能有一个通道的?咱们知道ReceivePort其实便是一个Stream,那针对notify音讯,单独开设一个通道就好,终究代码如下:

import 'dart:isolate';
import 'package:async/async.dart' show StreamQueue;
void main() async {
  final recv = ReceivePort('main.incoming');
  final notify = ReceivePort('notify.incoming');
  final isolate = await Isolate.spawn<(SendPort, SendPort)>(
    IsolateObj.setupIsolate,
    (recv.sendPort, notify.sendPort),
  );
  final port = await recv.first;
  final receiving = recv.cast<Map<String, dynamic>>().asBroadcastStream();
  final main = MainObj(StreamQueue(receiving), port);
  notify.cast<Map<String, dynamic>>().listen(main.onNotified);
  await main.create();
  recv.close();
  isolate.kill();
}
class MainObj {
  final StreamQueue<Map<String, dynamic>> _data;
  final SendPort _send;
  MainObj(this._data, this._send);
  Future<void> create() async {
    _send.send({
      'type': 'create',
    });
    final msg = await _data.next;
    final data = msg['data'];
    // continue to do something.
  }
  void onNotified(Map<String, dynamic> data) {
  }
}
class IsolateObj {
  final SendPort outgoing;
  final SendPort notify;
  IsolateObj(this.outgoing, this.notify);
  void _create(Map<String, dynamic> msg) {
    // do some creation
    outgoing.send({
      'type': 'created',
    });
  }
  void _dispose(Map<String, dynamic> msg) {
  }
  void _notifyCallback() {
    notify.send({
      'type': 'notify',
    });
  }
  void _handleMessageFromMain(Map<String, dynamic> msg) {
    final type = msg['type'];
    switch (type) {
      case 'create':
        _create(msg);
        break;
      case 'dispose':
        _dispose(msg);
        break;
    }
  }
  static void setupIsolate((SendPort, SendPort) r) async {
    final (outgoing, notify) = r;
    final incoming = ReceivePort('_isolate.incoming');
    outgoing.send(incoming.sendPort);
    final messages = incoming.cast<Map<String, dynamic>>();
    final obj = IsolateObj(outgoing, notify);
    messages.listen(obj._handleMessageFromMain);
  }
}

多亏了Dart3中的Record,传送多参数现在非常简略了!

这种写法能够作为Isolate通讯的一种新范式,和各种switch case说再见吧!