Understanding Streams in Dart (Part 2)

In Part 1 of this series, we learnt what Streams are and looked at two ways of interacting with them. We also looked at the flexibility of handling Streams with the StreamController<T> type, and at libraries that expose Stream<T> types for us to listen to and process chunks of data.

In this post we will look at how the data in a Stream is transformed using what are known as Stream Transformers. From there we will proceed to a common design pattern centred around the use of Streams.

What is a Stream Transformer?

A Stream Transformer allows us to perform data transformations on a Stream. The transformer does not modify the original stream; it produces a new stream that carries the transformed events, and those events are received by the listeners attached to that new stream.

Dart does stream transformation through a class called StreamTransformer<S,T>, where S is the type of the incoming events and T is the type of the outgoing events. It offers three constructors:

// 1. Creates a transformer based on the provided `onListen` callback
StreamTransformer(
  StreamSubscription<T> onListen(
    Stream<S> stream,
    bool cancelOnError
  )
);

// 2. Creates a transformer based on the provided `bind` callback fn
StreamTransformer.fromBind(
  Stream<T> bind(Stream<S>)
);

// 3. Creates a transformer that will delegate events to the
// provided `handleData`, `handleError` and `handleDone` callback functions
StreamTransformer.fromHandlers({
  void handleData(
    S data,
    EventSink<T> sink
  ),
  void handleError(
    Object error,
    StackTrace stackTrace,
    EventSink<T> sink
  ),
  void handleDone(
    EventSink<T> sink
  ), 
});
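To make the second form a little more concrete, here is a minimal sketch of a transformer built with StreamTransformer.fromBind(...). The names doubler and doubled are made up for illustration and do not appear in the original examples; the sketch simply reuses the "multiply by 2" idea and delegates the work to the stream's own map() method.

```dart
import 'dart:async';

// Hypothetical transformer for illustration: the `bind` callback receives
// the source stream and returns the stream that listeners will see.
final doubler = StreamTransformer<num, num>.fromBind(
  (Stream<num> source) => source.map((n) => n * 2),
);

// Hypothetical helper: pushes the given values through `doubler`
// and collects the transformed events into a list.
Future<List<num>> doubled(Iterable<num> values) =>
    Stream<num>.fromIterable(values).transform(doubler).toList();

Future<void> main() async {
  print(await doubled([1, 2, 3])); // [2, 4, 6]
}
```

This form tends to be handy when the transformation can already be expressed with existing Stream methods such as map() or where().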

For this post I will focus on StreamTransformer.fromHandlers(...) because I found it simpler to work with. Here’s a code sample of what a valid stream transformer looks like:

import 'dart:async';

var streamTransformer = StreamTransformer<num, num>.fromHandlers(
  handleData: (num data, EventSink<num> sink) {
    // The actual transformation we're making
    // here is multiplying `data` by 2
    sink.add(data * 2);
  },
  handleError: (Object error, StackTrace stackTrace, EventSink<num> sink) {
    // Forward the error to listeners with a customised message
    sink.addError('Something went wrong: $error');
  },
  // Close the transformed stream once the source stream is done
  handleDone: (EventSink<num> sink) => sink.close(),
);

Our StreamTransformer.fromHandlers(...) named constructor receives callback functions under three named parameters:

  1. handleData: This will respond to any data event emitted from the stream. The arguments supplied are the data from the emitted event and the EventSink<T> instance that feeds the transformed stream. This instance provides the add() method for relaying the manipulated data to stream listeners.

  2. handleError: This will respond to any error events emitted from the stream. You are given arguments containing the error, a stack trace and the EventSink<T> instance that contains the addError() method for sending a customised error message to listeners of the stream.

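  3. handleDone: Runs when there is no more data to be processed by the stream. This usually happens when the source stream is closed, for example by invoking the close() method on its controller. Calling close() on the supplied EventSink<T> then signals to listeners that the transformed stream is done too.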
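The three callbacks described above can be seen firing in the following sketch. It is only an illustration: the runHandlers function and the 'boom' error are invented for this example, and the log simply records what a listener on the transformed stream receives.

```dart
import 'dart:async';

// Hypothetical helper for illustration: sends a few events through a
// transformer and records what a listener on the transformed stream sees.
Future<List<String>> runHandlers() {
  final log = <String>[];
  final finished = Completer<List<String>>();

  final transformer = StreamTransformer<num, num>.fromHandlers(
    handleData: (num data, EventSink<num> sink) => sink.add(data * 2),
    handleError: (Object error, StackTrace stackTrace, EventSink<num> sink) =>
        sink.addError('Something went wrong: $error'),
    handleDone: (EventSink<num> sink) => sink.close(),
  );

  final controller = StreamController<num>();
  controller.stream.transform(transformer).listen(
        (value) => log.add('data: $value'),
        onError: (Object error) => log.add('error: $error'),
        onDone: () => finished.complete(log),
      );

  controller.add(1); // goes through handleData
  controller.addError('boom'); // goes through handleError
  controller.add(2); // goes through handleData
  controller.close(); // goes through handleDone

  return finished.future;
}

Future<void> main() async {
  // Prints:
  // data: 2
  // error: Something went wrong: boom
  // data: 4
  (await runHandlers()).forEach(print);
}
```

Note that the listener keeps receiving data after the error, because listen() does not cancel the subscription on errors unless cancelOnError is set to true.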

To use our streamTransformer, we pass it as an argument to the stream’s transform() method, which returns the transformed stream:

var controller = StreamController<num>();

// Call the `transform` method on the controller's stream
// while passing in the stream transformer
var controllerStream = controller.stream.transform(streamTransformer);

// Just print out transformations to the console
controllerStream.listen(print);

// Add data to stream to see transformations in effect
controller.sink.add(1); // 2
controller.sink.add(2); // 4
controller.sink.add(3); // 6
controller.sink.add(4); // 8
controller.sink.add(5); // 10

Single subscription vs Broadcast streams

Streams come in two forms: single subscription and broadcast. The difference in principle is that a single subscription stream can only ever have one listener. Adding another will throw the exception below:

controllerStream.listen(print);
controllerStream.listen(print); // Throws the error below:
// ---
// Uncaught exception: Bad state: Stream has already been listened to.

However, broadcast streams allow you to define multiple data listeners. So let’s amend the controller value to use a broadcast stream instance:

var controller = StreamController<num>.broadcast();

Now let’s run this again:

controllerStream.listen(print);
controllerStream.listen(print);
// --
// 2
// 2
// 4
// 4
// 6
// 6
// 8
// 8
// 10
// 10
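The difference between the two kinds of stream can also be checked in code. The following is a small sketch with a made-up helper, acceptsSecondListener, that tries to attach two listeners and reports whether the second one was accepted. It relies on the fact that listening twice to a single subscription stream throws a StateError (the "Bad state" message shown earlier).

```dart
import 'dart:async';

// Hypothetical helper for illustration: returns true if `stream`
// accepts a second listener, false if it throws a StateError.
bool acceptsSecondListener(Stream<num> stream) {
  final first = stream.listen((_) {});
  try {
    final second = stream.listen((_) {});
    second.cancel();
    return true;
  } on StateError {
    // "Bad state: Stream has already been listened to."
    return false;
  } finally {
    first.cancel();
  }
}

void main() {
  print(acceptsSecondListener(StreamController<num>().stream)); // false
  print(acceptsSecondListener(StreamController<num>.broadcast().stream)); // true
}
```

If you only need to know which kind of stream you are holding, the isBroadcast getter on Stream answers that without attaching any listeners.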

Conclusion

The beauty of Streams in Dart is that the API is the same across all platforms, whether web, server or mobile. This presents a “light bulb moment”: we can write reusable logic that works across these platforms, as long as it is built on the Stream API.

Further reading