Ramda for reactive streams

Array or Observable - Ramda can do both!

Take a small Observable example from xstream: it outputs two numbers and completes when a second stream fires after 5 seconds. The example shows filtering and mapping values as they flow through the stream:

import xs from 'xstream'

// Tick every second incremental numbers,
// only pass even numbers, then map them to their square,
// and stop after 5 seconds has passed

var stream = xs.periodic(1000)
  .filter(i => i % 2 === 0)
  .map(i => i * i)
  .endWhen(xs.periodic(5000).take(1))

// So far, the stream is idle.
// As soon as it gets its first listener, it starts executing.

stream.addListener({
  next: i => console.log(i),
  error: err => console.error(err),
  complete: () => console.log('completed'),
})

Take a look at the .filter and .map calls - doesn't it look like we are operating on a "plain" Array? It certainly looks this way to me! And if the talk "Hey Underscore, You're Doing It Wrong!" taught me anything, it is that switching from a method to a function that takes its callback first is the way to go.

Babel-node

Before we proceed, let me show how I work with modern JavaScript in the terminal. I prefer to run every program locally, and avoid using online sandboxes like jsfiddle or plnkr - it is just faster to work locally for me!

The example above uses only features that modern Node 6/7 supports, except for the import statement. Thus we need to transpile the code before we can run it. I use babel-node with preset-env for this, to avoid messing with individual plugins / presets / features. So grab the above code, place it into index.js and create a Node package locally

npm init --yes
npm i -S xstream
npm i -D babel-cli babel-preset-env

Then I define a start command that transpiles the code for the current Node environment at run time.

{
  "name": "ramda-with-streams",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "start": "babel-node --presets env index.js"
  },
  "dependencies": {
    "xstream": "10.3.0"
  },
  "devDependencies": {
    "babel-cli": "6.22.2",
    "babel-preset-env": "1.3.2"
  }
}

The code works and prints 0 then 4 then completed

npm start
> babel-node --presets env index.js
0
4
completed

There are two-second pauses between the outputs, so I am going to show the output like this

0 --- 4 --- completed -|->

The | character means the end of the stream, and the -> is the stream itself.
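To see where the two-second pauses come from, the timing can be replayed without xstream at all. The sketch below is a plain loop, not a stream. It assumes that xs.periodic(1000) emits 0 at second 1, 1 at second 2 and so on, and that the end signal at second 5 wins over the tick due at the same moment, which is what the printed output suggests. The names timeline and endAt are made up for this illustration.

```javascript
// Simulated timeline, no xstream involved.
const isEven = i => i % 2 === 0
const square = i => i * i

// Assumption: the end signal at second 5 arrives before the tick at second 5.
const endAt = 5
const timeline = []

for (let second = 1; second < endAt; second += 1) {
  // The periodic stream emits 0 at second 1, 1 at second 2, and so on.
  const value = second - 1
  if (isEven(value)) {
    timeline.push({ second, value: square(value) })
  }
}
timeline.push({ second: endAt, value: 'completed' })

console.log(timeline)
// [ { second: 1, value: 0 },
//   { second: 3, value: 4 },
//   { second: 5, value: 'completed' } ]
```

Odd ticks are dropped by the filter, so values only appear at seconds 1 and 3, and the stream ends at second 5.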

Perfect, now we can experiment

Named callbacks

The first thing I want to do is name the callback functions and pull them out for clarity. We have several of them

const isEven = i => i % 2 === 0
const square = i => i * i
const after5 = xs.periodic(5000).take(1)
const stream = xs.periodic(1000)
  .filter(isEven)
  .map(square)
  .endWhen(after5)
// 0 --- 4 --- completed -|->

If we are doing this, let us quickly write a few unit tests, because it literally takes 10 seconds. I will use Ava, one of my favorite test runners lately.

Move callback functions into their own file

// utils.js
export const isEven = i => i % 2 === 0
export const square = i => i * i

Install Ava (super simple)

npm i -D ava

Since our code uses the export keyword, Ava needs to transpile it, and there are a lot of ways to configure this. The simplest is to define the preset in the package.json file

{
  "scripts": {
    "test": "ava"
  },
  "ava": {
    "require": ["babel-register"]
  },
  "babel": {
    "presets": ["@ava/stage-4"]
  }
}

Let us write a test for isEven function

// test.js
import test from 'ava'
import {isEven, square} from './utils'
test('isEven', t => {
  t.true(isEven(2))
  t.true(isEven(4))
  t.false(isEven(1))
  t.false(isEven(11))
})

Hmm, kind of verbose, isn't it? Let us use Ava's snapshot feature to grab multiple values at once

// test.js
test('isEven snapshot', t => {
  t.snapshot([2, 4, 1, 11].map(isEven))
})

Look at the saved snapshot file

$ npm test
$ cat __snapshots__/test.js.snap
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`isEven snapshot 1`] = `
Array [
  true,
  true,
  false,
  false,
]
`;
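Conceptually, a snapshot assertion saves the serialized value on the first run and compares against the saved copy on later runs. Here is a minimal in-memory sketch of that idea. It is not how Ava implements it (the real thing writes files to disk and uses its own serializer), and createSnapshotStore is a name invented for this example.

```javascript
// Hypothetical helper: keeps snapshots in memory instead of a .snap file.
function createSnapshotStore () {
  const saved = new Map()
  return function snapshot (name, value) {
    const serialized = JSON.stringify(value, null, 2)
    if (!saved.has(name)) {
      // First run: nothing to compare against, so just record the value.
      saved.set(name, serialized)
      return 'saved'
    }
    // Later runs: the new value must match the recorded one exactly.
    if (serialized !== saved.get(name)) {
      throw new Error(`snapshot "${name}" does not match the saved value`)
    }
    return 'matched'
  }
}

const isEven = i => i % 2 === 0
const snapshot = createSnapshotStore()
const firstRun = snapshot('isEven snapshot', [2, 4, 1, 11].map(isEven))
const secondRun = snapshot('isEven snapshot', [2, 4, 1, 11].map(isEven))

console.log(firstRun, secondRun) // saved matched
```

If isEven ever changed its answers, the second call would throw instead of returning 'matched', which is the whole point of a snapshot test.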

Nice, but less than perfect. Let us add an external snapshot testing library snap-shot (full disclosure: it is my baby!) that has data-driven testing inspired by sazerac.

npm i -D snap-shot

We need a named function isEven, so I changed the arrow expression into a function

// utils.js
export function isEven (i) {
  return i % 2 === 0
}
export function square (i) {
  return i * i
}

Because we are using only our own assertion and none of Ava's built-in ones, we need to tell Ava not to fail a test that has no assertions (see the Ava configuration list)

{
  "ava": {
    "failWithoutAssertions": false,
    "require": [
      "babel-register"
    ]
  }
}

The Ava test is now super compact

// test.js
import test from 'ava'
import {isEven, square} from './utils'
import snapshot from 'snap-shot'
test('Data-driven isEven', t => {
  snapshot(isEven, 2, 4, 1, 11)
})

The snapshot file makes it clear what is going on

$ cat __snapshots__/test.js.snap-shot
exports['Data-driven isEven 1'] = {
  "name": "isEven",
  "behavior": [
    {
      "given": 2,
      "expect": true
    },
    {
      "given": 4,
      "expect": true
    },
    {
      "given": 1,
      "expect": false
    },
    {
      "given": 11,
      "expect": false
    }
  ]
}
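The shape of that file suggests what a data-driven snapshot records: the function name plus one given/expect pair per input. A rough sketch of building such a record by hand follows. Note that describeBehavior is a hypothetical helper for illustration, not part of the snap-shot API, and the real library may collect the data differently.

```javascript
// Hypothetical helper: calls fn once per input and records what it returned.
function describeBehavior (fn, ...inputs) {
  return {
    name: fn.name, // the record is labeled with the function's own name
    behavior: inputs.map(given => ({ given, expect: fn(given) }))
  }
}

function isEven (i) {
  return i % 2 === 0
}

const record = describeBehavior(isEven, 2, 4, 1, 11)
console.log(JSON.stringify(record, null, 2))
```

Saving this record and comparing it on the next run gives a test that reads like a small table of inputs and expected outputs.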

Ok, lovely! Similarly, we can test the square function, if needed.

Bonus: the snap-shot family also includes schema snapshot testing and even subset snapshot testing.

Ramda instead of Observable

Look at our iteration over the events in the Observable

const stream = xs.periodic(1000)
  .filter(isEven)
  .map(square)
  .endWhen(after5)

It looks so much like Array.filter(...).map(...), doesn't it? The Ramda library is perfect for replacing and extending the built-in JavaScript list iterators, and guess what: it can work with an Observable too! At least it understands objects that have their own .map and .filter methods. Let us replace the fluent Observable interface with a composition of R.map and R.filter calls.

// index.js
import xs from 'xstream'
import {filter, map} from 'ramda'
import {isEven, square} from './utils'
const after5 = xs.periodic(5000).take(1)
const stream = map(square, filter(isEven, xs.periodic(1000)))
  .endWhen(after5)

We replaced the .map and .filter method calls with Ramda functions, and left .endWhen as a method call because Ramda has no equivalent function.
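This works because R.map and R.filter dispatch to the .map and .filter methods of their last argument when it has them. The dependency-free sketch below imitates that dispatch. It is a simplified illustration and not Ramda's source, and TinyStream is a made-up stand-in for an Observable that pushes values to a subscriber.

```javascript
// Simplified dispatch: call the target's own method with the callback.
const dispatchTo = methodName => fn => target => target[methodName](fn)
const filter = dispatchTo('filter')
const map = dispatchTo('map')

// TinyStream is hypothetical: a push-based source with filter and map methods.
class TinyStream {
  constructor (producer) {
    this.producer = producer
  }

  static of (...values) {
    return new TinyStream(next => values.forEach(value => next(value)))
  }

  filter (fn) {
    return new TinyStream(next => this.producer(x => { if (fn(x)) next(x) }))
  }

  map (fn) {
    return new TinyStream(next => this.producer(x => next(fn(x))))
  }

  subscribe (next) {
    this.producer(next)
  }
}

const isEven = i => i % 2 === 0
const square = i => i * i

// The same curried functions work on an Array and on a TinyStream.
const fromArray = map(square)(filter(isEven)([0, 1, 2, 3]))
const fromStream = []
map(square)(filter(isEven)(TinyStream.of(0, 1, 2, 3)))
  .subscribe(x => fromStream.push(x))

console.log(fromArray, fromStream) // [ 0, 4 ] [ 0, 4 ]
```

Unlike Ramda's functions, this sketch only supports the one-argument-at-a-time call style map(square)(list), which is enough to show the idea.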

The above code still runs the same and produces 0 --- 4 --- completed -|->, yet it has one interesting property. Because Ramda functions are curried and take the callback as the first argument, the only "data" argument ends up in the last position, just like f(g(xs.periodic(1000))). That means we can compose the map and filter calls into a new function (I prefer pipe over compose for clarity)

import {filter, map, pipe} from 'ramda'
const evenSquares = pipe(
  filter(isEven),
  map(square)
)
const stream = evenSquares(xs.periodic(1000))
  .endWhen(after5)
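The difference between pipe and compose is only the order in which the functions run: pipe goes left to right, compose right to left. A small sketch with hand-written versions makes the order visible. These one-liners are stand-ins for illustration (Ramda ships its own pipe and compose), and the curried filter and map here only handle plain Arrays.

```javascript
// Hand-written stand-ins, not Ramda's implementations.
const pipe = (...fns) => input => fns.reduce((value, fn) => fn(value), input)
const compose = (...fns) => input => fns.reduceRight((value, fn) => fn(value), input)

// Curried, callback-first helpers for plain Arrays only.
const filter = fn => list => list.filter(fn)
const map = fn => list => list.map(fn)

const isEven = i => i % 2 === 0
const square = i => i * i

// pipe lists the steps in the order they run ...
const piped = pipe(filter(isEven), map(square))
// ... while compose lists them in reverse.
const composed = compose(map(square), filter(isEven))

console.log(piped([0, 1, 2, 3]), composed([0, 1, 2, 3])) // [ 0, 4 ] [ 0, 4 ]
```

Both produce the same result. With pipe the code reads in the same order as the original .filter(...).map(...) chain.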

Still works the same way, but why would I want to do this? Because, timing aside, I can now test the composed function evenSquares easily. For example, if I move it to utils.js, I can write a snapshot test

import {isEven, square, evenSquares} from './utils'
test('even squares', t => {
  snapshot(evenSquares([0, 1, 2, 3]))
})

What does it output? We can make it very visible using an environment variable while running tests

$ SHOW=1 npm t
> ava
saving snapshot "even squares 1" for file test.js
[ 0, 4 ]

Perfect. We have just rewritten our reactive pipeline as a battle-hardened Ramda pipeline that is easy to unit test.