Every time I talk about functional reactive programming and streams of asynchronous events, developers ask me if this is the same as Node streams. The two programming approaches are similar, but there is an important difference. Node streams have internal state that changes (think of a file reading stream that goes from 'closed' to 'open' to 'data' to 'end'), while functional reactive streams have no internal state and instead deal with one isolated event at a time.
For a good intro to Node streams, see the stream-handbook and the Node.js stream playground. For a good intro to FRP, see the RxJs intro and "The introduction to Reactive Programming you've been missing".
Let us look at a small example implemented using both approaches to highlight the difference. Imagine a text file that contains the word "apple" several times:
```
this is an apple.
```
We want to find how many times the word "apple" appears in this file. We could implement this manually, of course:
```javascript
var text = require('fs').readFileSync('./apples.txt', 'utf-8');
```
Streaming apples
We can avoid reading the entire (potentially huge) file into memory and process one chunk of text at a time using a read stream:
```javascript
var stream = require('fs').createReadStream('./apples.txt', { encoding: 'utf8' });
```
Setting the encoding to 'utf8' is important; otherwise the text received in the on('data') callback will be a Buffer and not a String.
We had to use an external variable count in order to actually count the apples. We also hard-coded the console.log logic in the on('end') callback. How can we compose reading the file, finding the word "apple", and printing the final count when the entire file has been processed using streams?
The composition method for connecting multiple streams is the .pipe() function. For example, to connect the file reading stream to the process standard output stream, we can simply write the following:
```javascript
require('fs')
```
Let us write a subclass of the Transform class that will find the word "apple" in the input and pass the "found" counter to the next stream.
```javascript
var Transform = require('stream').Transform;
```
We are reusing the existing Transform stream class via .pipe, and do not have to worry about handling individual events (like data or end) ourselves. Instead we will be printing the number of times the word "apple" is found in each chunk of data read from the file stream.
Notice that we must output either a String or a Buffer when calling the next stream from the _transform callback in line // 1. This makes passing arbitrary data between streams difficult, even if creating buffers is simple using the built-in Buffer class. I believe that managing encodings and processing data in chunks is what makes streams computationally efficient, but it requires extra boilerplate code.
Reactive apples
Reactive streams take a more general approach to events and data. Instead of strings and buffers, these streams pass arbitrary individual data items. For example, we can create an observable sequence from a readable stream and then map text to numbers:
```javascript
function find(text) {
```
In this particular example, the entire file is processed as a single chunk. If the file is longer, we really want to search for apples inside the individual file chunks and then sum the numbers up at the end. To better show this behavior, I repeated the same text inside apples.txt multiple times:
```
this is an apple.
```
The new larger file has the word "apple" 5280 times. The above reactive program now shows that the input file stream only reads a chunk up to a certain size before passing the data to the next step in the stream:
```
$ node rx-apples.js
found 1725 apples
found 1724 apples
found 1724 apples
found 106 apples
```
To compute the sum of all counters we can use the built-in "sum" method, which adds up all the numbers generated by the previous stream. All we need to do is:
```javascript
...
```
Because reactive stream libraries offer rich APIs (see the RxJs or Bacon.js libraries), there is a high chance that a simple transform like .map, .filter, or even .sum is already implemented.
But the answer is wrong!
You might be wondering why I claimed that the large apples.txt file contains the word "apple" 5280 times, while the Rx program finds only 5279 apples. This is due to the file read stream cutting the word "apple" in half across chunk boundaries when reading the file. We can see this if we print the start of the input text passed from the stream.
```javascript
...
```
The .tap method allows inspecting an event and passing it to the next step automatically. The program prints:
```
start: "this is an apple.\nan"
```
The second start shows the word "apple" cut in half. This is the missing fruit. Unfortunately, correctly buffering the stream across chunks and splitting at the line breaks is extra work ;)