Monday, March 18, 2013

Getting your feet wet with Streams

Summary of "Dartwatch: Getting your feet wet with Streams" (via tldr.io)
  • Streams are the new, unified way to interact with async APIs in Dart.
  • They emit a repeating series of events (such as messages, file data or user clicks).
  • A stream is either single-subscriber or multi-subscriber.
  • Example stream usage: button.onClick.listen((e) => print("button clicked")); or reading a file from the file system.
  • Streams have a number of methods to take partial data (first, take...), check whether data matches (contains, any, every), and transform data (for example, List<int> to String).


Over the last few months you can't help but notice that Streams have appeared in Dart in a big way.
Whether running in the browser as part of the various html events, such as button.onClick, or on the server as part of the dart:io changes, Streams form a unified interface to anything that might send out a repeating series of data.

In this post (the first of two), you will learn about:
  • how to consume streams (this post)
  • how to use the Stream classes to send your own data (coming soon)
Before we get started, though, it is important to note that Streams are part of the dart:async library and, because of their asynchronous nature, share a close relationship with Dart's async staple, Future (you can't block code until a user clicks a button!).

Background reading: Futures

The Future class is used for async communication throughout Dart's APIs. A common example is a browser HttpRequest (or AJAX request). You have some code running in the browser that wants to get a value from the server, let's say, the current number of logged-on users. If your client-side code called the server and then waited (blocking) for the server to respond, the UI would freeze until the server responded (because code execution would be halted).
Fortunately (as the A in AJAX promises), this call to the server is asynchronous. It is modelled by Dart's Future API: HttpRequest.getString() returns a Future, and the callback you pass to its then() method receives the value once the server responds.

The following snippet shows how this might work; the callback function is the one passed to then():

import 'dart:html';

void main() {
  var url = "http://example.com/userCount";
  HttpRequest.getString(url).then((String result) {
    print("User count: $result"); // the callback, run when the server responds
  });
}
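HttpRequest only works inside a browser. To try the same then() pattern from the command line, the sketch below replaces the server call with a hypothetical fetchUserCount() function that uses Future.delayed() to simulate network latency; the function name and the returned count are made up for illustration.

```dart
import 'dart:async';

// Hypothetical stand-in for HttpRequest.getString(url): completes with a
// made-up user count after a short simulated delay.
Future<String> fetchUserCount() {
  return Future.delayed(const Duration(milliseconds: 100), () => "42");
}

void main() {
  fetchUserCount().then((String result) {
    // Runs later, once the future completes.
    print("User count: $result");
  });
  // Runs first: then() only registers the callback, it does not block.
  print("Request sent, waiting for the reply...");
}
```

The "Request sent" line prints before the user count, which is exactly the non-blocking behaviour described above.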

This pattern is used by streams to retrieve or manipulate data that the stream is sending out to its consumers.

What are streams for?

Imagine you are writing a chat application.  On a single client, you will be receiving messages and displaying them to the user.  You can't simply write a while loop, because that will block execution, so you need to use async callbacks.  This is an ideal use case for streams - you have one part of your code pushing data into the stream, and another part of your code listening to the stream.  

Key concepts:

Populating a stream: Data gets into a stream from a StreamController
Consuming a stream: Data is sent out of a stream to a subscriber, represented by a StreamSubscription (or possibly to multiple subscribers).
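Populating streams is the subject of the next post, but a tiny sketch may help connect these two concepts to the chat example. It assumes a hypothetical receiveMessages() function as the part of the code pushing data in; a StreamController<String> feeds the stream, and a listener consumes it.

```dart
import 'dart:async';

// Hypothetical "network" side of a chat client: pushes messages in.
void receiveMessages(StreamController<String> controller) {
  controller.add("Alice: hi");
  controller.add("Bob: hello");
  controller.close(); // no more messages
}

Future<List<String>> runChat() {
  final controller = StreamController<String>(); // populating side
  final shown = <String>[];
  final done = Completer<List<String>>();

  // Consuming side: listen() returns a StreamSubscription.
  controller.stream.listen(
    (message) => shown.add(message), // e.g. display it in the UI
    onDone: () => done.complete(shown),
  );

  receiveMessages(controller);
  return done.future;
}

void main() async {
  print(await runChat()); // [Alice: hi, Bob: hello]
}
```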

We'll look at consuming a stream first, as you're more likely to come across streams as a consumer of existing APIs within Dart.

Consuming a stream

Let's take a look at some simple stream code. For simplicity, we're going to create a stream from a fixed, literal list by using the Stream.fromIterable() constructor, rather than by populating it dynamically with a StreamController.

var data = [1,2,3,4,5]; // some sample data
var stream = new Stream.fromIterable(data);  // create the stream

Now that we have a stream that is ready to send out some data, we can listen to it to receive that data.

The typical way is to use the stream's listen() method to subscribe to the stream. This takes one mandatory parameter, the onData handler callback function, plus a number of optional parameters:

import 'dart:async';

main() {
  var data = [1,2,3,4,5]; // some sample data
  var stream = new Stream.fromIterable(data);  // create the stream

  // subscribe to the stream's events
  stream.listen((value) {       // onData handler:
    print("Received: $value");  // called once per value
  });
}

The onData callback that you pass to listen() is called every time a value is received. In our stream, it is called once for each of the data elements, so the output of running this code is as expected:

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
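Note that listen() does not hand over the values while it is being called: it registers the handler and returns, and the values arrive afterwards. The sketch below (recordOrder() is a hypothetical helper) logs the order in which things happen to make that visible.

```dart
import 'dart:async';

Future<List<String>> recordOrder() {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final stream = Stream.fromIterable([1, 2, 3]);

  stream.listen(
    (value) => log.add("Received: $value"),
    onDone: () => finished.complete(log),
  );
  // listen() has already returned, but no values have been delivered yet.
  log.add("after listen()");
  return finished.future;
}

void main() async {
  (await recordOrder()).forEach(print);
  // after listen()
  // Received: 1
  // Received: 2
  // Received: 3
}
```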

There are other ways to consume data from the stream, using properties such as first, last, length and isEmpty. Each of these properties returns a future, for example:

stream.first.then((value) => print("stream.first: $value"));  // 1
// stream.last.then((value) => print("stream.last: $value")); // 5
// stream.isEmpty.then((value) => print("stream.isEmpty: $value")); // false
// stream.length.then((value) => print("stream.length: $value")); // 5
You'll notice that some of the lines are commented out. This is for an important reason. Streams come in two flavours: single-subscriber or multi-subscriber. By default, our stream is a single-subscriber stream. This means that if you try to listen to the stream more than once, you will get an exception, and using any of the callback functions or future properties counts as listening.
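In recent Dart SDKs, Stream.fromIterable() may accept more than one listener (each listener iterating the data independently), so to reproduce the exception reliably this sketch uses a plain StreamController, whose stream is single-subscription. The listenTwice() helper is hypothetical; it reports whether a second listen() call was rejected with a StateError.

```dart
import 'dart:async';

// Returns the StateError message from a second listen() call,
// or null if the second listener was accepted.
String? listenTwice(Stream<int> stream) {
  stream.listen((value) => print("first listener: $value"));
  try {
    stream.listen((value) => print("second listener: $value"));
    return null;
  } on StateError catch (e) {
    return e.message;
  }
}

void main() {
  final controller = StreamController<int>(); // single-subscription
  print(listenTwice(controller.stream)); // prints the StateError message
  controller.add(1);
  controller.close();
}
```

Passing a broadcast stream (for example StreamController<int>.broadcast().stream) to the same helper returns null, because both listeners are allowed.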

You can convert the single-subscriber stream into a multi-subscriber (broadcast) stream by using the asBroadcastStream() method, as shown below:

import 'dart:async';

void main() {
  var data = [1,2,3,4,5];
  var stream = new Stream.fromIterable(data);
  var broadcastStream = stream.asBroadcastStream();
  
  broadcastStream.listen((value) => print("stream.listen: $value"));
  broadcastStream.first.then((value) => print("stream.first: $value"));
  broadcastStream.last.then((value) => print("stream.last: $value"));
  broadcastStream.isEmpty.then((value) => print("stream.isEmpty: $value"));
  broadcastStream.length.then((value) => print("stream.length: $value"));
}
Now that the stream allows multiple subscribers, you can add multiple listeners. You can check whether a stream is a broadcast stream by checking the stream.isBroadcast property.
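A quick way to see the difference is to check isBroadcast before and after calling asBroadcastStream(). This sketch uses a StreamController as the source purely so that the single-subscription starting point is unambiguous.

```dart
import 'dart:async';

void main() {
  final controller = StreamController<int>();
  final stream = controller.stream; // single-subscription
  final broadcastStream = stream.asBroadcastStream();

  print(stream.isBroadcast); // false
  print(broadcastStream.isBroadcast); // true

  // A broadcast stream accepts more than one listener.
  broadcastStream.listen((value) => print("A: $value"));
  broadcastStream.listen((value) => print("B: $value"));

  controller.add(1);
  controller.close();
}
```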

Common Stream methods

There are lots of methods available on the Stream class; be sure to check out the API docs for the complete list. In the following section, I'll describe some of the more common methods.

Subsets of streaming data

Streams also have some useful methods for extracting parts of the data being sent out from the stream. The take(), skip(), takeWhile(), skipWhile() and where() methods allow you to take a subset of the data, as shown by the following example. Each returns its own stream that you can listen to.

  broadcastStream
      .where((value) => value % 2 == 0) // divisible by 2
      .listen((value) => print("where: $value")); // where: 2
                                                  // where: 4
  
  broadcastStream
      .take(3) // takes only the first three elements
      .listen((value) => print("take: $value")); // take: 1
                                                 // take: 2
                                                 // take: 3
  
  broadcastStream
      .skip(3)  // skips the first three elements
      .listen((value) => print("skip: $value")); // skip: 4
                                                 // skip: 5
  
  broadcastStream
      .takeWhile((value) => value < 3) // take while true
      .listen((value) => print("takeWhile: $value")); // takeWhile: 1
                                                      // takeWhile: 2

  broadcastStream
      .skipWhile((value) => value < 3) // skip while true
      .listen((value) => print("skipWhile: $value")); // skipWhile: 3
                                                      // skipWhile: 4
                                                      // skipWhile: 5
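Here is a self-contained version of the same calls that can be run and checked. Instead of listening, it uses toList(), which returns a Future<List> of everything the stream emits; each call builds a fresh stream from the sample data, so no broadcast stream is needed. The sample() and subsets() helpers are illustrative names.

```dart
import 'dart:async';

// A fresh single-subscription stream of the sample data for each query.
Stream<int> sample() => Stream.fromIterable([1, 2, 3, 4, 5]);

Future<Map<String, List<int>>> subsets() async {
  return {
    "where": await sample().where((v) => v % 2 == 0).toList(),
    "take": await sample().take(3).toList(),
    "skip": await sample().skip(3).toList(),
    "takeWhile": await sample().takeWhile((v) => v < 3).toList(),
    "skipWhile": await sample().skipWhile((v) => v < 3).toList(),
  };
}

void main() async {
  final results = await subsets();
  results.forEach((name, values) => print("$name: $values"));
  // where: [2, 4]
  // take: [1, 2, 3]
  // skip: [4, 5]
  // takeWhile: [1, 2]
  // skipWhile: [3, 4, 5]
}
```

Note that skipWhile() stops skipping at the first value for which the test fails (3 here) and then passes every remaining value through.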

Transforming

Another useful method is transform(), which takes a StreamTransformer instance and allows you to modify the contents of the stream. The StreamTransformer.fromHandlers() constructor takes a handleData function that is called for each value passed from the stream. You can modify the value as you wish and add it to the sink (more on sinks later), which results in the modified values being output on the stream returned by transform(). The example below takes our data [1,2,3,4,5] and converts each item into two new String values, "Message: n" and "Body: n". Each of these is placed onto the new stream.

// define a stream transformer that turns each int into two Strings
var transformer = new StreamTransformer<int, String>.fromHandlers(
    handleData: (int value, EventSink<String> sink) {
  // create two new values from the original value
  sink.add("Message: $value");
  sink.add("Body: $value");
});

// transform the stream and listen to its output
stream.transform(transformer).listen((value) => print("listen: $value"));
Running this produces the following output:

listen: Message: 1
listen: Body: 1
listen: Message: 2
listen: Body: 2
listen: Message: 3
listen: Body: 3
listen: Message: 4
listen: Body: 4
listen: Message: 5
listen: Body: 5
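A runnable version of the transformer, using StreamTransformer.fromHandlers() as current Dart SDKs require, collects the output with toList() so it can be checked. The names messageTransformer and transformSample() are illustrative.

```dart
import 'dart:async';

// Turns each int into two Strings, like the transform() example above.
final messageTransformer = StreamTransformer<int, String>.fromHandlers(
  handleData: (int value, EventSink<String> sink) {
    sink.add("Message: $value");
    sink.add("Body: $value");
  },
);

Future<List<String>> transformSample() =>
    Stream.fromIterable([1, 2]).transform(messageTransformer).toList();

void main() async {
  print(await transformSample());
  // [Message: 1, Body: 1, Message: 2, Body: 2]
}
```

For a simple one-to-many mapping like this one, Stream.expand((v) => ["Message: $v", "Body: $v"]) would produce the same values; a transformer becomes more useful when you also need handleError or handleDone.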

Validating

Sometimes, you want to validate that the data returned from a stream meets certain conditions. A number of methods are provided that return Future<bool> values: any(), every() and contains():

broadcastStream
    .any((value) => value < 5)
    .then((result) => print("Any less than 5?: $result")); // true
  
broadcastStream
    .every((value) => value < 5)
    .then((result) => print("All less than 5?: $result")); // false
  
broadcastStream
    .contains(4)
    .then((result) => print("Contains 4?: $result")); // true
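The same three checks can be awaited on fresh streams and gathered into one list, which makes the results easy to verify. The validate() helper is an illustrative name.

```dart
import 'dart:async';

Stream<int> sample() => Stream.fromIterable([1, 2, 3, 4, 5]);

Future<List<bool>> validate() async {
  final anyBelow5 = await sample().any((value) => value < 5);
  final allBelow5 = await sample().every((value) => value < 5);
  final has4 = await sample().contains(4);
  return [anyBelow5, allBelow5, has4];
}

void main() async {
  print(await validate()); // [true, false, true]
}
```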

Single value streams

Some streams are designed to emit only a single value, and you want to ensure that you retrieve only that single value from them. The single getter and the singleWhere() method both return a future that completes with the single (matching) value, or with an error if there isn't exactly one. For example, with our data set containing 5 values, the following will return the value 1:

broadcastStream
    .singleWhere((value) => value < 2)
    .then((value) => print("single value: $value"));
whereas the following will raise an error and halt the application (because the error is unhandled):
broadcastStream
    .single
    .then((value) => print("single value: $value"));

Note: Thanks to Florian, who pointed out that the xMatching methods (such as singleMatching) have been renamed xWhere (such as singleWhere) in the current bleeding-edge build. This post was written with r19425, so you'll most likely be using the Where versions rather than the Matching versions.
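Using the Where names from the note, the single-value methods can be tried on fresh streams. The sketch also shows the failure case: when more than one element matches, singleWhere() completes with a StateError.

```dart
import 'dart:async';

Stream<int> sample() => Stream.fromIterable([1, 2, 3, 4, 5]);

void main() async {
  // Exactly one element matches, so singleWhere() succeeds.
  print(await sample().singleWhere((value) => value < 2)); // 1

  // A one-element stream satisfies single.
  print(await Stream.fromIterable([42]).single); // 42

  // Two elements match, so singleWhere() completes with an error.
  try {
    await sample().singleWhere((value) => value < 3);
  } on StateError catch (e) {
    print("error: ${e.message}");
  }
}
```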

This brings us neatly on to...

Error handling in streams and futures

There is already an excellent article about handling errors with Future-based APIs on the dartlang website, so I won't repeat that here. It's useful to note, though, that we could rewrite our previous snippet to include some error handling, so that we can detect that the single getter has failed. A Future's then() method returns a new future, on which you can call catchError(). This catchError() handler catches the error from the original future (here, the failed single) as well as any errors thrown within the then() callback:

broadcastStream
    .single
    .then((value) => print("single value: $value"))
    .catchError((err) => print(err));
// output: Bad state: Too many elements
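To see both kinds of error reach catchError(), the hypothetical describeSingle() helper below returns a message instead of printing it. The negative-value check is an artificial rule added only so that the then() callback itself can throw.

```dart
import 'dart:async';

// Hypothetical helper: describes the single value, or the error raised
// either by `single` itself or by the then() callback.
Future<String> describeSingle(Stream<int> stream) {
  return stream.single
      .then((value) {
        // Artificial rule, only here to make the callback throw.
        if (value < 0) throw ArgumentError("negative: $value");
        return "single value: $value";
      })
      .catchError((Object err) => "error: $err");
}

void main() async {
  print(await describeSingle(Stream.fromIterable([7]))); // single value: 7
  print(await describeSingle(Stream.fromIterable([1, 2]))); // error raised by single
  print(await describeSingle(Stream.fromIterable([-1]))); // error raised in then()
}
```

This mirrors the point made in the comments below: then(f).catchError(g) routes errors from both the future and f to g.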

Error handling with a stream subscription

When you use the listen() function to listen to values coming from a stream, you have the option of adding error handling.  The listen function creates a StreamSubscription instance, which is the return value of the listen() function.
A StreamSubscription has a number of handlers, namely: onData, onError and onDone.  Each of these can be assigned via the listen() function, or later, via the returned StreamSubscription object.  As far as I can tell, the following two are equivalent.  Note the onError handler, which you can use to catch errors output from the stream.

// setup the handlers through the subscription's handler methods
var subscription = stream.listen(null);
subscription.onData((value) => print("listen: $value"));
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));
and
// setup the handlers as arguments to the listen() function
var subscription = stream.listen(
    (value) => print("listen: $value"),
    onError: (err) => print("error: $err"),
    onDone: () => print("done"));
Both print the same output:

listen: 1
listen: 2
listen: 3
listen: 4
listen: 5
done


One of the benefits of using the form var subscription = stream.listen(null) and then setting up the onData handler separately is that you can use the subscription object inside the data handler itself.
The onDone handler is called when there is no more data, and the underlying stream is closed.
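Since Stream.fromIterable() never emits an error, the onError handler above stays silent. The sketch below uses a StreamController to emit an illustrative error ("oops") between two values, showing that, by default, the subscription keeps receiving data after an error and onDone still fires at the end. The recordEvents() helper is hypothetical.

```dart
import 'dart:async';

Future<List<String>> recordEvents() {
  final controller = StreamController<int>();
  final log = <String>[];
  final finished = Completer<List<String>>();

  final subscription = controller.stream.listen(null);
  subscription.onData((value) => log.add("listen: $value"));
  subscription.onError((err) => log.add("error: $err"));
  subscription.onDone(() => finished.complete(log));

  controller.add(1);
  controller.addError("oops"); // illustrative error event
  controller.add(2);
  controller.close();
  return finished.future;
}

void main() async {
  (await recordEvents()).forEach(print);
  // listen: 1
  // error: oops
  // listen: 2
}
```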

Unsubscribing from the stream

You can also use the StreamSubscription object to unsubscribe from the stream, by using its cancel() method. For example, this code unsubscribes from the stream after the value 2 is received, so it never receives the onDone message.

var subscription = stream.listen(null);
subscription.onData((value) {
  print("listen: $value");
  if (value == 2) subscription.cancel(); // cancel the subscription
});
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));
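A runnable check of the same idea: the hypothetical listenUntilTwo() helper records what the handlers see and finishes once cancel() has completed, so you can confirm that "done" never appears.

```dart
import 'dart:async';

Future<List<String>> listenUntilTwo() {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final stream = Stream.fromIterable([1, 2, 3, 4, 5]);

  final subscription = stream.listen(null);
  subscription.onData((value) {
    log.add("listen: $value");
    if (value == 2) {
      // cancel() returns a Future that completes once the subscription is cleaned up.
      subscription.cancel().then((_) => finished.complete(log));
    }
  });
  subscription.onDone(() => log.add("done")); // not called after cancel()
  return finished.future;
}

void main() async {
  print(await listenUntilTwo()); // [listen: 1, listen: 2]
}
```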

Streams are generic

All the stream classes are also generic, which means that you get strongly typed data in the handlers. For example, if you create a Stream<int>, then all the handler functions will expect an int, as shown by the following code:

var data = [1,2,3,4,5]; // ints: valid
// var data = ["1","2","3","4","5"]; // strings: not valid for a Stream<int>
var stream = new Stream<int>.fromIterable(data); // Stream<int>
stream.listen((value) { // value must be an int
  print("listen: $value");
});
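Because the handler's value is statically typed as int, integer operations need no casts. As a small illustration (fold() is not covered in the post itself), this hypothetical sumOf() helper adds up a Stream<int>.

```dart
import 'dart:async';

Future<int> sumOf(Stream<int> stream) {
  // `total` and `value` are both ints, so `+` type-checks without casts.
  return stream.fold<int>(0, (int total, int value) => total + value);
}

void main() async {
  print(await sumOf(Stream<int>.fromIterable([1, 2, 3, 4, 5]))); // 15
}
```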


Some real world examples of consuming a stream

Now that you've seen how to consume data in a stream, let's take a look at a couple of real-world examples: handling button clicks, and reading data from a file.

Button clicks - dart:html

Buttons have a number of onXYZ streams defined, and the onClick stream is defined as Stream<MouseEvent> - this means that the data that you receive when you listen to the onClick stream is all going to be a MouseEvent datatype.

We'll set up a button and a couple of event handlers: one that remains subscribed for every click, and one that unsubscribes itself after the third button click.

import 'dart:html';

void main() {
  var button = new ButtonElement();
  document.body!.children.add(button);
  
  button.text = "Foo"; 
  var clickCount = 0;
  button.onClick.listen((mouseEvent) {
    print("clicked"); // remain subscribed for all clicks
  });
  
  var subscription = button.onClick.listen(null);
  subscription.onData((mouseEvent) {
    print("copy that");
    clickCount++;
    window.alert("Clicked");
    if (clickCount == 3) {
      subscription.cancel(); // unsubscribe after the third click
    }
  });  
}
When the button is clicked, the click counter is incremented, and on the third click, the second subscription is unsubscribed.
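dart:html needs a browser, so the button code can't run on the command line as-is. The sketch below approximates it by swapping in a StreamController.broadcast() as a hypothetical stand-in for button.onClick (broadcast, because the example attaches two listeners), with plain strings in place of MouseEvents. It counts how many times each handler runs; it is not dart:html code.

```dart
import 'dart:async';

// Returns [clicks seen by the permanent handler, clicks seen by the
// handler that unsubscribes after three].
Future<List<int>> simulateClicks(int clicks) {
  final onClick = StreamController<String>.broadcast(); // stand-in for onClick
  var clickedCount = 0;
  var copyThatCount = 0;
  final finished = Completer<List<int>>();

  // Remains subscribed for all clicks.
  onClick.stream.listen(
    (event) => clickedCount++,
    onDone: () => finished.complete([clickedCount, copyThatCount]),
  );

  // Unsubscribes itself after the third click.
  final subscription = onClick.stream.listen(null);
  subscription.onData((event) {
    copyThatCount++;
    if (copyThatCount == 3) subscription.cancel();
  });

  for (var i = 0; i < clicks; i++) {
    onClick.add("click $i");
  }
  onClick.close();
  return finished.future;
}

void main() async {
  print(await simulateClicks(5)); // [5, 3]
}
```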

Reading a file - dart:io

The second real-world example shows how to read some data from a file on the filesystem. The file.openRead() method returns a stream of the file's contents, delivered as chunks of bytes (each chunk a List<int>). The stream is decoded from UTF-8 into strings using the utf8.decoder transformer from dart:convert.

import 'dart:convert';
import 'dart:io';

void main() {
  File file = new File(r"c:\work\test.txt");
  file.openRead().transform(utf8.decoder).listen(
      (String data) {
        print(data); // output the data
      },
      onError: (error) => print("Error, could not open file"),
      onDone: () => print("Finished reading data"));
}
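To try the reading pipeline without preparing a file by hand, this sketch writes a temporary file first, reads it back through openRead() and utf8.decoder, and joins the decoded chunks with join(). The roundTrip() helper and the "streams_demo" directory prefix are illustrative.

```dart
import 'dart:convert';
import 'dart:io';

// Writes `text` to a temporary file, then reads it back as a stream.
Future<String> roundTrip(String text) async {
  final dir = await Directory.systemTemp.createTemp("streams_demo");
  try {
    final file = File("${dir.path}${Platform.pathSeparator}test.txt");
    await file.writeAsString(text); // UTF-8 by default
    // openRead() yields byte chunks; utf8.decoder turns them into Strings.
    return await file.openRead().transform(utf8.decoder).join();
  } finally {
    await dir.delete(recursive: true); // clean up the temporary directory
  }
}

void main() async {
  print(await roundTrip("Hello, streams! ✓"));
}
```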

Next time: populating your own streams

Feel free to leave comments, or contact me via my G+ profile, especially if you spot any inaccuracies. (The snippets were all tested with r19425.)

Note: This blog post is now syndicated as an article on dartlang.org.  That version is more likely to stay correct (with respect to any breaking API changes).

5 comments:

  1. Should this example:

    broadcastStream
    .single
    .then((value) => print("single value: $value"),
    onError: (err) => print(err));

    Be written as:

    broadcastStream
    .single
    .then((value) => print("single value: $value"))
    .catchError((err) => print(err));

    I'm not quite sure of the exact difference, but most code seems to be written, the second way.

    1. As far as I can tell, they're both equivalent, but I've updated the article anyway to show both.

    2. The second way, using catchError, will also catch any error that is thrown by the print function in the then part. In general, Future.then( f, onError: g) will not pass an exception thrown by f to the error handler g, so it will be unhandled and be caught at a higher level or terminate the program.

      Future.then(f).catchError(g) will give both errors from the future and errors from f to the error handler g.

    3. Great! thanks for the explanation. I'll update the article.

  2. Thanks for the explanation Bill. It sounds like Future.then(f).catchError(g), is almost always the way to go.
