Creative Bracket

How to use Streams in Dart (Part 2)

In Part 1 of the series, we learnt what Streams were and looked at two ways we would be interacting with them. We also looked at the flexibility of handling Streams with the StreamController<T> type and libraries that expose Stream<T> types for us to listen to and process our chunks of data.

In this post we will be looking at how transformations are made in the Stream using what are known as Stream Transformers. From that we will proceed to looking at a common design pattern centered around the use of Streams.


What is a Stream Transformer?

A Stream Transformer allows us to perform data transformations on a Stream. The transformed events are then emitted on a new stream, to be received by all the listeners defined for that stream.

Dart handles stream transformation through a class called StreamTransformer<S, T>, which can be created through three constructors:

// 1. Creates a transformer based on the provided `onListen` callback
StreamTransformer(
  StreamSubscription<T> onListen(
    Stream<S> stream,
    bool cancelOnError
  )
);

// 2. Creates a transformer based on the provided `bind` callback fn
StreamTransformer.fromBind(
  Stream<T> bind(Stream<S>)
);

// 3. Creates a transformer that will delegate events to the
// provided `handleData`, `handleError` and `handleDone` callback functions
StreamTransformer.fromHandlers({
  void handleData(
    S data,
    EventSink<T> sink
  ),
  void handleError(
    Object error,
    StackTrace stackTrace,
    EventSink<T> sink
  ),
  void handleDone(
    EventSink<T> sink
  ), 
});  
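
Before moving on, here is a minimal sketch of the second form, StreamTransformer.fromBind(...), in case it helps to see how it differs. The doubleWithBind helper name is my own, made up for illustration; the callback simply receives the whole source stream and returns the transformed one.

```dart
import 'dart:async';

// Hypothetical helper (not from the original post): builds a transformer
// that doubles every number using the `fromBind` constructor.
StreamTransformer<num, num> doubleWithBind() {
  // `bind` receives the whole source stream and returns the transformed one.
  return StreamTransformer<num, num>.fromBind(
    (Stream<num> source) => source.map((n) => n * 2),
  );
}

Future<void> main() async {
  final doubled = await Stream<num>.fromIterable([1, 2, 3])
      .transform(doubleWithBind())
      .toList();
  print(doubled); // [2, 4, 6]
}
```

This form suits cases where the transformation can be expressed with existing Stream methods such as map() or where().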

For this post I will focus on StreamTransformer.fromHandlers(...) because I found it simpler to work with. Here’s a code sample of what a valid stream transformer looks like:

import 'dart:async';

var streamTransformer = StreamTransformer<num, num>.fromHandlers(
  handleData: (num data, EventSink<num> sink) {
    // The actual transformation we're making
    // here is multiplying `data` by 2
    sink.add(data * 2);
  },
  handleError: (Object error, StackTrace stackTrace, EventSink<num> sink) {
    sink.addError('Something went wrong: $error');
  },
  handleDone: (EventSink<num> sink) => sink.close(),
);

Our StreamTransformer.fromHandlers(...) named constructor receives callback functions under three named parameters:

  1. handleData: This responds to every data event emitted from the source stream. The arguments supplied are the data from the emitted event and the EventSink<T> instance for the transformed stream. This sink provides the add() method for relaying the manipulated data to stream listeners.
  2. handleError: This responds to every error event emitted from the source stream. You are given the error, a stack trace and the EventSink<T> instance, whose addError() method lets you send a customised error to listeners of the stream.
  3. handleDone: Runs when the source stream has no more data to process. This usually happens when close() is invoked on the source, for example on the StreamController feeding the stream. Calling close() on the EventSink<T> instance here passes the done event on to listeners.
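
To see all three callbacks fire, here is a small sketch under my own assumptions: the doubler and collectEvents helper names are hypothetical, and the 'boom' error is just a placeholder. It records each event the listener receives so the order is easy to follow.

```dart
import 'dart:async';

// Hypothetical helper: doubles data, rewrites errors, and closes on done.
StreamTransformer<num, num> doubler() {
  return StreamTransformer<num, num>.fromHandlers(
    handleData: (num data, EventSink<num> sink) => sink.add(data * 2),
    handleError: (Object error, StackTrace stackTrace, EventSink<num> sink) {
      sink.addError('Something went wrong: $error');
    },
    handleDone: (EventSink<num> sink) => sink.close(),
  );
}

// Hypothetical helper: collects a readable log of every event received.
Future<List<String>> collectEvents() async {
  final log = <String>[];
  final controller = StreamController<num>();
  final finished = Completer<void>();

  controller.stream.transform(doubler()).listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete();
    },
  );

  controller.add(1);
  controller.addError('boom'); // placeholder error for illustration
  controller.add(2);
  await controller.close();
  await finished.future;
  return log;
}

Future<void> main() async {
  (await collectEvents()).forEach(print);
  // data: 2
  // error: Something went wrong: boom
  // data: 4
  // done
}
```

Note that the error does not end the stream here, because listen() leaves cancelOnError at its default of false.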

To use our streamTransformer, we pass it as an argument to the stream’s transform() method:

var controller = StreamController<num>();

// Call the `transform` method on the controller's stream
// while passing in the stream transformer
var controllerStream = controller.stream.transform(streamTransformer);

// Just print out transformations to the console
controllerStream.listen(print);

// Add data to stream to see transformations in effect
controller.sink.add(1); // 2
controller.sink.add(2); // 4
controller.sink.add(3); // 6
controller.sink.add(4); // 8
controller.sink.add(5); // 10


Single subscription vs Broadcast streams

Streams come in two forms: single subscription and broadcast. The main difference is that a single subscription stream can only ever have one listener. Adding another will throw the exception below:

controllerStream.listen(print);
controllerStream.listen(print); // Throws the error below:
// ---
// Uncaught exception: Bad state: Stream has already been listened to. 
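
If you would rather detect this than crash, the exception can be caught as a StateError. The snippet below is only a sketch, and the secondListenThrows helper name is my own invention for illustration.

```dart
import 'dart:async';

// Hypothetical helper: returns true if a second listen() call is rejected.
bool secondListenThrows() {
  final controller = StreamController<num>();
  final stream = controller.stream;

  stream.listen((_) {}); // first listener is accepted
  try {
    stream.listen((_) {}); // second listener on a single subscription stream
    return false;
  } on StateError catch (error) {
    print('Caught: ${error.message}');
    return true;
  } finally {
    controller.close();
  }
}

void main() {
  print(secondListenThrows()); // true
}
```

You can also check stream.isBroadcast up front to find out which kind of stream you are dealing with.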

However, broadcast streams allow you to define multiple data listeners. So let’s amend the controller value to use a broadcast stream instance:

var controller = StreamController<num>.broadcast();

Now let’s test this again:

controllerStream.listen(print);
controllerStream.listen(print);
// --
// 2
// 2
// 4
// 4
// 6
// 6
// 8
// 8
// 10
// 10
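
Putting it together, here is a self-contained sketch of the broadcast case. The broadcastToTwoListeners helper name is hypothetical, and I use toList() as the listener purely so the results are easy to inspect.

```dart
import 'dart:async';

// Hypothetical helper: feeds 1..3 through a doubling transformer and
// returns what each of two listeners received.
Future<List<List<num>>> broadcastToTwoListeners() async {
  final controller = StreamController<num>.broadcast();
  final doubled = controller.stream.transform(
    StreamTransformer<num, num>.fromHandlers(
      handleData: (num data, EventSink<num> sink) => sink.add(data * 2),
    ),
  );

  // toList() subscribes immediately and completes when the stream is done.
  // Both listeners are attached before any event is added, because a
  // broadcast stream does not buffer events for late subscribers.
  final first = doubled.toList();
  final second = doubled.toList();

  for (final value in [1, 2, 3]) {
    controller.add(value);
  }
  await controller.close();

  return Future.wait([first, second]);
}

Future<void> main() async {
  print(await broadcastToTwoListeners()); // [[2, 4, 6], [2, 4, 6]]
}
```

Each listener gets its own copy of every transformed event, which is why the values appear twice in the output above.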

Conclusion

The beauty of Streams in Dart is that the API is the same across all platforms, whether web, server or mobile. This presents a “light bulb moment”: we can write reusable logic for use across those platforms, as long as it is built on the Stream API.

This brings us to the design pattern known as Business Logic Components or BLoC for short, which will be covered in Part 3! That will allow us to draw together everything we’ve learnt so far in the series.



Article Photo by Jérôme Prax on Unsplash

Jermaine Oppong

Hello 👋, I show programmers how to build full-stack web applications with the Dart SDK. I am passionate about teaching others, having received tremendous support on sites like dev.to and medium.com for my articles covering various aspects of the Dart language and ecosystem.
