Creating streams in Dart
Written by Lasse Nielsen
April 2013 (updated October 2018)
The dart:async library contains two types that are important for many Dart APIs: Stream and Future. Where a Future represents the result of a single computation, a stream is a sequence of results. You listen on a stream to get notified of the results (both data and errors) and of the stream shutting down. You can also pause while listening or stop listening to the stream before it is complete.
But this article is not about using streams. It’s about creating your own streams. You can create streams in a few ways:
- Transforming existing streams.
- Creating a stream from scratch by using an
async*
function. - Creating a stream by using a
StreamController
.
This article shows the code for each approach and gives tips to help you implement your stream correctly.
For help on using streams, see Asynchronous Programming: Streams.
Transforming an existing stream
The common case for creating streams is that you already have a stream, and you want to create a new stream based on the original stream’s events. For example you might have a stream of bytes that you want to convert to a stream of strings by UTF-8 decoding the input. The most general approach is to create a new stream that waits for events on the original stream and then outputs new events. Example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
<span class="com">/// Splits a stream of consecutive strings into lines.</span> <span class="com">///</span> <span class="com">/// The input string is provided in smaller chunks through</span> <span class="com">/// the `source` stream.</span> <span class="typ">Stream</span><span class="pun"><</span><span class="typ">String</span><span class="pun">></span><span class="pln"> lines</span><span class="pun">(</span><span class="typ">Stream</span><span class="pun"><</span><span class="typ">String</span><span class="pun">></span><span class="pln"> source</span><span class="pun">)</span> <span class="kwd">async</span><span class="pun">*</span> <span class="pun">{</span> <span class="com">// Stores any partial line from the previous chunk.</span> <span class="kwd">var</span><span class="pln"> partial </span><span class="pun">=</span> <span class="str">''</span><span class="pun">;</span> <span class="com">// Wait until a new chunk is available, then process it.</span> <span class="kwd">await</span> <span class="kwd">for</span> <span class="pun">(</span><span class="kwd">var</span><span class="pln"> chunk </span><span class="kwd">in</span><span class="pln"> source</span><span class="pun">)</span> <span class="pun">{</span> <span class="kwd">var</span><span class="pln"> lines </span><span class="pun">=</span><span class="pln"> chunk</span><span class="pun">.</span><span class="pln">split</span><span class="pun">(</span><span class="str">'\n'</span><span class="pun">);</span><span class="pln"> lines</span><span class="pun">[</span><span class="lit">0</span><span class="pun">]</span> <span class="pun">=</span><span class="pln"> partial </span><span class="pun">+</span><span class="pln"> lines</span><span class="pun">[</span><span class="lit">0</span><span class="pun">];</span> <span class="com">// Prepend partial line.</span><span class="pln"> partial </span><span class="pun">=</span><span class="pln"> lines</span><span class="pun">.</span><span class="pln">removeLast</span><span class="pun">();</span> <span class="com">// Remove new partial line.</span> <span class="kwd">for</span> <span class="pun">(</span><span class="kwd">var</span><span class="pln"> line </span><span class="kwd">in</span><span class="pln"> lines</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> yield line</span><span class="pun">;</span> <span class="com">// Add lines to output stream.</span> <span class="pun">}</span> <span class="pun">}</span> <span class="com">// Add final partial line to output stream, if any.</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">partial</span><span class="pun">.</span><span class="pln">isNotEmpty</span><span class="pun">)</span><span class="pln"> yield partial</span><span class="pun">;</span> <span class="pun">}</span> |
For many common transformations, you can use Stream
-supplied transforming methods such as map()
, where()
, expand()
, and take()
.
For example, assume you have a stream, counterStream
, that emits an increasing counter every second. Here’s how it might be implemented:
1 2 |
<span class="kwd">var</span><span class="pln"> counterStream </span><span class="pun">=</span> <span class="typ">Stream</span><span class="pun"><</span><span class="typ">int</span><span class="pun">>.</span><span class="pln">periodic</span><span class="pun">(</span><span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">1</span><span class="pun">),</span> <span class="pun">(</span><span class="pln">x</span><span class="pun">)</span> <span class="pun">=></span><span class="pln"> x</span><span class="pun">).</span><span class="pln">take</span><span class="pun">(</span><span class="lit">15</span><span class="pun">);</span> |
To quickly see the events, you can use code like this:
1 |
<span class="pln">counterStream</span><span class="pun">.</span><span class="pln">forEach</span><span class="pun">(</span><span class="pln">print</span><span class="pun">);</span> <span class="com">// Print an integer every second, 15 times.</span> |
To transform the stream events, you can invoke a transforming method such as map()
on the stream before listening to it. The method returns a new stream.
1 2 3 |
<span class="com">// Double the integer in each event.</span> <span class="kwd">var</span><span class="pln"> doubleCounterStream </span><span class="pun">=</span><span class="pln"> counterStream</span><span class="pun">.</span><span class="pln">map</span><span class="pun">((</span><span class="typ">int</span><span class="pln"> x</span><span class="pun">)</span> <span class="pun">=></span><span class="pln"> x </span><span class="pun">*</span> <span class="lit">2</span><span class="pun">);</span><span class="pln"> doubleCounterStream</span><span class="pun">.</span><span class="pln">forEach</span><span class="pun">(</span><span class="pln">print</span><span class="pun">);</span> |
Instead of map()
, you could use any other transforming method, such as the following:
1 2 3 |
<span class="pun">.</span><span class="pln">where</span><span class="pun">((</span><span class="typ">int</span><span class="pln"> x</span><span class="pun">)</span> <span class="pun">=></span><span class="pln"> x</span><span class="pun">.</span><span class="pln">isEven</span><span class="pun">)</span> <span class="com">// Retain only even integer events.</span> <span class="pun">.</span><span class="pln">expand</span><span class="pun">((</span><span class="kwd">var</span><span class="pln"> x</span><span class="pun">)</span> <span class="pun">=></span> <span class="pun">[</span><span class="pln">x</span><span class="pun">,</span><span class="pln"> x</span><span class="pun">])</span> <span class="com">// Duplicate each event.</span> <span class="pun">.</span><span class="pln">take</span><span class="pun">(</span><span class="lit">5</span><span class="pun">)</span> <span class="com">// Stop after the first five events.</span> |
Often, a transforming method is all you need. However, if you need even more control over the transformation, you can specify a StreamTransformerwith Stream
’s transform()
method. The platform libraries provide stream transformers for many common tasks. For example, the following code uses the utf8.decoder
and LineSplitter
transformers provided by the dart:convert library.
1 2 3 |
<span class="typ">Stream</span><span class="pun"><</span><span class="typ">List</span><span class="pun"><</span><span class="typ">int</span><span class="pun">>></span><span class="pln"> content </span><span class="pun">=</span> <span class="typ">File</span><span class="pun">(</span><span class="str">'someFile.txt'</span><span class="pun">).</span><span class="pln">openRead</span><span class="pun">();</span> <span class="typ">List</span><span class="pun"><</span><span class="typ">String</span><span class="pun">></span><span class="pln"> lines </span><span class="pun">=</span> <span class="kwd">await</span><span class="pln"> content</span><span class="pun">.</span><span class="pln">transform</span><span class="pun">(</span><span class="pln">utf8</span><span class="pun">.</span><span class="pln">decoder</span><span class="pun">).</span><span class="pln">transform</span><span class="pun">(</span><span class="typ">LineSplitter</span><span class="pun">()).</span><span class="pln">toList</span><span class="pun">();</span> |
Creating a stream from scratch
One way to create a new stream is with an asynchronous generator (async*
) function. The stream is created when the function is called, and the function’s body starts running when the stream is listened to. When the function returns, the stream closes. Until the function returns, it can emit events on the stream by using yield
or yield*
statements.
Here’s a primitive example that emits numbers at regular intervals:
1 2 3 4 5 6 7 8 |
<span class="typ">Stream</span><span class="pun"><</span><span class="typ">int</span><span class="pun">></span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="typ">Duration</span><span class="pln"> interval</span><span class="pun">,</span> <span class="pun">[</span><span class="typ">int</span><span class="pln"> maxCount</span><span class="pun">])</span> <span class="kwd">async</span><span class="pun">*</span> <span class="pun">{</span> <span class="typ">int</span><span class="pln"> i </span><span class="pun">=</span> <span class="lit">0</span><span class="pun">;</span> <span class="kwd">while</span> <span class="pun">(</span><span class="kwd">true</span><span class="pun">)</span> <span class="pun">{</span> <span class="kwd">await</span> <span class="typ">Future</span><span class="pun">.</span><span class="pln">delayed</span><span class="pun">(</span><span class="pln">interval</span><span class="pun">);</span><span class="pln"> yield i</span><span class="pun">++;</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">i </span><span class="pun">==</span><span class="pln"> maxCount</span><span class="pun">)</span> <span class="kwd">break</span><span class="pun">;</span> <span class="pun">}</span> <span class="pun">}</span> |
This function returns a Stream
. When that stream is listened to, the body starts running. It repeatedly delays for the requested interval and then yields the next number. If the count
parameter is omitted, there is no stop condition on the loop, so the stream outputs increasingly larger numbers forever – or until the listener cancels its subscription.
When the listener cancels (by invoking cancel()
on the StreamSubscription
object returned by the listen()
method), then the next time the body reaches a yield
statement, the yield
instead acts as a return
statement. Any enclosing finally
block is executed, and the function exits. If the function attempts to yield a value before exiting, that fails and acts as a return.
When the function finally exits, the future returned by the cancel()
method completes. If the function exits with an error, the future completes with that error; otherwise, it completes with null
.
Another, more useful example is a function that converts a sequence of futures to a stream:
1 2 3 4 5 6 |
<span class="typ">Stream</span><span class="pun"><</span><span class="typ">T</span><span class="pun">></span><span class="pln"> streamFromFutures</span><span class="pun"><</span><span class="typ">T</span><span class="pun">>(</span><span class="typ">Iterable</span><span class="pun"><</span><span class="typ">Future</span><span class="pun"><</span><span class="typ">T</span><span class="pun">>></span><span class="pln"> futures</span><span class="pun">)</span> <span class="kwd">async</span><span class="pun">*</span> <span class="pun">{</span> <span class="kwd">for</span> <span class="pun">(</span><span class="kwd">var</span><span class="pln"> future </span><span class="kwd">in</span><span class="pln"> futures</span><span class="pun">)</span> <span class="pun">{</span> <span class="kwd">var</span><span class="pln"> result </span><span class="pun">=</span> <span class="kwd">await</span><span class="pln"> future</span><span class="pun">;</span><span class="pln"> yield result</span><span class="pun">;</span> <span class="pun">}</span> <span class="pun">}</span> |
This function asks the futures
iterable for a new future, waits for that future, emits the resulting value, and then loops. If a future completes with an error, then the stream completes with that error.
It’s rare to have an async*
function building a stream from nothing. It needs to get its data from somewhere, and most often that somewhere is another stream. In some cases, like the sequence of futures above, the data comes from other asynchronous event sources. In many cases, however, an async*
function is too simplistic to easily handle multiple data sources. That’s where the StreamController
class comes in.
Using a StreamController
If the events of your stream comes from different parts of your program, and not just from a stream or futures that can traversed by an async
function, then use a StreamController to create and populate the stream.
A StreamController
gives you a new stream and a way to add events to the stream at any point, and from anywhere. The stream has all the logic necessary to handle listeners and pausing. You return the stream and keep the controller to yourself.
The following example (from stream_controller_bad.dart) shows a basic, though flawed, usage of StreamController
to implement the timedCounter()
function from the previous examples. This code creates a stream to return, and then feeds data into it based on timer events, which are neither futures nor stream events.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<span class="com">// NOTE: This implementation is FLAWED!</span> <span class="com">// It starts before it has subscribers, and it doesn't implement pause.</span> <span class="typ">Stream</span><span class="pun"><</span><span class="typ">int</span><span class="pun">></span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="typ">Duration</span><span class="pln"> interval</span><span class="pun">,</span> <span class="pun">[</span><span class="typ">int</span><span class="pln"> maxCount</span><span class="pun">])</span> <span class="pun">{</span> <span class="kwd">var</span><span class="pln"> controller </span><span class="pun">=</span> <span class="typ">StreamController</span><span class="pun"><</span><span class="typ">int</span><span class="pun">>();</span> <span class="typ">int</span><span class="pln"> counter </span><span class="pun">=</span> <span class="lit">0</span><span class="pun">;</span> <span class="typ">void</span><span class="pln"> tick</span><span class="pun">(</span><span class="typ">Timer</span><span class="pln"> timer</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> counter</span><span class="pun">++;</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">add</span><span class="pun">(</span><span class="pln">counter</span><span class="pun">);</span> <span class="com">// Ask stream to send counter values as event.</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">maxCount </span><span class="pun">!=</span> <span class="kwd">null</span> <span class="pun">&&</span><span class="pln"> counter </span><span class="pun">>=</span><span class="pln"> maxCount</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> timer</span><span class="pun">.</span><span class="pln">cancel</span><span class="pun">();</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">close</span><span class="pun">();</span> <span class="com">// Ask stream to shut down and tell listeners.</span> <span class="pun">}</span> <span class="pun">}</span> <span class="typ">Timer</span><span class="pun">.</span><span class="pln">periodic</span><span class="pun">(</span><span class="pln">interval</span><span class="pun">,</span><span class="pln"> tick</span><span class="pun">);</span> <span class="com">// BAD: Starts before it has subscribers.</span> <span class="kwd">return</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">stream</span><span class="pun">;</span> <span class="pun">}</span> |
As before, you can use the stream returned by timedCounter()
like this: [PENDING: Did we show this before?]
1 2 |
<span class="kwd">var</span><span class="pln"> counterStream </span><span class="pun">=</span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="kwd">const</span> <span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">1</span><span class="pun">),</span> <span class="lit">15</span><span class="pun">);</span><span class="pln"> counterStream</span><span class="pun">.</span><span class="pln">listen</span><span class="pun">(</span><span class="pln">print</span><span class="pun">);</span> <span class="com">// Print an integer every second, 15 times.</span> |
This implementation of timedCounter()
has a couple of problems:
- It starts producing events before it has subscribers.
- It keeps producing events even if the subscriber requests a pause.
As the next sections show, you can fix both of these problems by specifying callbacks such as onListen
and onPause
when creating the StreamController
.
Waiting for a subscription
As a rule, streams should wait for subscribers before starting their work. An async*
function does this automatically, but when using a StreamController
, you are in full control and can add events even when you shouldn’t. When a stream has no subscriber, its StreamController
buffers events, which can lead to a memory leak if the stream never gets a subscriber.
Try changing the code that uses the stream to the following:
1 2 3 4 5 6 7 8 9 |
<span class="typ">void</span><span class="pln"> listenAfterDelay</span><span class="pun">()</span> <span class="kwd">async</span> <span class="pun">{</span> <span class="kwd">var</span><span class="pln"> counterStream </span><span class="pun">=</span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="kwd">const</span> <span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">1</span><span class="pun">),</span> <span class="lit">15</span><span class="pun">);</span> <span class="kwd">await</span> <span class="typ">Future</span><span class="pun">.</span><span class="pln">delayed</span><span class="pun">(</span><span class="kwd">const</span> <span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">5</span><span class="pun">));</span> <span class="com">// After 5 seconds, add a listener.</span> <span class="kwd">await</span> <span class="kwd">for</span> <span class="pun">(</span><span class="typ">int</span><span class="pln"> n </span><span class="kwd">in</span><span class="pln"> counterStream</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> print</span><span class="pun">(</span><span class="pln">n</span><span class="pun">);</span> <span class="com">// Print an integer every second, 15 times.</span> <span class="pun">}</span> <span class="pun">}</span> |
When this code runs, nothing is printed for the first 5 seconds, although the stream is doing work. Then the listener is added, and the first 5 or so events are printed all at once, since they were buffered by the StreamController
.
To be notified of subscriptions, specify an onListen
argument when you create the StreamController
. The onListen
callback is called when the stream gets its first subscriber. If you specify an onCancel
callback, it’s called when the controller loses its last subscriber. In the preceding example,Timer.periodic()
should move to an onListen
handler, as shown in the next section.
Honoring the pause state
Avoid producing events when the listener has requested a pause. An async*
function automatically pauses at a yield
statement while the stream subscription is paused. A StreamController
, on the other hand, buffers events during the pause. If the code providing the events doesn’t respect the pause, the size of the buffer can grow indefinitely. Also, if the listener stops listening soon after pausing, then the work spent creating the buffer is wasted.
To see what happens without pause support, try changing the code that uses the stream to the following:
1 2 3 4 5 6 7 8 9 10 11 12 |
<span class="typ">void</span><span class="pln"> listenWithPause</span><span class="pun">()</span> <span class="pun">{</span> <span class="kwd">var</span><span class="pln"> counterStream </span><span class="pun">=</span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="kwd">const</span> <span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">1</span><span class="pun">),</span> <span class="lit">15</span><span class="pun">);</span> <span class="typ">StreamSubscription</span><span class="pun"><</span><span class="typ">int</span><span class="pun">></span><span class="pln"> subscription</span><span class="pun">;</span><span class="pln"> subscription </span><span class="pun">=</span><span class="pln"> counterStream</span><span class="pun">.</span><span class="pln">listen</span><span class="pun">((</span><span class="typ">int</span><span class="pln"> counter</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> print</span><span class="pun">(</span><span class="pln">counter</span><span class="pun">);</span> <span class="com">// Print an integer every second.</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">counter </span><span class="pun">==</span> <span class="lit">5</span><span class="pun">)</span> <span class="pun">{</span> <span class="com">// After 5 ticks, pause for five seconds, then resume.</span><span class="pln"> subscription</span><span class="pun">.</span><span class="pln">pause</span><span class="pun">(</span><span class="typ">Future</span><span class="pun">.</span><span class="pln">delayed</span><span class="pun">(</span><span class="kwd">const</span> <span class="typ">Duration</span><span class="pun">(</span><span class="pln">seconds</span><span class="pun">:</span> <span class="lit">5</span><span class="pun">)));</span> <span class="pun">}</span> <span class="pun">});</span> <span class="pun">}</span> |
When the five seconds of pause are up, the events fired during that time are all received at once. That happens because the stream’s source doesn’t honor pauses and keeps adding events to the stream. So the stream buffers the events, and it then empties its buffer when the stream becomes unpaused.
The following version of timedCounter()
(from stream_controller.dart) implements pause by using the onListen
, onPause
, onResume
, and onCancel
callbacks on the StreamController
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
<span class="typ">Stream</span><span class="pun"><</span><span class="typ">int</span><span class="pun">></span><span class="pln"> timedCounter</span><span class="pun">(</span><span class="typ">Duration</span><span class="pln"> interval</span><span class="pun">,</span> <span class="pun">[</span><span class="typ">int</span><span class="pln"> maxCount</span><span class="pun">])</span> <span class="pun">{</span> <span class="typ">StreamController</span><span class="pun"><</span><span class="typ">int</span><span class="pun">></span><span class="pln"> controller</span><span class="pun">;</span> <span class="typ">Timer</span><span class="pln"> timer</span><span class="pun">;</span> <span class="typ">int</span><span class="pln"> counter </span><span class="pun">=</span> <span class="lit">0</span><span class="pun">;</span> <span class="typ">void</span><span class="pln"> tick</span><span class="pun">(</span><span class="pln">_</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> counter</span><span class="pun">++;</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">add</span><span class="pun">(</span><span class="pln">counter</span><span class="pun">);</span> <span class="com">// Ask stream to send counter values as event.</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">counter </span><span class="pun">==</span><span class="pln"> maxCount</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> timer</span><span class="pun">.</span><span class="pln">cancel</span><span class="pun">();</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">close</span><span class="pun">();</span> <span class="com">// Ask stream to shut down and tell listeners.</span> <span class="pun">}</span> <span class="pun">}</span> <span class="typ">void</span><span class="pln"> startTimer</span><span class="pun">()</span> <span class="pun">{</span><span class="pln"> timer </span><span class="pun">=</span> <span class="typ">Timer</span><span class="pun">.</span><span class="pln">periodic</span><span class="pun">(</span><span class="pln">interval</span><span class="pun">,</span><span class="pln"> tick</span><span class="pun">);</span> <span class="pun">}</span> <span class="typ">void</span><span class="pln"> stopTimer</span><span class="pun">()</span> <span class="pun">{</span> <span class="kwd">if</span> <span class="pun">(</span><span class="pln">timer </span><span class="pun">!=</span> <span class="kwd">null</span><span class="pun">)</span> <span class="pun">{</span><span class="pln"> timer</span><span class="pun">.</span><span class="pln">cancel</span><span class="pun">();</span><span class="pln"> timer </span><span class="pun">=</span> <span class="kwd">null</span><span class="pun">;</span> <span class="pun">}</span> <span class="pun">}</span><span class="pln"> controller </span><span class="pun">=</span> <span class="typ">StreamController</span><span class="pun"><</span><span class="typ">int</span><span class="pun">>(</span><span class="pln"> onListen</span><span class="pun">:</span><span class="pln"> startTimer</span><span class="pun">,</span><span class="pln"> onPause</span><span class="pun">:</span><span class="pln"> stopTimer</span><span class="pun">,</span><span class="pln"> onResume</span><span class="pun">:</span><span class="pln"> startTimer</span><span class="pun">,</span><span class="pln"> onCancel</span><span class="pun">:</span><span class="pln"> stopTimer</span><span class="pun">);</span> <span class="kwd">return</span><span class="pln"> controller</span><span class="pun">.</span><span class="pln">stream</span><span class="pun">;</span> <span class="pun">}</span> |
Run this code with the listenWithPause()
function above. You’ll see that it stops counting while paused, and it resumes nicely afterwards.
You must use all of the listeners—onListen
, onCancel
, onPause
, and onResume
—to be notified of changes in pause state. The reason is that if the subscription and pause states both change at the same time, only the onListen
or onCancel
callback is called.
Final hints
When creating a stream without using an async* function, keep these tips in mind:
- Be careful when using a synchronous controller—for example, one created using
StreamController(sync: true)
. When you send an event on an unpaused synchronous controller (for example, using theadd()
,addError()
, orclose()
methods defined by EventSink), the event is sent immediately to all listeners on the stream.Stream
listeners must never be called until the code that added the listener has fully returned, and using a synchronous controller at the wrong time can break this promise and cause good code to fail. Avoid using synchronous controllers. - If you use
StreamController
, theonListen
callback is called before thelisten
call returns theStreamSubscription
. Don’t let theonListen
callback depend on the subscription already existing. For example, in the following code, anonListen
event fires (andhandler
is called) before thesubscription
variable has a valid value.
1<span class="pln">subscription </span><span class="pun">=</span><span class="pln"> stream</span><span class="pun">.</span><span class="pln">listen</span><span class="pun">(</span><span class="pln">handler</span><span class="pun">);</span> - The
onListen
,onPause
,onResume
, andonCancel
callbacks defined byStreamController
are called by the stream when the stream’s listener state changes, but never during the firing of an event or during the call of another state change handler. In those cases, the state change callback is delayed until the previous callback is complete. - Don’t try to implement the
Stream
interface yourself. It’s easy to get the interaction between events, callbacks, and adding and removing listeners subtly wrong. Always use an existing stream, possibly from aStreamController
, to implement thelisten
call of a new stream. - Although it’s possible to create classes that extend
Stream
with more functionality by extending theStream
class and implementing thelisten
method and the extra functionality on top, that is generally not recommended because it introduces a new type that users have to consider. Instead you can often make a class that has aStream
(and more) — instead of one that is a Stream (and more).