Stream<E> asyncMap<E>(dynamic convert(T event))

Creates a new stream with each data event of this stream asynchronously mapped to a new event.

This acts like map, except that convert may return a Future. When it does, the stream pauses its subscription to the source until that future completes, emits the future's result (or its error), and only then continues with the next event.

The returned stream is a broadcast stream if this stream is.

Source

/// Returns a stream whose data events are the results of applying [convert]
/// to each data event of this stream.
///
/// If [convert] returns a [Future], the subscription on this stream is paused
/// until that future completes; its value (or error) is then added to the
/// returned stream and the subscription is resumed. Any other return value is
/// added to the returned stream directly. If [convert] throws, the thrown
/// object and its stack trace are added as an error event.
///
/// Error events from this stream are forwarded, and the returned stream is
/// closed when this stream is done. The returned stream is a broadcast stream
/// exactly when this stream is one.
Stream<E> asyncMap<E>(convert(T event)) {
  StreamController<E> controller;
  StreamSubscription<T> subscription;

  // Invoked by [controller] once the returned stream is listened to; this
  // stream is not subscribed to before that point.
  void startForwarding() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    // View the controller through its private sink interface to reach
    // `_addError`, which is used for errors that are merely passed along.
    final _EventSink<E> sink = controller as Object /*=_EventSink<E>*/;
    final forwardError = sink._addError;
    final forwardData = controller.add;

    void handleData(T event) {
      dynamic mapped;
      try {
        mapped = convert(event);
      } catch (error, stackTrace) {
        // A synchronous failure in [convert] becomes an error event.
        controller.addError(error, stackTrace);
        return;
      }
      if (mapped is Future) {
        // Hold back further source events until the future has completed,
        // whether it completes with a value or with an error.
        subscription.pause();
        final delivered = mapped.then(forwardData, onError: forwardError);
        delivered.whenComplete(subscription.resume);
      } else {
        controller.add(mapped as Object/*=E*/);
      }
    }

    subscription = this.listen(handleData,
        onError: forwardError, onDone: controller.close);
  }

  // The callbacks below must stay closures: `subscription` is only assigned
  // when `startForwarding` runs, after the controller has been constructed.
  if (!this.isBroadcast) {
    controller = new StreamController<E>(
        onListen: startForwarding,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () {
          // Hand the result of cancelling the source back to the controller.
          return subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>.broadcast(
        onListen: startForwarding,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  }
  return controller.stream;
}