What is the difference between streams and functional reactive programming?

Count a specific word in a text file using streams and reactive programming.

Every time I talk about functional reactive programming and streams of asynchronous events, developers ask me if this is the same as Node streams. The two programming approaches are similar, but there is an important difference: Node streams have internal state that changes (think of a file read stream that goes from 'closed' to 'open', to 'data', to 'end'), while functional reactive streams have no internal state and instead deal with a single isolated event at a time.

For a good intro to Node streams, see the stream-handbook and the Node.js stream playground. For a good intro to FRP, see the RxJS intro and "The introduction to Reactive Programming you've been missing".

Let us look at a small example implemented using both approaches to see the difference. Imagine a text file that contains the word "apple" several times:

apples.txt
this is an apple.
an apple a day keeps doctor away.
but this is an orange.

We want to find how many times the word "apple" appears in this file. We could implement this manually, of course:

count apples
var text = require('fs').readFileSync('./apples.txt', 'utf-8');
var pattern = /apple/g;
var count = text.match(pattern).length;
console.log('found', count, 'apples');
// found 2 apples

Streaming apples

We can avoid reading the entire (potentially huge) file into memory and instead process the text one chunk at a time using a read stream.

reading a stream
var stream = require('fs').createReadStream('./apples.txt', { encoding: 'utf8' });
var count = 0;
stream.on('data', function (text) {
  var pattern = /apple/g;
  // guard against chunks without any matches (match returns null)
  count += (text.match(pattern) || []).length;
});
stream.on('end', function () {
  console.log('found', count, 'apples');
});
// found 2 apples

Setting the encoding to 'utf8' is important; otherwise the text received in the on('data') callback will be a Buffer and not a String.
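For example, if we drop the encoding option, the first chunk shows up as a Buffer:

// without { encoding: 'utf8' } every chunk arrives as a Buffer
require('fs').createReadStream('./apples.txt')
  .once('data', function (chunk) {
    console.log(Buffer.isBuffer(chunk)); // true
  });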

We had to use an external variable count in order to actually count the apples, and we hard-coded the console.log logic in the on('end') callback. How can we compose reading the file, finding the word "apple", and printing the final count once the entire file has been processed, using only streams?

Streams compose via the .pipe() method. For example, to connect the file read stream to the process's standard output stream we can simply write the following:

require('fs')
  .createReadStream('./apples.txt', { encoding: 'utf8' })
  .pipe(process.stdout);
/*
this is an apple.
an apple a day keeps doctor away.
but this is an orange.
*/

Let us write a subclass of the Transform class that finds the word "apple" in the input and passes the "found" counter to the next stream.

find apples Transform stream
var Transform = require('stream').Transform;
var util = require('util');
function AppleFinder(opt) {
  Transform.call(this, opt);
}
util.inherits(AppleFinder, Transform);
var StringDecoder = require('string_decoder').StringDecoder;
var decoder = new StringDecoder('utf8');
AppleFinder.prototype._transform = function (data, encoding, callback) {
  var text = decoder.write(data);
  var pattern = /apple/g;
  // guard against chunks without any matches (match returns null)
  var count = (text.match(pattern) || []).length;
  callback(null, String(count) + '\n'); // 1
};
require('fs')
  .createReadStream('./apples.txt', { encoding: 'utf8' })
  .pipe(new AppleFinder())
  .pipe(process.stdout);
// 2

We subclass the existing Transform stream class and connect it via .pipe, so we do not have to worry about handling individual events (like data or end) ourselves. Instead we print the number of times the word "apple" appears in each chunk of data read from the file stream.

Notice that we must output either a String or a Buffer when calling the next stream from the _transform callback on line // 1. This makes passing arbitrary data between streams difficult, even if creating buffers is simple using the built-in Buffer class. I believe that managing encodings and processing data in chunks is what makes streams computationally efficient, but it requires extra boilerplate code.
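By the way, this also answers the composition question from before. Here is a minimal sketch of my own (a hypothetical Sum class, not part of the original example) that accumulates the per-chunk counts and only emits the total from its _flush hook once the input ends, reusing the AppleFinder from above:

...
function Sum(opt) {
  Transform.call(this, opt);
  this.total = 0;
}
util.inherits(Sum, Transform);
Sum.prototype._transform = function (data, encoding, callback) {
  // each incoming chunk is a per-chunk count like "2\n" from AppleFinder
  this.total += Number(data.toString());
  callback(); // emit nothing downstream until the input ends
};
Sum.prototype._flush = function (callback) {
  // called once the upstream has ended; emit the grand total
  callback(null, 'found ' + this.total + ' apples\n');
};
require('fs')
  .createReadStream('./apples.txt', { encoding: 'utf8' })
  .pipe(new AppleFinder())
  .pipe(new Sum())
  .pipe(process.stdout);
// found 2 apples

It works, but notice how much plumbing we had to write just to express "count the matches, then sum them up".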

Reactive apples

Reactive streams take a more general approach to events and data. Instead of strings and buffers, these streams pass arbitrary individual data items. For example, we can create an observable sequence from a readable stream and then map text to numbers:

count apples reactively
function find(text) {
  var pattern = /apple/g;
  // guard against chunks without any matches (match returns null)
  return (text.match(pattern) || []).length;
}
var stream = require('fs')
  .createReadStream('./apples.txt', { encoding: 'utf8' });
var Rx = require('rx');
Rx.Node.fromReadableStream(stream)
  .map(find)
  .subscribe(function (counter) {
    console.log('found', counter, 'apples');
  });
// found 2 apples

In this particular example, the entire file is processed as a single chunk. If the file is larger, we really want to search for apples inside the individual chunks and then sum the numbers up at the end. To better show this behavior, I repeated the same text inside apples.txt multiple times:

this is an apple.
an apple a day keeps doctor away.
but this is an orange.

this is an apple.
an apple a day keeps doctor away.
but this is an orange.

this is an apple.
... continues in the same manner

The new, larger file contains the word "apple" 5280 times. The reactive program above now shows how the input file stream only reads a chunk up to a certain size before passing the data to the next step in the pipeline:

$ node rx-apples.js 
found 1725 apples
found 1724 apples
found 1724 apples
found 106 apples
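How big is a chunk? It is governed by the stream's highWaterMark option, which defaults to 64 KiB for file read streams; at two apples per 76-byte stanza of our text, that works out to roughly 1724 apples per chunk, matching the output above. If we wanted smaller chunks (and more events flowing through the pipeline), we could ask for them explicitly:

// a hypothetical tweak: read 16 KiB chunks instead of the 64 KiB default
var stream = require('fs').createReadStream('./apples.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024
});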

To compute the sum of all counters we can use the built-in sum method, which adds up all the numbers generated by the previous stream. All we need to do is:

...
var Rx = require('rx');
Rx.Node.fromReadableStream(stream)
  .map(find)
  .sum()
  .subscribe(function (counter) {
    console.log('found', counter, 'apples');
  });
// found 5279 apples

Because reactive stream libraries offer rich APIs (see the RxJS or Bacon.js libraries), there is a high chance that a simple transform like .map, .filter or even .sum is already implemented.
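Even when a specific helper is missing, it can usually be built from a more general operator. For example, .sum() is just a fold; assuming the RxJS 4 reduce(accumulator, seed) signature, the same count can be written as:

...
Rx.Node.fromReadableStream(stream)
  .map(find)
  .reduce(function (total, count) { return total + count; }, 0)
  .subscribe(function (counter) {
    console.log('found', counter, 'apples');
  });
// found 5279 apples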

But the answer is wrong!

You might be wondering why I claimed that the large apples.txt file contains the word "apple" 5280 times, while the Rx program finds only 5279 apples. This is because the file read stream can cut the word "apple" in half at a chunk boundary. We can see this if we print the start of each text chunk passed along by the stream.

print the start of each text chunk
...
function printStart(text) {
  console.log('start:', JSON.stringify(text.substr(0, 20)));
}
var Rx = require('rx');
Rx.Node.fromReadableStream(stream)
  .tap(printStart)
  .map(find)
  .sum()
  .subscribe(function (counter) {
    console.log('found', counter, 'apples');
  });

The .tap method allows inspecting each event while passing it along to the next step automatically. The program prints:

start: "this is an apple.\nan"
start: "le a day keeps docto"
start: "ay.\nbut this is an o"
start: "e.\n\nthis is an apple"
found 5279 apples

The second start shows the word "apple" cut in half: this is the missing fruit. Unfortunately, correctly buffering the text across chunks and splitting it at line breaks is extra work ;)
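For the curious, here is one way that extra work could look (my own sketch, assuming RxJS 4 and reusing the guarded find function from above). The idea is to carry the trailing partial line of each chunk into the next one with .scan, emit only complete lines, and append a final newline so the last line is flushed; since "apple" never spans a line break, counting per complete line recovers the missing fruit.

var Rx = require('rx');
var stream = require('fs')
  .createReadStream('./apples.txt', { encoding: 'utf8' });
Rx.Node.fromReadableStream(stream)
  // append a final newline so the last partial line is flushed too
  .concat(Rx.Observable.return('\n'))
  .scan(function (acc, chunk) {
    // prepend the leftover from the previous chunk, then keep
    // the new trailing partial line for the next iteration
    var lines = (acc.rest + chunk).split('\n');
    return { rest: lines.pop(), lines: lines };
  }, { rest: '', lines: [] })
  .flatMap(function (acc) {
    return Rx.Observable.fromArray(acc.lines);
  })
  .map(find)
  .sum()
  .subscribe(function (counter) {
    console.log('found', counter, 'apples');
  });
// should print: found 5280 apples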