Summary of "Dartwatch: Getting your feet wet with Streams" (via tldr.io)
- Streams are the new, unified way to interact with async APIs in Dart.
- They emit a repeating series of events (such as messages, file data, user clicks)
- A stream is either single-subscriber or multi-subscriber
- Example stream usage is button.onClick.listen((e) => print("button clicked")); or reading a file from the file system.
- Streams have a number of methods to take partial data (first, take...), check whether data matches (contains, any, every), and transform (List to String).
Over the last few months you can't help but notice that Streams have appeared in Dart in a big way.
Whether running in the browser as part of the various html events, such as button.onClick, or on the server as part of the dart:io changes, Streams form a unified interface to anything that might send out a repeating series of data.
In this series (this is the first of two blog posts), you will learn:
- how to consume streams (this post)
- how to use the Stream classes to send your own data (coming soon)
Before we get started, though, it is important to note that Streams are part of the dart:async library. Because of their asynchronous nature (you can't block code until a user clicks a button!), they share a close relationship with Dart's async staple, Future.
Background reading: Futures
The Future class is used for async communication within various Dart APIs. A common example is a browser HttpRequest (or AJAX request). You have some code running in the browser that wants to get a value from the server, let's say, the current number of logged on users. If your client-side code called the server and then waited (blocking) for the server to respond, the UI would freeze up until the server responded (because code execution is halted).
Fortunately (and provided by the A in AJAX), this call to the server is asynchronous, and modelled by Dart's Future API, which returns a future value wrapped up in a callback exposed by the then() function.
The following snippet shows how this might work, with the callback function passed as the argument to then():
var url = "http://example.com/userCount";
HttpRequest.getString(url).then( (String result) {
print("User count: $result");
});
This pattern is used by streams to retrieve or manipulate data that the stream is sending out to its consumers.
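The same then() pattern can be tried outside the browser. The following is a minimal sketch, assuming a current Dart SDK; fetchUserCount() and describeUserCount() are hypothetical names, and the server call is simulated with Future.delayed rather than a real HTTP request.

```dart
import 'dart:async';

// Hypothetical stand-in for HttpRequest.getString(url): completes with a
// fixed value after a short delay instead of contacting a server.
Future<String> fetchUserCount() {
  return Future.delayed(const Duration(milliseconds: 10), () => '42');
}

// Builds the message inside the then() callback, once the value arrives.
Future<String> describeUserCount() {
  return fetchUserCount().then((String result) => 'User count: $result');
}

void main() {
  describeUserCount().then(print);
  // This line runs first: the call above does not block.
  print('Request sent');
}
```

Running it prints "Request sent" before "User count: 42", which is the non-blocking behaviour described above.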
What are streams for?
Imagine you are writing a chat application. On a single client, you will be receiving messages and displaying them to the user. You can't simply write a while loop, because that will block execution, so you need to use async callbacks. This is an ideal use case for streams - you have one part of your code pushing data into the stream, and another part of your code listening to the stream.
Key concepts:
Populating a stream: Data gets into a stream from a StreamController
Consuming a stream: Data is sent out of a stream to a subscriber (or possibly multiple subscribers); each subscription is represented by a StreamSubscription.
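To make the two halves concrete, here is a rough sketch of the chat idea, assuming a current Dart SDK. The runChat() function and the messages are made up for illustration, and only as much of StreamController is used as is needed to have something to listen to.

```dart
import 'dart:async';

// Collects every chat message delivered to a listener until the stream closes.
Future<List<String>> runChat() {
  final received = <String>[];
  final finished = Completer<List<String>>();

  // Populating side: a StreamController pushes data into its stream.
  final controller = StreamController<String>();

  // Consuming side: a listener is called back for each message.
  controller.stream.listen(
    (message) => received.add('Received: $message'),
    onDone: () => finished.complete(received),
  );

  controller.add('hello');
  controller.add('anyone there?');
  controller.close(); // no more messages

  return finished.future;
}

void main() {
  runChat().then((lines) => lines.forEach(print));
}
```

The code that adds messages and the code that handles them never call each other directly; the stream sits between them.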
We'll look at consuming a stream first, as you're more likely to come across streams as a consumer of existing APIs within Dart.
Consuming a stream
Let's take a look at some simple stream code. For simplicity we're going to create a stream from a fixed, literal list at the moment by using the stream's fromIterable() constructor, rather than by dynamically populating it with a StreamController.
var data = [1,2,3,4,5]; // some sample data
var stream = new Stream.fromIterable(data); // create the stream
Now that we have a stream that is ready to send out some data, we can use that stream to listen to some data.
The typical way is to use the stream's listen() method to subscribe to the stream. This has a number of optional parameters, and one mandatory parameter, which is the onData handler callback function:
import 'dart:async';

main() {
  var data = [1,2,3,4,5]; // some sample data
  var stream = new Stream.fromIterable(data); // create the stream

  // subscribe to the stream's events
  stream.listen((value) {
    print("Received: $value"); // onData handler
  });
}
The onData handler passed to listen() is fired every time some data is received. In our stream, the handler is called for each of the data elements, so the output of running this code is as expected:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
There are other ways to consume data from the stream, using properties such as first, last, length and isEmpty. Each of these properties returns a future. For example:
stream.first.then((value) => print("stream.first: $value")); // 1
// stream.last.then((value) => print("stream.last: $value")); // 5
// stream.isEmpty.then((value) => print("stream.isEmpty: $value")); // false
// stream.length.then((value) => print("stream.length: $value")); // 5
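Only one of the four lines is active because each of these properties subscribes to the stream, and a single-subscriber stream accepts only one subscription. The sketch below illustrates that rule, assuming a current Dart SDK. It uses a StreamController-backed stream (an assumption made for the demo, not the fromIterable() stream from the post), and secondListenFails() is a hypothetical helper.

```dart
import 'dart:async';

// Returns true if a second listen() on a single-subscriber stream is rejected.
bool secondListenFails() {
  final controller = StreamController<int>(); // single-subscriber by default
  final stream = controller.stream;

  final first = stream.listen((_) {}); // the one permitted subscription
  try {
    stream.listen((_) {}); // a second subscription on the same stream
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
    controller.close();
  }
}

void main() {
  print('second listen rejected: ${secondListenFails()}');
}
```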
You can convert the single subscriber stream into a multi subscriber, or broadcast stream by using the asBroadcastStream() method, as shown below:
import 'dart:async';

void main() {
  var data = [1,2,3,4,5];
  var stream = new Stream.fromIterable(data);
  var broadcastStream = stream.asBroadcastStream();

  broadcastStream.listen((value) => print("stream.listen: $value"));
  broadcastStream.first.then((value) => print("stream.first: $value"));
  broadcastStream.last.then((value) => print("stream.last: $value"));
  broadcastStream.isEmpty.then((value) => print("stream.isEmpty: $value"));
  broadcastStream.length.then((value) => print("stream.length: $value"));
}
Now that the stream allows multiple subscribers, you can add multiple listeners. You can check whether a stream is a broadcast stream by checking the stream.isBroadcast property.
Running this produces the following output:
Common Stream methods
There are lots of methods available on the Stream class; be sure to check out the API docs for the complete list. In the following section, I'll describe some of the more common methods.
Subsets of streaming data
Streams also have some useful methods for extracting parts of the data being sent out from the stream. The take(), skip(), takeWhile(), skipWhile() and where() methods allow you to take a subset of data, as shown by the following example. Each outputs its own stream that you can listen to.
broadcastStream
    .where((value) => value % 2 == 0) // divisible by 2
    .listen((value) => print("where: $value")); // where: 2
                                                // where: 4

broadcastStream
    .take(3) // takes only the first three elements
    .listen((value) => print("take: $value")); // take: 1
                                               // take: 2
                                               // take: 3

broadcastStream
    .skip(3) // skips the first three elements
    .listen((value) => print("skip: $value")); // skip: 4
                                               // skip: 5

broadcastStream
    .takeWhile((value) => value < 3) // take while true
    .listen((value) => print("takeWhile: $value")); // takeWhile: 1
                                                    // takeWhile: 2

broadcastStream
    .skipWhile((value) => value < 3) // skip while true
    .listen((value) => print("skipWhile: $value")); // skipWhile: 3
                                                    // skipWhile: 4
                                                    // skipWhile: 5
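Because each of these methods returns a new stream, they can also be chained. The following is a small sketch, assuming a current Dart SDK; firstTwoEvens() is a hypothetical helper, and toList() (not covered above) is used only to gather the results into a future.

```dart
import 'dart:async';

// Keeps the even values, then stops after the first two of them.
Future<List<int>> firstTwoEvens(Stream<int> source) {
  return source
      .where((value) => value % 2 == 0) // 2, 4, 6, ...
      .take(2) // only the first two matches
      .toList(); // collect into a Future<List<int>>
}

void main() {
  firstTwoEvens(Stream.fromIterable([1, 2, 3, 4, 5, 6]))
      .then((evens) => print('evens: $evens')); // evens: [2, 4]
}
```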
Transforming
Another useful method is the transform() method, which takes a StreamTransformer instance. This allows you to modify the contents of the stream. The StreamTransformer constructor takes a handleData function that is called for each value passed from the stream. You can modify the value as you wish, and add it back to the StreamSink (more on that later), which results in the modified values being output on the transform() method's stream. The example below takes our data [1,2,3,4,5] and converts each item into two new String values, "Message: n" and "Body: n". Each of these is placed onto the new stream.
// define a stream transformer
var transformer = new StreamTransformer(handleData: (value, sink) {
  // create two new values from the original value
  sink.add("Message: $value");
  sink.add("Body: $value");
});

// transform the stream and listen to its output
stream.transform(transformer).listen((value) => print("listen: $value"));
listen: Message: 1
listen: Body: 1
listen: Message: 2
listen: Body: 2
listen: Message: 3
listen: Body: 3
listen: Message: 4
listen: Body: 4
listen: Message: 5
listen: Body: 5
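In more recent Dart SDKs the handler-based constructor is, as far as I know, spelled StreamTransformer.fromHandlers rather than new StreamTransformer(handleData: ...). The following sketch shows the same transformation under that assumption; expandMessages() is a hypothetical helper, and toList() is used just to collect the output.

```dart
import 'dart:async';

// Turns each int into two strings, mirroring the Message/Body example.
Future<List<String>> expandMessages(Stream<int> source) {
  final transformer = StreamTransformer<int, String>.fromHandlers(
    handleData: (int value, EventSink<String> sink) {
      // Two output events for every input event.
      sink.add('Message: $value');
      sink.add('Body: $value');
    },
  );
  return source.transform(transformer).toList();
}

void main() {
  expandMessages(Stream.fromIterable([1, 2]))
      .then((values) => values.forEach(print));
}
```

The type arguments <int, String> state the input and output element types, so the sink only accepts strings.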
Validating
Sometimes, you want to validate that the data returned from a stream meets certain conditions. A number of functions are provided which return Future<bool> values: any(), every() and contains():
broadcastStream
    .any((value) => value < 5)
    .then((result) => print("Any less than 5?: $result")); // true

broadcastStream
    .every((value) => value < 5)
    .then((result) => print("All less than 5?: $result")); // false

broadcastStream
    .contains(4)
    .then((result) => print("Contains 4?: $result")); // true
Single value streams
Some streams are designed to return only a single value, and you want to ensure that you only retrieve a single value from them. The single getter and singleMatching() method both return a future containing the single value, or raise an error if there is not exactly one value. For example, with our data set containing 5 values, the following will return the value 1:
broadcastStream
    .singleMatching((value) => value < 2)
    .then((value) => print("single value: $value"));
The following, on the other hand, raises an error, because the stream contains more than one value:
broadcastStream
    .single
    .then((value) => print("single value: $value"));
Note: Thanks to Florian, who has just pointed out that the xMatching methods (such as singleMatching) have been renamed xWhere (such as singleWhere) in the current bleeding edge build. This blog was written with r19425, so you'll likely be using the Where version rather than the Matching version.
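For readers on the renamed API, here is a minimal sketch using singleWhere, assuming a current Dart SDK with async/await. The helper names numbers(), onlyValueBelowTwo() and singleFails() are hypothetical.

```dart
import 'dart:async';

Stream<int> numbers() => Stream.fromIterable([1, 2, 3, 4, 5]);

// Exactly one element is < 2, so this completes with that element.
Future<int> onlyValueBelowTwo() => numbers().singleWhere((value) => value < 2);

// The stream has five elements, so single completes with an error.
Future<bool> singleFails() async {
  try {
    await numbers().single;
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print('singleWhere: ${await onlyValueBelowTwo()}'); // singleWhere: 1
  print('single failed: ${await singleFails()}'); // single failed: true
}
```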
This brings us neatly on to...
Error handling in streams and futures
There is already an excellent article about handling errors with future based APIs on the dartlang website, so I'll not repeat that here. It's useful to note, though, that we could rewrite our previous snippet to include some error handling so that we can detect that the call to single has failed. A Future's then() function returns a future, and you can use its catchError handler. This catchError handler will catch the error from single, as well as any errors thrown within the then() callback:
broadcastStream
    .single
    .then((value) => print("single value: $value"))
    .catchError((err) => print(err));
// output: Bad State: More than one element
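One subtlety is worth a sketch: an onError argument passed to then() only sees errors from the original future, while a chained catchError also sees errors thrown inside the then() callback. The example below assumes a current Dart SDK; withCatchError() and withOnError() are hypothetical names, and the StateError is thrown deliberately to stand in for a failing callback.

```dart
import 'dart:async';

// The callback throws; the chained catchError handles that error.
Future<String> withCatchError() {
  return Future.value(1)
      .then<String>((value) => throw StateError('thrown inside then'))
      .catchError((Object error) => 'handled by catchError');
}

// The callback throws; onError is NOT called for errors from the callback,
// so the returned future completes with the StateError.
Future<String> withOnError() {
  return Future.value(1).then<String>(
    (value) => throw StateError('thrown inside then'),
    onError: (Object error) => 'handled by onError',
  );
}

Future<void> main() async {
  print(await withCatchError()); // handled by catchError
  try {
    print(await withOnError());
  } on StateError catch (e) {
    print('escaped onError: ${e.message}'); // escaped onError: thrown inside then
  }
}
```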
Error handling with a stream subscription
When you use the listen() function to listen to values coming from a stream, you have the option of adding error handling. The listen function creates a StreamSubscription instance, which is the return value of the listen() function. A StreamSubscription has a number of handlers, namely: onData, onError and onDone. Each of these can be assigned via the listen() function, or later, via the returned StreamSubscription object. As far as I can tell, the following two are equivalent. Note the onError handler, which you can use to catch errors output from the stream.
// setup the handlers through the subscription's handler methods
var subscription = stream.listen(null);
subscription.onData((value) => print("listen: $value"));
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));
and
// setup the handlers as arguments to the listen() function
var subscription = stream.listen(
    (value) => print("listen: $value"),
    onError: (err) => print("error: $err"),
    onDone: () => print("done"));
These two both print the same output:
listen: 1
listen: 2
listen: 3
listen: 4
listen: 5
done
One of the benefits of using the form var subscription = stream.listen(null) and then setting up the onData handler separately is that you can use the subscription object in the data handler itself.
The onDone handler is called when there is no more data, and the underlying stream is closed.
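The sample data never produces an error, so the onError handler above is never exercised. The following sketch, assuming a current Dart SDK, pushes an error event into a stream by hand to show all three handlers firing. The StreamController, the 'boom' error and the runWithError() helper are assumptions made for the demo.

```dart
import 'dart:async';

// Records what each handler sees for a stream that emits data, an error,
// more data, and then closes.
Future<List<String>> runWithError() {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final controller = StreamController<int>();

  controller.stream.listen(
    (value) => log.add('listen: $value'),
    onError: (Object err) => log.add('error: $err'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );

  controller.add(1);
  controller.addError('boom'); // delivered to onError, not onData
  controller.add(2); // the subscription stays active after an error
  controller.close(); // triggers onDone

  return finished.future;
}

void main() {
  runWithError().then((log) => log.forEach(print));
}
```

By default an error does not end the subscription, which is why the value 2 still arrives before done.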
Unsubscribing from the stream
You can also use the StreamSubscription object to unsubscribe from the stream, by using its cancel() method. For example, this unsubscribes from the stream after the value 2 is received, so it never receives the onDone message.
var subscription = stream.listen(null);
subscription.onData((value) {
  print("listen: $value");
  if (value == 2) subscription.cancel(); // cancel the subscription
});
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));
Streams are generic
All the stream classes are also generic, which means that you get strongly typed data in the handlers. For example, if you create a Stream<int>, then all the handler functions will also be expecting an int, as shown by the following code:
var data = [1,2,3,4,5]; // ints, valid
// var data = ["1","2","3","4","5"]; // strings, not valid
var stream = new Stream<int>.fromIterable(data); // Stream<int>
stream.listen((value) { // value must be an int
  print("listen: $value");
});
Some real world examples of consuming a stream
Now that you've seen how to consume data in a stream, let's take a look at a couple of real-world examples: handling button clicks, and reading data from a file.
Button clicks - dart:html
Buttons have a number of onXYZ streams defined, and the onClick stream is defined as Stream<MouseEvent> - this means that the data that you receive when you listen to the onClick stream is all going to be a MouseEvent datatype.
We'll set up a button and a couple of event handlers: one that will remain registered, and one that will unregister itself after the third button click.
import 'dart:html';

void main() {
  var button = new ButtonElement();
  document.body.children.add(button);
  button.text = "Foo";
  var clickCount = 0;

  button.onClick.listen((mouseEvent) {
    print("clicked"); // remain subscribed for all clicks
  });

  var subscription = button.onClick.listen(null);
  subscription.onData((mouseEvent) {
    print("copy that");
    clickCount++;
    window.alert("Clicked");
    if (clickCount == 3) {
      subscription.cancel(); // unsubscribe after the third click
    }
  });
}
When the button is clicked, the click counter is incremented, and on the third click, the second subscription is unsubscribed.
Reading a file - dart:io
The second real-world example shows how to read some data from a file on the filesystem. The file.openRead() method returns a stream containing the file's contents. The stream (which emits List<int> chunks of bytes) is decoded using a StringDecoder class to allow for UTF-8 conversion.
import 'dart:io';

void main() {
  File file = new File(r"c:\work\test.txt");
  file.openRead().transform(new StringDecoder()).listen(
      (String data) {
        print(data); // output the data
      },
      onError: (error) => print("Error, could not open file"),
      onDone: () => print("Finished reading data"));
}
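StringDecoder is the name from the r19425 build. In a current Dart SDK, as far as I know, the equivalent decoder is utf8.decoder from dart:convert. The sketch below reads a file under that assumption. readText() is a hypothetical helper, and the demo writes its own temporary file instead of relying on a fixed path.

```dart
import 'dart:convert';
import 'dart:io';

// Reads a whole file as text by decoding the byte chunks from openRead().
Future<String> readText(String path) {
  return File(path)
      .openRead() // Stream<List<int>> of raw bytes
      .transform(utf8.decoder) // Stream<String> of decoded chunks
      .join(); // concatenate the chunks into one String
}

Future<void> main() async {
  // Hypothetical demo file, written to the system temp directory first so
  // the example does not depend on a path that may not exist.
  final dir = await Directory.systemTemp.createTemp('stream_demo');
  final file = File('${dir.path}/test.txt');
  await file.writeAsString('hello streams');

  print(await readText(file.path)); // hello streams
  await dir.delete(recursive: true);
}
```

Using join() trades the incremental onData handling of the original for a single future, which is convenient for small files but holds the whole content in memory.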
Next time: populating your own streams
Feel free to leave comments, or contact me via my G+ profile, especially if you spot any inaccuracies. (The snippets were all tested with r19425.)
Note: This blog post is now syndicated as an article on dartlang.org. That version is more likely to stay correct (with respect to any breaking API changes).
Should this example:
broadcastStream
.single
.then((value) => print("single value: $value"),
onError: (err) => print(err));
Be written as:
broadcastStream
.single
.then((value) => print("single value: $value"))
.catchError((err) => print(err));
I'm not quite sure of the exact difference, but most code seems to be written the second way.
As far as I can tell, they're both equivalent, but I've updated the article anyway to show both.
The second way, using catchError, will also catch any error that is thrown by the print function in the then part. In general, Future.then(f, onError: g) will not pass an exception thrown by f to the error handler g, so it will be unhandled and be caught at a higher level or terminate the program.
Future.then(f).catchError(g) will give both errors from the future and errors from f to the error handler g.
Great! thanks for the explanation. I'll update the article.
Thanks for the explanation Bill. It sounds like Future.then(f).catchError(g) is almost always the way to go.