Creating streams in Dart

    The dart:async library contains two typesthat are important for many Dart APIs: andFuture.Where a Future represents the result of a single computation,a stream is a sequence of results.You listen on a stream to get notified of the results(both data and errors)and of the stream shutting down.You can also pause while listening or stop listening to the streambefore it is complete.

    But this article is not about using streams.It’s about creating your own streams.You can create streams in a few ways:

    • Transforming existing streams.
    • Creating a stream from scratch by using an function.
    • Creating a stream by using a StreamController.

    This article shows the code for each approachand gives tips to help you implement your stream correctly.

    For help on using streams, see.

    The common case for creating streams is that you already have a stream,and you want to create a new stream based on the original stream’s events.For example you might have a stream of bytes thatyou want to convert to a stream of strings by UTF-8 decoding the input.The most general approach is to create a new stream thatwaits for events on the original stream and thenoutputs new events. Example:

    For many common transformations,you can use Stream-supplied transforming methodssuch as map(), where(), expand(), and take().

    For example, assume you have a stream, counterStream,that emits an increasing counter every second.Here’s how it might be implemented:

    1. var counterStream =
    2. Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);

    To quickly see the events, you can use code like this:

    1. counterStream.forEach(print); // Print an integer every second, 15 times.

    To transform the stream events, you can invoke a transforming methodsuch as map() on the stream before listening to it.The method returns a new stream.

    1. // Double the integer in each event.
    2. var doubleCounterStream = counterStream.map((int x) => x * 2);
    3. doubleCounterStream.forEach(print);

    Instead of map(), you could use any other transforming method,such as the following:

    Often, a transforming method is all you need.However, if you need even more control over the transformation,you can specify aStreamTransformerwith Stream’s transform() method.The platform libraries provide stream transformers for many common tasks.For example, the following code uses the utf8.decoder and LineSplittertransformers provided by the dart:convert library.

    1. Stream<List<int>> content = File('someFile.txt').openRead();
    2. List<String> lines =
    3. await content.transform(utf8.decoder).transform(LineSplitter()).toList();

    One way to create a new stream is withan asynchronous generator (async) function.The stream is created when the function is called,and the function’s body starts running when the stream is listened to.When the function returns, the stream closes.Until the function returns, it can emit events on the stream byusing yield or yield statements.

    Here’s a primitive example that emits numbers at regular intervals:

    1. Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
    2. int i = 0;
    3. await Future.delayed(interval);
    4. yield i++;
    5. if (i == maxCount) break;
    6. }
    7. }

    When the listener cancels(by invoking cancel() on the StreamSubscriptionobject returned by the listen() method),then the next time the body reaches a yield statement,the yield instead acts as a return statement.Any enclosing finally block is executed,and the function exits.If the function attempts to yield a value before exiting,that fails and acts as a return.

    When the function finally exits, the future returned bythe cancel() method completes.If the function exits with an error, the future completes with that error;otherwise, it completes with null.

    Another, more useful example is a function that convertsa sequence of futures to a stream:

    1. Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
    2. for (var future in futures) {
    3. var result = await future;
    4. yield result;
    5. }
    6. }

    This function asks the futures iterable for a new future,waits for that future, emits the resulting value, and then loops.If a future completes with an error, then the stream completes with that error.

    It’s rare to have an async function building a stream from nothing.It needs to get its data from somewhere,and most often that somewhere is another stream.In some cases, like the sequence of futures above,the data comes from other asynchronous event sources.In many cases, however, an function is too simplistic toeasily handle multiple data sources.That’s where the StreamController class comes in.

    If the events of your stream comes from different parts of your program,and not just from a stream or futures that can traversed by an async function,then use ato create and populate the stream.

    A StreamController gives you a new streamand a way to add events to the stream at any point, and from anywhere.The stream has all the logic necessary to handle listeners and pausing.You return the stream and keep the controller to yourself.

    The following example(from stream_controller_bad.dart)shows a basic, though flawed, usage of StreamControllerto implement the timedCounter() function from the previous examples.This code creates a stream to return,and then feeds data into it based on timer events,which are neither futures nor stream events.

    As before, you can use the stream returned by timedCounter() like this:

    1. var counterStream = timedCounter(const Duration(seconds: 1), 15);
    2. counterStream.listen(print); // Print an integer every second, 15 times.

    This implementation of timedCounter() hasa couple of problems:

    • It starts producing events before it has subscribers.
    • It keeps producing events even if the subscriber requests a pause.

    As the next sections show,you can fix both of these problems by specifyingcallbacks such as onListen and onPausewhen creating the StreamController.

    As a rule, streams should wait for subscribers before starting their work.An async* function does this automatically,but when using a StreamController,you are in full control and can add events even when you shouldn’t.When a stream has no subscriber,its StreamController buffers events,which can lead to a memory leakif the stream never gets a subscriber.

    Try changing the code that uses the stream to the following:

    1. void listenAfterDelay() async {
    2. var counterStream = timedCounter(const Duration(seconds: 1), 15);
    3. await Future.delayed(const Duration(seconds: 5));
    4.  
    5. // After 5 seconds, add a listener.
    6. print(n); // Print an integer every second, 15 times.
    7. }
    8. }

    To be notified of subscriptions, specify anonListen argument when you create the StreamController.The onListen callback is calledwhen the stream gets its first subscriber.If you specify an onCancel callback,it’s called when the controller loses its last subscriber.In the preceding example,Timer.periodic()should move to an onListen handler,as shown in the next section.

    Honoring the pause state

    Avoid producing events when the listener has requested a pause.An async* function automatically pauses at a yield statementwhile the stream subscription is paused.A StreamController, on the other hand, buffers events during the pause.If the code providing the events doesn’t respect the pause,the size of the buffer can grow indefinitely.Also, if the listener stops listening soon after pausing,then the work spent creating the buffer is wasted.

    To see what happens without pause support,try changing the code that uses the stream to the following:

    1. void listenWithPause() {
    2. var counterStream = timedCounter(const Duration(seconds: 1), 15);
    3. StreamSubscription<int> subscription;
    4.  
    5. subscription = counterStream.listen((int counter) {
    6. print(counter); // Print an integer every second.
    7. if (counter == 5) {
    8. // After 5 ticks, pause for five seconds, then resume.
    9. subscription.pause(Future.delayed(const Duration(seconds: 5)));
    10. }
    11. });
    12. }

    When the five seconds of pause are up,the events fired during that time are all received at once.That happens because the stream’s source doesn’t honor pausesand keeps adding events to the stream.So the stream buffers the events,and it then empties its buffer when the stream becomes unpaused.

    The following version of timedCounter()(from )implements pause by using theonListen, onPause, onResume, and onCancel callbackson the StreamController.

    Run this code with the function above.You’ll see that it stops counting while paused,and it resumes nicely afterwards.

    You must use all of the listeners—onListen,onCancel, onPause, and onResume—to benotified of changes in pause state.The reason is that if thesubscription and pause states both change at the same time,only the onListen or onCancel callback is called.

    When creating a stream without using an async* function,keep these tips in mind:

    • Be careful when using a synchronous controller—for example,one created using StreamController(sync: true).When you send an event on an unpaused synchronous controller(for example, using the add(), addError(), or close() methods defined byEventSink),the event is sent immediately to all listeners on the stream.Stream listeners must never be called untilthe code that added the listener has fully returned,and using a synchronous controller at the wrong time canbreak this promise and cause good code to fail.Avoid using synchronous controllers.

    • If you use StreamController,the onListen callback is called beforethe listen call returns the StreamSubscription.Don’t let the onListen callback dependon the subscription already existing.For example, in the following code,an onListen event fires(and handler is called)before the subscription variablehas a valid value.

    1. subscription = stream.listen(handler);
    • The onListen, onPause, onResume, and onCancelcallbacks defined by StreamController arecalled by the stream when the stream’s listener state changes,but never during the firing of an eventor during the call of another state change handler.In those cases, the state change callback is delayed untilthe previous callback is complete.

    • Don’t try to implement the Stream interface yourself.It’s easy to get the interaction between events, callbacks,and adding and removing listeners subtly wrong.Always use an existing stream, possibly from a StreamController,to implement the listen call of a new stream.