Testing reactive code

How to unit test Rx code on Node with Mocha (and Ava).

There is very little material on unit testing reactive JavaScript code. This blog post describes the many small problems I have run into, along with a few solutions.

The problem

Given the simple Rx example below, how would you unit test it?

// even-numbers.js
const Rx = require('rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
evenNumbers_.subscribe(
  (x) => console.log(`even number ${x}`),
  (err) => console.error(err),
  () => console.log('all done')
)

We can run the code and see the result to confirm it works

$ node even-numbers.js 
even number 2
even number 4
even number 6
all done

How do we unit test the stream of even numbers?

Refactor for testability

In the above example, the code that sets up the stream of even numbers is mixed with the code that actually subscribes to the stream and prints the information to the console. While we could unit test the command line output, this would be far from ideal for unit tests.

Let us split the stream creation from the printing code. We can create the evenNumbers_ stream in the even-numbers.js and subscribe to it in the file index.js

even-numbers.js
const Rx = require('rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
module.exports = evenNumbers_

index.js
const evenNumbers_ = require('./even-numbers')
evenNumbers_.subscribe(
  (x) => console.log(`even number ${x}`),
  (err) => console.error(err),
  () => console.log('all done')
)

This runs just fine

$ node index.js 
even number 2
even number 4
even number 6
all done

Our first unit test

We can write a few simple unit tests, using Mocha - my favorite unit testing framework. Let us confirm that we get an Observable instance

spec.js
const la = require('lazy-ass')
const is = require('check-more-types')
describe('even numbers', () => {
  const evenNumbers_ = require('./even-numbers')
  it('is observable', () => {
    la(is.fn(evenNumbers_.subscribe), 'has subscribe method')
  })
})

Let us make sure the sequence finishes after a few numbers. Since the test is asynchronous, we will use a callback to let Mocha runtime know when the stream has finished.

const la = require('lazy-ass')
const is = require('check-more-types')
describe('even numbers', () => {
  const evenNumbers_ = require('./even-numbers')
  const noop = () => {}
  it('finishes', (done) => {
    evenNumbers_.subscribe(noop, noop, done)
  })
})

We used a dummy function noop when subscribing and ignored everything but the "stream completed" event.

Let us confirm that we get 3 even numbers from this stream. We can increment the count on each number received and can check the total when the stream completes

it('has 3 numbers', (done) => {
  var count = 0
  const onNumber = () => { count += 1 }
  evenNumbers_.subscribe(onNumber, noop, () => {
    la(count === 3, 'got 3 numbers', count)
    done()
  })
})

Catching errors in the unit test

Let us also confirm that there were no error events. For example, an error could happen somewhere inside the stream

// even-numbers.js
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
  .map(x => { throw new Error('oh no') })

Our test should fail with useful information if there is an error event. Let us rethrow an Error instance if the stream has an error.

// spec.js
it('has no errors', (done) => {
  const crash = (err) => { throw err } // rethrow
  evenNumbers_.subscribe(noop, crash, done)
})

If we run this unit test, we get a long stack trace

Error: oh no
at even-numbers.js:7:21
at tryCatcher (node_modules/rx/dist/rx.js:63:31)
at InnerObserver.next (node_modules/rx/dist/rx.js:5389:43)
at InnerObserver.Rx.internals.AbstractObserver.AbstractObserver.onNext (node_modules/rx/dist/rx.js:1752:31)
at InnerObserver.tryCatcher (node_modules/rx/dist/rx.js:63:31)
at AutoDetachObserverPrototype.next (node_modules/rx/dist/rx.js:5864:51)
...

This stack trace is mostly Rx internals and does not tell us much. Luckily, we can enable long stack support. This makes the reactive code slower, so we only do it in the unit tests.

spec.js
const Rx = require('rx')
Rx.config.longStackSupport = true
const evenNumbers_ = require('./even-numbers')
describe('even numbers', () => {
  const noop = () => {}
  it('has no errors', (done) => {
    const crash = (err) => { throw err }
    evenNumbers_.subscribe(noop, crash, done)
  })
})

Error: oh no
at even-numbers.js:5:21
at tryCatcher (node_modules/rx/dist/rx.js:63:31)
at InnerObserver.tryCatcher (node_modules/rx/dist/rx.js:63:31)
at InnerObserver.tryCatcher (node_modules/rx/dist/rx.js:63:31)
at tryCatcher (node_modules/rx/dist/rx.js:63:31)
at Context.<anonymous> (spec.js:26:18)
From previous event:
at Object.<anonymous> (even-numbers.js:3:36)
at Object.<anonymous> (spec.js:5:22)
at Array.forEach (native)
at node.js:963:3
...

The stack is much more helpful. Notice we had to load Rx in the unit test before the production code has loaded it. This is because the Node runtime caches loaded modules. We must enable the long stack support before even-numbers.js uses Rx to configure the stream.
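The caching behavior is easy to check in plain Node. The sketch below is only an illustration: it uses the built-in os module as a stand-in for rx (an assumption, so that it runs without installing anything), and the longStackSupport property it sets is just an illustrative name, not a real os option.

```javascript
// module-cache-sketch.js
// Stand-in demo: 'os' plays the role of 'rx' so this runs without npm install.
const first = require('os')

// set a flag on the module object, like Rx.config.longStackSupport = true
first.longStackSupport = true

// any later require() of the same module returns the same cached object,
// so the flag is already set when other code loads the module
const second = require('os')

const sameInstance = first === second
const flagVisible = second.longStackSupport === true
console.log(sameInstance, flagVisible)

// remove the illustrative property again
delete first.longStackSupport
```

The same reasoning explains the ordering in spec.js: the flag has to be set on the shared Rx object before even-numbers.js builds its stream.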

Testing with Ava

In addition to Mocha, I have tried testing reactive streams using Ava. Mocha is nice because it understands promises natively, but Ava can understand async unit tests that return promises, generators, and even observables! For example, to make sure our stream only returns even numbers, we could just return the observable

import test from 'ava'
import evenNumbers_ from './even-numbers'
test('checks 3 even numbers', t => {
  t.plan(3)
  return evenNumbers_
    .map(n => {
      t.true(n % 2 === 0)
    })
})

We even checked how many even numbers were processed by using the t.plan(n) function, which sets the expected number of assertions in the unit test. Every time t.true() runs, the assertion counter is incremented. At the end of the test the two numbers are compared, making sure our test took the execution path we expected.
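The idea behind assertion counting can be sketched in a few lines of plain JavaScript. This is not how Ava implements t.plan; makePlan, isTrue and verify are hypothetical names used only for illustration.

```javascript
// plan-sketch.js
// Hypothetical helper: counts assertions and compares the count against a plan.
function makePlan(expected) {
  let seen = 0
  return {
    // record one assertion; throw right away if it fails
    isTrue(condition, message) {
      seen += 1
      if (!condition) {
        throw new Error(message || 'assertion failed')
      }
    },
    // call this when the stream completes
    verify() {
      if (seen !== expected) {
        throw new Error(`planned ${expected} assertions, ran ${seen}`)
      }
    }
  }
}

// three events, three assertions: the plan is satisfied
const evens = [2, 4, 6]
const t = makePlan(3)
evens.forEach(n => t.isTrue(n % 2 === 0, `${n} is even`))
t.verify()

// no events at all: no assertion fails, but the plan catches the gap
const empty = makePlan(3)
let planError = null
try {
  empty.verify()
} catch (err) {
  planError = err
}
console.log(planError.message)
```

The second case is the interesting one: without a plan, a stream that emits nothing would pass silently.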

Unfortunately, Ava understands ES Observables (the proposed standard), not the observables implemented in RxJS v4. Thus I had to modify the code a little to use RxJS v5. The change was simple in our example

npm install --save rxjs

The only change in the even-numbers.js was the require name

const Rx = require('rxjs/Rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
module.exports = evenNumbers_

If we had more complicated code, the migration from Rx v4 to v5 would be more complicated

Verifying data vs verifying timestamps

A reactive stream has really two types of information

  1. The data itself (including the order). For example, even numbers stream has the following data 2, 4, 6.
  2. The timing information when the events arrive. For example, the following stream has delays between the numbers.

even-delays.js
const Rx = require('rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
  .delay(x => Rx.Observable.timer(x * 500))
  .timeInterval()

If we use this stream and print the time intervals (that measure time since the last event) we get the following

$ node index.js 
even number { value: 2, interval: 1009 }
even number { value: 4, interval: 995 }
even number { value: 6, interval: 1004 }
all done

In marble terms, our stream looks like this.

|---1-2-3-4-5-6-X->
|   filter( x is even )
|-----2---4---6-X->
|   delay( x * 500ms )
|-----------2-----------4-----------6-X->
time 0      1009ms      2004ms      3008ms

Notice the delay operator delays an event from the start of the observable stream, which for our purposes is timestamp 0 of the program.
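The arithmetic behind the diagram can be checked with plain arrays. This is a sketch of the ideal timing only; the real output drifts by a few milliseconds, as the printed intervals show.

```javascript
// delay-arithmetic.js
// Each even number x is delayed by x * 500 ms from the start of the stream.
const evens = [2, 4, 6]
const arrivals = evens.map(x => x * 500)

// timeInterval() reports the gap since the previous event
// (the first gap is measured from the start of the stream)
const intervals = arrivals.map((t, i) => t - (i === 0 ? 0 : arrivals[i - 1]))

console.log(arrivals)  // [ 1000, 2000, 3000 ]
console.log(intervals) // [ 1000, 1000, 1000 ]
```

Because every delay is measured from time 0 and the values are 2 apart, the events end up evenly spaced, 2 * 500 ms from each other.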

How can we check the values (2, 4, 6) and when the events arrive (1 second apart in this case)? Again, we must start with refactoring to separate the two aspects. This will make testing easier. For example, we can create the delayed number stream from the even number stream.

// even-numbers.js
const Rx = require('rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
module.exports = evenNumbers_

// even-delays.js
const Rx = require('rx')
const evenNumbers_ = require('./even-numbers')
const delayedNumbers_ = evenNumbers_
  .delay(x => Rx.Observable.timer(x * 500))
  .timeInterval()
module.exports = delayedNumbers_

We will test the simple numbers list in the first test

const la = require('lazy-ass')
const _ = require('lodash') // assumption: "_" is lodash
const evenNumbers_ = require('./even-numbers')
const noop = () => {}
it('has 3 numbers', (done) => {
  const numbers = []
  const onNumber = x => numbers.push(x)
  evenNumbers_.subscribe(onNumber, noop, () => {
    la(_.isEqual(numbers, [2, 4, 6]))
    done()
  })
})

We can also verify the delayed numbers are generated with expected time intervals. First, we need to tell Mocha to wait longer for the unit test to finish, and second, we need to grab the timestamps. I will use a couple of extra helper functions to round the timestamps to seconds

const delayedNumbers_ = require('./even-delays')
describe('delayed numbers', function () {
  this.timeout(7000) // allow each unit test to run for 7 seconds

  const roundHundred = x => Math.round(x/100)*100
  const msToSeconds = x => x / 1000

  it('finishes within 7 seconds', (done) => {
    delayedNumbers_.subscribe(noop, noop, done)
  })

  it('has the right timestamps', (done) => {
    const timestamps = []
    const keepTimestamp = x => timestamps.push(
      msToSeconds(roundHundred(x.interval))
    )
    delayedNumbers_.subscribe(keepTimestamp, noop, () => {
      la(_.isEqual(timestamps, [2, 2, 2]))
      done()
    })
  })
})

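We expect the timestamps array in the second test to have values [2, 2, 2] at the end of the test. Note that this corresponds to events arriving 2 seconds apart, that is, a delay of x * 1000; with the x * 500 delay shown earlier the intervals are about 1 second each and the array would be [1, 1, 1].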

Caution: functions vs arrow expressions

Note we have used describe('...', function () {}) callback in the above example rather than the arrow function describe('...', () => {}) because the Mocha runtime exposes its suite and test context objects through "this", and an arrow function does not get its own "this".

describe('delayed numbers', function () {
  this.timeout(7000) // works
});
describe('delayed numbers', () => {
  this.timeout(7000) // does NOT work, "this" is not the Mocha context
});
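The difference can be reproduced without Mocha. In the sketch below, fakeDescribe is a hypothetical stand-in that, like a test runner, invokes its callback with a context object as "this"; it is not Mocha's actual implementation.

```javascript
// this-binding-sketch.js
// Stand-in for a test runner: calls the callback with a context object as "this".
function fakeDescribe(callback) {
  const context = {
    ms: 2000,
    timeout(ms) { this.ms = ms }
  }
  callback.call(context)
  return context.ms
}

// a regular function receives the context as "this"
const withFunction = fakeDescribe(function () {
  this.timeout(7000)
})

// an arrow function ignores .call() and keeps the "this" of the outer scope,
// which has no timeout() method
let arrowError = null
const withArrow = fakeDescribe(() => {
  try {
    this.timeout(7000)
  } catch (err) {
    arrowError = err
  }
})

console.log(withFunction) // 7000
console.log(withArrow)    // 2000
console.log(arrowError instanceof TypeError) // true
```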

Be careful when binding the context to the outer scope in Mocha, and in RxJS. For example, the following two ways of creating an observer behave differently

// works
const observer = Rx.Observer.create(
  function onNext() {},
  function onError() {},
  function onComplete() {}
)
stream_.subscribe(observer)

// does NOT work
const observer = Rx.Observer.create(
  () => {},
  () => {},
  () => {}
)
stream_.subscribe(observer)

Always use strict in your code to avoid accidentally using the global object as the scope for the fat arrows, and try to stick to explicit functions. I have been burnt by this error in both MochaJS and RxJS, so I hope this saves you some time.

Build test streams

In the above example test, we have switched from stream composition (evenNumbers_ to delayedNumbers_) to functional composition (msToSeconds(roundHundred(x.interval)) expression)

const roundHundred = x => Math.round(x/100)*100
const msToSeconds = x => x / 1000
it('has the right timestamps', (done) => {
  const timestamps = []
  const keepTimestamp = x => timestamps.push(
    msToSeconds(roundHundred(x.interval))
  )
  delayedNumbers_.subscribe(keepTimestamp, err => { throw err }, () => {
    la(_.isEqual(timestamps, [2, 2, 2]))
    done()
  })
})

Since we already have reactive streams, we can easily avoid additional functional pipelining, building up test streams instead. In this case, we want to extend the delayedNumbers_ stream and grab just the timestamps, creating a stream specifically for testing.

it('has the right timestamps', (done) => {
  const verifyTimestamps = ts => la(_.isEqual(ts, [2, 2, 2]), ts)
  delayedNumbers_
    .pluck('interval')
    .map(roundHundred)
    .map(msToSeconds)
    .bufferWithCount(3)
    .subscribe(verifyTimestamps, err => { throw err }, done)
})

Basically, we transform each event, massaging the data, and buffering until we get 3 timestamps that we send to verifyTimestamps. I prefer this form because it cleanly separates the data check (first .subscribe callback) from the stream complete event (plain done function). On the other hand, we no longer make sure that we actually verified the timestamps. For example, if the underlying stream never invokes onNext (that is, no events ever happen), then the verifyTimestamps callback never executes, and the test just finishes.
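The silent-pass problem is easy to see with a plain-array stand-in for buffering. The bufferWithCount function below is a hypothetical helper that chunks an array; it only mimics the grouping behavior and is not the Rx operator.

```javascript
// buffer-sketch.js
// Array-based stand-in for .bufferWithCount(n): groups items into chunks of n.
function bufferWithCount(items, n) {
  const buffers = []
  for (let i = 0; i < items.length; i += n) {
    buffers.push(items.slice(i, i + n))
  }
  return buffers
}

// count how many times the "verify" callback runs for a given list of events
function countVerifications(events) {
  let verified = 0
  bufferWithCount(events, 3).forEach(() => { verified += 1 })
  return verified
}

const withEvents = countVerifications([2, 2, 2])
const withoutEvents = countVerifications([])

console.log(withEvents)    // 1, the check ran once
console.log(withoutEvents) // 0, the check never ran, yet nothing failed
```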

Mocha (unlike Ava) does not support assertion counting (because the test runtime is really decoupled from any particular assertion library). Thus we cannot easily set the expected number of assertions and catch the lack of events.

This is a great opportunity to build a tiny helper for testing streams.

Helper test observer

We have subscribed to test streams using 3 separate callback functions. RxJS has a second form of subscribing, passing a single observer instance instead.

const observer = Rx.Observer.create(
  console.log,
  console.error,
  function () { console.log('done') }
)
evenNumbers_.subscribe(observer)
/*
  2
  4
  6
  done
*/

This subscribe format is convenient for creating an observer that makes sure its onNext has been executed, for example. Here is one implementation

function makeTestObserver(onNext, onError, onComplete) {
  if (arguments.length === 2) {
    onComplete = onError
    onError = null
  }
  if (!onError) {
    onError = err => { throw err }
  }
  if (!onComplete) {
    throw new Error('Missing "onComplete" callback')
  }
  var onNextCalled

  const obs = Rx.Observer.create(
    function () {
      onNextCalled = true
      return onNext.apply(null, arguments)
    },
    onError,
    function () {
      if (!onNextCalled) {
        throw new Error('onNext has never been called')
      }
      return onComplete.apply(null, arguments)
    }
  )
  return obs
}
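To watch the guard fire without installing Rx, here is a stripped-down sketch of the same idea using plain objects. Both makeGuardedCallbacks and play are hypothetical names; play is a stand-in for a stream that pushes each value and then completes.

```javascript
// guard-sketch.js
// Plain-object variant of the helper, so it runs without Rx installed.
function makeGuardedCallbacks(onNext, onComplete) {
  let onNextCalled = false
  return {
    onNext(value) {
      onNextCalled = true
      return onNext(value)
    },
    onCompleted() {
      if (!onNextCalled) {
        throw new Error('onNext has never been called')
      }
      return onComplete()
    }
  }
}

// hypothetical stand-in for a stream: pushes each value, then completes
function play(values, callbacks) {
  values.forEach(v => callbacks.onNext(v))
  callbacks.onCompleted()
}

// normal case: values arrive, then completion goes through
const seen = []
let completed = false
play([2, 4, 6], makeGuardedCallbacks(x => seen.push(x), () => { completed = true }))

// empty stream: completion is turned into an error instead of a silent pass
let guardError = null
try {
  play([], makeGuardedCallbacks(() => {}, () => {}))
} catch (err) {
  guardError = err
}
console.log(seen, completed, guardError.message)
```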

Now our unit tests can be dependable without much boilerplate

it('does everything using test Observer', (done) => {
  const verifyTimestamps = ts => la(_.isEqual(ts, [2, 2, 2]), ts)
  const testObserver = makeTestObserver(verifyTimestamps, done)
  delayedNumbers_
    .map(x => x.interval)
    .map(roundHundred)
    .map(msToSeconds)
    .bufferWithCount(3)
    .subscribe(testObserver)
})

Less boilerplate - simpler testing.

Testing hot observables

So far we had simple "cold observables". Every subscriber got the same sequence of numbers, even if there were multiple subscribers.

evenNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
evenNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
evenNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
/*
  prints 3 times
  2
  4
  6
  done
*/

But some observables, like a stream of current timestamps or mouse clicks, are "hot" - they send "live" events, and if a subscriber missed them, those events are gone and will not be repeated. The most common way to miss events is to subscribe to a stream after it has already started broadcasting.

In RxJS we can create a "hot" even numbers stream from a cold one using the .publish() method call. The new hotNumbers_ stream will start broadcasting after we execute the .connect() call.

// hot-numbers.js
const evenNumbers_ = require('./even-numbers')
const hotNumbers_ = evenNumbers_.publish()
hotNumbers_.connect() // starts the stream
module.exports = hotNumbers_

The stream hotNumbers_ starts right away, and if we subscribe to it from another file - well, it will be too late

// use-hot-numbers.js
const hotNumbers_ = require('./hot-numbers')
hotNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
hotNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
hotNumbers_.subscribe(
  console.log,
  console.error,
  function () { console.log('done') }
)
/*
  prints
  done
  done
  done
*/

Once the hot stream has completed, it stays completed and any subscriber gets just the "done" callback executed. This is similar to adding another .then(cb) step to a Promise that has been fulfilled - the callback will be executed with the value, but the initial promise function will not be run again.
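The Promise half of that comparison can be checked directly in Node. The sketch counts how many times the promise's executor function runs while two callbacks are attached to the already fulfilled promise.

```javascript
// promise-sketch.js
let executorRuns = 0
const fulfilled = new Promise(resolve => {
  executorRuns += 1 // the executor runs once, at construction time
  resolve(42)
})

// attach two callbacks to the already fulfilled promise
const received = []
fulfilled.then(value => received.push(value))
fulfilled.then(value => received.push(value))

// both callbacks get the value, but the executor is not run again
const settled = fulfilled.then(() => {
  console.log(received, executorRuns)
})
```

The difference from a completed hot observable is that a late .then still receives the value, while a late subscriber only sees the completion.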

Hot observables make testing more complex. We must ensure 2 things

  1. We subscribe to the hot observable before it starts broadcasting.
  2. We recreate a hot observable from scratch in each unit test, because the events are not going to be repeated.
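Both points can be illustrated with a toy model of a hot stream. The makeHot function below is a hypothetical, heavily simplified stand-in for evenNumbers_.publish(); it is not how Rx implements publishing, it only mimics the subscribe-then-connect behavior.

```javascript
// hot-sketch.js
// Hypothetical, simplified stand-in for a published (hot) observable.
function makeHot(values) {
  const observers = []
  let completed = false
  return {
    subscribe(onNext, onCompleted) {
      if (completed) {
        onCompleted() // late subscribers only see the completion
        return
      }
      observers.push({ onNext, onCompleted })
    },
    connect() {
      values.forEach(v => observers.forEach(o => o.onNext(v)))
      completed = true
      observers.forEach(o => o.onCompleted())
    }
  }
}

// point 1: subscribe first, then connect
const early = []
const hotA = makeHot([2, 4, 6])
hotA.subscribe(x => early.push(x), () => {})
hotA.connect()

// subscribing after connect() misses every value
const late = []
let lateDone = false
hotA.subscribe(x => late.push(x), () => { lateDone = true })

// point 2: a fresh instance delivers the events again for the next test
const fresh = []
const hotB = makeHot([2, 4, 6])
hotB.subscribe(x => fresh.push(x), () => {})
hotB.connect()

console.log(early, late, lateDone, fresh)
```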

I would refactor the production code so that each module returns either a cold observable or a function returning a new hot observable. For example, the even-numbers.js returns a cold observable, and hot-numbers.js returns a hot one.

// even-numbers.js - returns COLD observable
const Rx = require('rx')
const array = [1, 2, 3, 4, 5, 6]
const evenNumbers_ = Rx.Observable.fromArray(array)
  .filter(x => x % 2 === 0)
module.exports = evenNumbers_

// hot-numbers.js - returns factory function for a HOT observable
const evenNumbers_ = require('./even-numbers')
function makeHotNumbers() {
  const hotNumbers_ = evenNumbers_.publish()
  return hotNumbers_
}
module.exports = makeHotNumbers

The production code that wants to use a hotNumbers_ instance would call the function and then call the .connect() method to start pushing the events

const hot_ = makeHotNumbers()
// connect whatever is necessary to hot_
hot_.connect() // starts the events

The unit tests can make a fresh hot observable and start events, when needed.

const makeEvenNumbers = require('./hot-numbers')
describe('even numbers only', () => {
  var hot_
  beforeEach(() => {
    hot_ = makeEvenNumbers()
  })
  it('has 3 numbers', (done) => {
    const numbers = []
    const onNumber = x => numbers.push(x)
    hot_.subscribe(onNumber, console.error, () => {
      la(_.isEqual(numbers, [2, 4, 6]), 'numbers', numbers)
      done()
    })
    hot_.connect()
  })
})

The separation between the construction code and the actual execution is similar to IO Monad concept and moving side effects into specific parts of the code (like the application's periphery or unit tests). In RxJS case, the construction of hot observable is pure - we can call the makeEvenNumbers() as many times as we want with the same result - a fresh observable. The side effects are only possible when we execute hot_.connect() - the changing internal state of the observable being the side effect.

I have described a very similar concept for a single async action wrapped inside a Task object. Read the blog post Difference between a Promise and a Task to see the same tight boundary between the pure code construction and the side-effects during the execution.

Virtual timing for faster tests

We have a problem with the delayed numbers stream: it uses a real-world timer. Waiting 6 seconds in our test just to confirm that 3 even numbers appear 2 seconds apart takes too long. Can we use a virtual timer in our unit tests instead?

Schedulers

Take a simple source of events, for example a timer generating timestamps every second. To make sure it terminates, we will use .take(3) method call to only grab first 3 events

const Rx = require('rx')
var source = Rx.Observable.timer(100, 1000).take(3)
source.subscribe(
  (x) => console.log('at', +(new Date()), 'value', x),
  console.error,
  () => console.log('completed')
)
/*
  $ node timer-test.js
  at 1455802419515 value 0
  at 1455802420514 value 1
  at 1455802421510 value 2
  completed
*/

The timer observable waits 100 ms, emits the first event, then emits another event every second. The waiting period is controlled by a scheduler. Most Rx methods that deal with time take a scheduler instance as the last argument, including Rx.Observable.timer. There are several schedulers included in Rx, but for testing, we can use Rx.TestScheduler. Here is how to use it.

First, if we just pass an instance to the timer method nothing happens - the program finishes without any events!

const Rx = require('rx')
const timerScheduler = new Rx.TestScheduler()
var source = Rx.Observable.timer(100, 1000, timerScheduler).take(3)
source.subscribe(
  (x) => console.log('at', +(new Date()), 'value', x),
  console.error,
  () => console.log('completed')
)
// prints nothing!

This is because the test scheduler runs on virtual time and does nothing until we explicitly start it. We will start it with an observable of 3 "onNext" events and then 1 "onCompleted".

const timerScheduler = new Rx.TestScheduler()
var source = Rx.Observable.timer(100, 1000, timerScheduler).take(3)
source.subscribe(
  (x) => console.log('at', +(new Date()), 'value', x),
  console.error,
  () => console.log('completed')
)
timerScheduler.startScheduler(function () {
  return timerScheduler.createColdObservable(
    Rx.ReactiveTest.onNext(200, 0),
    Rx.ReactiveTest.onNext(300, 1),
    Rx.ReactiveTest.onNext(400, 2),
    Rx.ReactiveTest.onCompleted(500)
  )
})

We passed the startScheduler() method a function that returns an observable of Rx.ReactiveTest events; calling it runs the scheduler on virtual time. Now our timer generates events very quickly (in real time)

$ node timer-test.js 
at 1455802551646 value 0
at 1455802551651 value 1
at 1455802551652 value 2
completed
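The idea of virtual time itself can be sketched without Rx. The VirtualClock class and virtualTimer function below are hypothetical and far simpler than Rx.TestScheduler; they only show why nothing happens until the clock is started, and why the run is then instantaneous.

```javascript
// virtual-clock-sketch.js
// Hypothetical minimal virtual-time scheduler; Rx.TestScheduler is far more capable.
class VirtualClock {
  constructor() {
    this.now = 0
    this.queue = []
  }
  // register an action to run after delayMs of virtual time
  schedule(delayMs, action) {
    this.queue.push({ due: this.now + delayMs, action })
  }
  // run every queued action in due-time order, jumping the clock forward
  start() {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.due - b.due)
      const next = this.queue.shift()
      this.now = next.due
      next.action()
    }
  }
}

// a timer shaped like Rx.Observable.timer(100, 1000).take(3), driven by the clock
function virtualTimer(clock, firstMs, periodMs, count, onNext) {
  let emitted = 0
  const tick = () => {
    onNext(emitted, clock.now)
    emitted += 1
    if (emitted < count) clock.schedule(periodMs, tick)
  }
  clock.schedule(firstMs, tick)
}

const clock = new VirtualClock()
const log = []
virtualTimer(clock, 100, 1000, 3, (value, at) => log.push({ value, at }))

const beforeStart = log.length // 0, nothing runs until the clock starts
clock.start()
console.log(beforeStart, log)
```

The recorded "at" values are virtual milliseconds (100, 1100, 2100), even though the whole run takes almost no real time.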

Rx.TestScheduler example

Since we need to pass a scheduler when creating an Observable if we want to speed up the time, we have to refactor even cold observables into factory methods. For example, let us make a factory method to delay even numbers

even-delays-scheduled.js
const Rx = require('rx')
const evenNumbers_ = require('./even-numbers')
function makeDelayedNumbers(scheduler) {
  const delayedNumbers_ = evenNumbers_
    .delay(x => Rx.Observable.timer(x * 1000, scheduler))
    .timeInterval()
  return delayedNumbers_
}
module.exports = makeDelayedNumbers

The production code does not need to pass anything; the default scheduler will be used if the argument scheduler is undefined.

const makeDelayedNumbers = require('./even-delays-scheduled')
const delayedNumbers_ = makeDelayedNumbers()
delayedNumbers_
  .pluck('value')
  .subscribe(
    (x) => console.log('at', +(new Date()), 'value', x),
    console.error,
    () => {
      console.log('completed at', +(new Date()))
    }
  )
/*
  prints
  at 1455803481722 value 2
  at 1455803483721 value 4
  at 1455803485724 value 6
  completed at 1455803485725
*/

Inside our unit test, we will construct a test scheduler and we will make the test run fast

const Rx = require('rx')
Rx.config.longStackSupport = true
const makeDelayedNumbers = require('./even-delays-scheduled')
describe('virtual timing', function () {
  var delayedNumbers_, testScheduler
  beforeEach(() => {
    testScheduler = new Rx.TestScheduler()
    delayedNumbers_ = makeDelayedNumbers(testScheduler)
  })
  function startTimer() {
    testScheduler.startScheduler(function () {
      return testScheduler.createColdObservable(
        Rx.ReactiveTest.onNext(200, 0),
        Rx.ReactiveTest.onNext(300, 1),
        Rx.ReactiveTest.onNext(400, 2),
        Rx.ReactiveTest.onCompleted(500)
      )
    })
  }
  it('gets the even numbers', (done) => {
    delayedNumbers_
      .pluck('value')
      .subscribe(
        (x) => console.log('at', +(new Date()), 'value', x),
        console.error,
        () => {
          console.log('completed at', +(new Date()))
          done()
        }
      )
    startTimer()
  })
})

Running this unit test is much faster

> mocha virtual-timing-spec.js
virtual timing
at 1455803627601 value 2
at 1455803627603 value 4
at 1455803627603 value 6
completed at 1455803627603
✓ gets the even numbers
1 passing (28ms)

Of course, the extra test code complexity is a huge trade off in this case.

Summary

When unit testing reactive streams

  • Use the Mocha done callback to tell the test runner when the stream has finished
  • Turn on the long stack support before the production code loads
  • Do not forget to verify the test path by confirming the number of assertions, or by throwing an exception if a stream goes through a different path
  • Build additional streams with extracted test data for easy validation
  • If you already use Rx v5, try the Ava test runner
  • There is plenty of room for writing your own little helper functions to remove the stream testing boilerplate
  • If the tests are really really slow, consider using Rx.TestScheduler to drive the events on virtual time

Additional information