Understanding Streams in Dart (Part 1)

The concept of Streams has proved challenging for many programmers diving into Dart (or any other language, for that matter), partly because it takes a few tries and some worked examples to grasp. In this article series I will attempt to demystify the use of streams in Dart while building a tangible example with what we learn.

What are Streams?

The Dart documentation defines a Stream as:

A source of asynchronous data events. A Stream provides a way to receive a sequence of events. Each event is either a data event, also called an element of the stream, or an error event, which is a notification that something has failed. When a stream has emitted all its events, a single “done” event will notify the listener that the end has been reached.

api.dartlang.org

Streams, as a concept, refer to the channel through which data flows from point A to point B. Within this channel we can apply various transformations to the data that is “read in” before it reaches point B. The channel is especially useful when data is transferred in chunks rather than all at once.

The way we work with Streams in Dart is through a set of helper classes offered by the SDK. These helper classes include utility methods to push data to a stream and notify listeners of that stream to capture whatever data gets added.

The most generic class that represents a stream is called Stream<T>. Generally we do not instantiate this class directly; instead, instances are exposed by other classes in the Dart SDK. Think of it as an interface for interacting with the channel through which data flows.
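
To make the "interface" idea concrete, here is a minimal sketch. The function name sumAll is hypothetical, and Stream.fromIterable is only a convenient stand-in source chosen for this illustration; the function itself does not care what produced the stream.

```dart
// Hypothetical helper: accepts any Stream<int>, whatever created it.
Future<int> sumAll(Stream<int> numbers) async {
  var total = 0;
  // `await for` receives data events one at a time until the "done" event.
  await for (final n in numbers) {
    total += n;
  }
  return total;
}

Future<void> main() async {
  // Stand-in source for this sketch: a stream built from a fixed list.
  final total = await sumAll(Stream.fromIterable([1, 2, 3]));
  print(total); // 6
}
```

Any other Stream<int>, for instance one exposed by a library class, could be passed to sumAll in the same way.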

Basic example with StreamController<T>

A StreamController<T> wraps a stream and lets a producer send data, error and done events to it. We access the stream through streamController.stream, which lets us invoke any of the methods defined in the Stream documentation.

Here’s an example with the StreamController<T> class:

import 'dart:async';

var streamController = StreamController<String>();

// Access the stream and listen for data events
streamController.stream.listen((data) {
  print('Got eem! $data');
});

The snippet above allows us to watch the stream channel for incoming data chunks. We then respond to this data by printing it out to the console.

So the question that follows is: how do we trigger the data listener? By feeding data to the stream! The controller's sink property returns a StreamSink<T>, which implements EventSink<T>. This object provides an add() method for feeding data to the stream:

streamController.sink.add('Added this string');

// Result
// Got eem! Added this string
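
Putting the two snippets together, a self-contained version might look like the sketch below. The helper name collectGreetings and the strings 'first' and 'second' are made up for this illustration.

```dart
import 'dart:async';

// Hypothetical helper: pushes two strings through a controller and
// returns what the listener received.
Future<List<String>> collectGreetings() async {
  final controller = StreamController<String>();
  final received = <String>[];

  controller.stream.listen((data) => received.add('Got eem! $data'));

  controller.sink.add('first');
  controller.sink.add('second');

  // close() sends the "done" event; its future completes once the
  // listener has been notified, so all data events have arrived by then.
  await controller.close();
  return received;
}

Future<void> main() async {
  (await collectGreetings()).forEach(print);
  // Got eem! first
  // Got eem! second
}
```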

The listen() method can also handle errors. Calling listen() registers our callbacks and returns a StreamSubscription<T> object, which represents the active subscription and lets us pause, resume or cancel it. Through these callbacks we can handle the different kinds of events: data, error and done (emitted when close() is called on the controller or its sink).
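
The StreamSubscription<T> returned by listen() can also be used to stop listening. The sketch below, with a hypothetical helper named takeUntilCancelled, cancels its own subscription once the value 2 arrives, so later events are never delivered.

```dart
import 'dart:async';

// Hypothetical helper: adds 1..4 to a controller, but the listener
// cancels its subscription as soon as it sees the value 2.
Future<List<int>> takeUntilCancelled() async {
  final controller = StreamController<int>();
  final received = <int>[];

  late final StreamSubscription<int> subscription;
  subscription = controller.stream.listen((value) {
    received.add(value);
    if (value == 2) subscription.cancel(); // stop listening from here on
  });

  for (var i = 1; i <= 4; i++) {
    controller.add(i);
  }
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await takeUntilCancelled()); // [1, 2]
}
```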

Here’s the full definition for the listen() method:

StreamSubscription<T> listen(
  void onData(T event)?, {
  Function? onError,
  void onDone()?, // Invoked when the stream emits its "done" event
  bool? cancelOnError, // If true, cancels the subscription on the first error
});

We can therefore rewrite our listen() call as follows:

streamController.stream.listen(
  (data) => print('Got eem! $data'),
  onError: (err) => print('Got an error! $err'),
  onDone: () => print('Mission complete!'),
  cancelOnError: false,
);

Here’s how we would trigger the "error" and "done" events:

streamController.sink.addError('Houston, we have a problem!');
// Got an error! Houston, we have a problem!

streamController.sink.close();
// Mission complete!
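
For reference, the pieces of this section can be combined into one runnable program. This is only a sketch: the helper name runMission is hypothetical, and the callbacks write to a list instead of printing so that the order of events is easy to inspect.

```dart
import 'dart:async';

// Hypothetical helper: sends one data event, one error event and the
// "done" event, and records what each callback saw.
Future<List<String>> runMission() async {
  final controller = StreamController<String>();
  final log = <String>[];

  controller.stream.listen(
    (data) => log.add('Got eem! $data'),
    onError: (Object err) => log.add('Got an error! $err'),
    onDone: () => log.add('Mission complete!'),
    cancelOnError: false, // keep listening after the error
  );

  controller.sink.add('Added this string');
  controller.sink.addError('Houston, we have a problem!');
  await controller.sink.close();
  return log;
}

Future<void> main() async {
  (await runMission()).forEach(print);
  // Got eem! Added this string
  // Got an error! Houston, we have a problem!
  // Mission complete!
}
```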

Streams exposed through libraries

Although StreamController<T> gives us fine-grained control over streams we instantiate ourselves, there are built-in Dart libraries that use Streams under the hood. For example, take a look at this snippet for setting up a server:

import 'dart:io';

void main() async {
  var server = await HttpServer.bind('localhost', 8080);

  // HttpServer implements Stream<HttpRequest>
  server.listen((HttpRequest request) {
    request.response.write('Hello, World!');
    request.response.close();
  });
}

The snippet above binds an HttpServer to localhost on port 8080. HttpServer implements Stream<HttpRequest>, which means we can listen on it for the request objects produced whenever a client hits our server.
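
To see the request stream in action without opening a browser, we can send the server a request from the same program. The sketch below is an illustration under a few assumptions of mine: the helper name roundTrip, the /streams path, and the choice of port 0 (which asks the operating system for any free port) are not part of the original example.

```dart
import 'dart:convert';
import 'dart:io';

// Hypothetical helper: starts a server, sends it one request and
// returns the response body.
Future<String> roundTrip() async {
  // Port 0 lets the OS pick a free port (an assumption to avoid clashes).
  final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);

  // Each incoming HttpRequest arrives as a data event on the stream.
  server.listen((HttpRequest request) {
    request.response.write('Hello, ${request.uri.path}');
    request.response.close();
  });

  final client = HttpClient();
  final request = await client.getUrl(
    Uri.parse('http://${server.address.address}:${server.port}/streams'),
  );
  final response = await request.close();
  // The response body is itself a Stream<List<int>> of byte chunks.
  final body = await response.transform(utf8.decoder).join();

  client.close();
  await server.close(force: true);
  return body;
}

Future<void> main() async {
  print(await roundTrip()); // Hello, /streams
}
```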

Here’s another example of a stream exposed in the web browser:

import 'dart:html';

void main() {
  var button = querySelector('button');

  // `onClick` is a Stream<MouseEvent> that emits an event for each user click.
  // `?.` guards against querySelector() finding no matching element.
  button?.onClick.listen((_) => print('Button clicked!'));
}

User interactions in the browser, such as clicking, scrolling and typing, are emitted as “data” events on a stream. In other words, HTML elements expose Stream<T> getters such as onClick for dealing with user interactions on the page.

Many more classes use Streams under the hood. The point is that, in general, you won’t be instantiating Stream<T> objects directly; they will be created for you by various library classes in the SDK.
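
One more such class is File from dart:io, whose openRead() method hands back a Stream<List<int>> of byte chunks. The sketch below is only an illustration: the helper name countBytes, the temporary directory prefix and the file contents are all made up for the example.

```dart
import 'dart:io';

// Hypothetical helper: writes a small temporary file, then counts its
// bytes by consuming the stream returned by openRead().
Future<int> countBytes() async {
  final dir = await Directory.systemTemp.createTemp('streams_demo');
  final file = File('${dir.path}/notes.txt');
  await file.writeAsString('first line\nsecond line\n');

  var bytes = 0;
  // We never construct the stream ourselves; File creates it for us.
  await for (final chunk in file.openRead()) {
    bytes += chunk.length;
  }

  await dir.delete(recursive: true);
  return bytes;
}

Future<void> main() async {
  print(await countBytes()); // 23
}
```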

Conclusion

Streams provide a powerful way of working with data that arrives in chunks. Because events are delivered asynchronously, the rest of our code keeps running in a non-blocking fashion while we wait for them. I would recommend reading through the documentation, particularly that of the dart:async library, which contains classes for asynchronous programming such as Stream and Future.
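
A small experiment makes the non-blocking behaviour visible. In the sketch below (the helper name observeOrdering is hypothetical), the line after add() runs before the listener does, because a default StreamController delivers its events asynchronously.

```dart
import 'dart:async';

// Hypothetical helper: records the order in which things happen
// around a single add() call.
Future<List<String>> observeOrdering() async {
  final controller = StreamController<String>();
  final log = <String>[];

  controller.stream.listen((data) => log.add('event: $data'));

  controller.add('ping');
  // add() returns immediately; the listener has not run yet.
  log.add('after add');

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await observeOrdering()); // [after add, event: ping]
}
```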

In the next part of the series, we will look at how to perform transformations on a stream and demonstrate a common design pattern centered around the use of Streams.
