Node server with Rx and Cycle.js

Manage side effects when coding NodeJS servers with Rx and Cycle.js (NOT about server-side rendering).

Disclaimer: this blog post is NOT about server-side rendering using Cycle.js; see this example instead.

This post explains how to apply reactive programming to a simple NodeJS server that responds to requests, and why doing so is beneficial. Then I will show how to take the main idea behind Cycle.js and apply it to the server code.

You can find the companion source code in bahmutov/node-rx-cycle.

The NodeJS server

We will take the simplest example: the Hello World NodeJS server, used as the first example on the official website. I have copied the code to node-rx-cycle/src/00-node-server.js

const http = require('http');
const hostname = '127.0.0.1';
const port = 1337;
http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  res.end('Hello World\n');
}).listen(port, hostname, () => {
  console.log(`Server running at http://${hostname}:${port}/`);
});

Running this code using Node v5 is simple, and we can try it from another terminal:

$ node src/00-node-server.js 
Server running at http://127.0.0.1:1337/
$ curl 127.0.0.1:1337
Hello World

The server listens to HTTP events, and every time there is a request, our callback is executed. The callback receives two arguments: the input request req, an instance of http.IncomingMessage, and the response object res, an instance of http.ServerResponse.
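
Because the callback only touches its two arguments, it can be exercised without opening a socket. The sketch below assumes a hypothetical helloHandler name and a hand-written stub in place of a real http.ServerResponse; the stub only mimics the two methods the handler calls.

```javascript
// Same logic as the server callback above, pulled out into a named function.
// `helloHandler` is a hypothetical name used only for this illustration.
function helloHandler(req, res) {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  res.end('Hello World\n');
}

// A stub that records what the handler writes; not a real http.ServerResponse.
function makeFakeResponse() {
  return {
    statusCode: null,
    headers: null,
    body: null,
    writeHead(statusCode, headers) {
      this.statusCode = statusCode;
      this.headers = headers;
    },
    end(body) {
      this.body = body;
    }
  };
}

const fakeRes = makeFakeResponse();
helloHandler({ url: '/' }, fakeRes);
console.log(fakeRes.statusCode, JSON.stringify(fakeRes.body));
// prints: 200 "Hello World\n"
```

This works for a two-line handler, but it gets harder once callbacks start sharing state.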

The complexity of the server code in a real application grows quickly. There are two problems as I see it:

  1. Code that uses callbacks, even when it follows a good architecture pattern as Express enforces, quickly becomes hard to read and even harder to test. The asynchronous nature of arriving requests, database lookups, etc. requires better programming patterns (like Promises).
  2. The input and output operations (called read and write effects) are usually sprinkled all around our code. It is very common to see some callbacks modifying the input request object. The writing to the response object is often even more tangled. Again, a good design pattern, like middleware callback stacks, helps but does not solve the problem completely.

Let us apply the reactive programming approach to the first problem, and then the Cycle.js principle (if not the library itself) to attack the second problem.

Reactive server

First, let us get rid of callbacks and replace them with one (and later multiple) reactive streams. Each request will be an event passed along the stream until the last step writes the response to the client. I will use the RxJS library, which provides a lot of ways to deal with streams. There are other libraries, but for our purposes the choice makes no difference.

npm install --save rx

The server is equivalent to the above example, but the server callback just emits an event on each HTTP request, instead of handling it right away. You can find the server code in 01-rx.js

const Rx = require('rx');
const requests_ = new Rx.Subject();

function sendHello(e) {
  console.log('sending hello');
  e.res.writeHead(200, { 'Content-Type': 'text/plain' });
  e.res.end('Hello World\n');
}

requests_
  .subscribe(sendHello)

const http = require('http');
const hostname = '127.0.0.1';
const port = 1337;
http.createServer((req, res) => {
  requests_.onNext({ req: req, res: res });
}).listen(port, hostname, () => {
  console.log(`Server running at http://${hostname}:${port}/`);
});

While other people prefer to give stream variables names ending with the $ character, I prefer underscores. First, I am fed up with the dollar sign in variables after Angular (just kidding). Second, I think the trailing line looks more like a stream of events than a dollar sign does.

The server callback just adds an event to the beginning of the stream requests_ which we constructed.

const requests_ = new Rx.Subject();
http.createServer((req, res) => {
  requests_.onNext({ req: req, res: res });
})

Because the server is very simple, we just subscribe to the requests_ stream. Every event will trigger the callback function sendHello, where we write the response.
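
To make the mechanics concrete, here is a dependency-free sketch of what the subject is doing for us. MiniSubject is a hypothetical stand-in written for this illustration, not the RxJS implementation; it only supports onNext and subscribe. It also shows that an event pushed before anybody subscribes is simply dropped, which is how Rx.Subject behaves as well.

```javascript
// Hypothetical, minimal stand-in for Rx.Subject (an assumption for
// illustration only): it keeps a list of observers and forwards each event.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext) {
    this.observers.push(onNext);
  }
  onNext(event) {
    this.observers.forEach(observer => observer(event));
  }
}

const requests_ = new MiniSubject();
const seen = [];

// Nobody is subscribed yet, so this event is lost.
requests_.onNext({ req: { url: '/too-early' } });

requests_.subscribe(e => seen.push(e.req.url));
requests_.onNext({ req: { url: '/' } });
requests_.onNext({ req: { url: '/hi' } });

console.log(seen); // [ '/', '/hi' ]
```

This is why the server code subscribes to requests_ before the HTTP server starts listening.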

Reactive benefits

We have not increased the line count by much, yet the stream gives us a list of benefits compared to callbacks (or even the middleware stacks of callbacks that Express uses). For example, we can add logging using the standard .tap method.

requests_
  .tap(e => console.log('request to', e.req.url))
  .subscribe(sendHello)

$ node src/01-rx.js 
Server running at http://127.0.0.1:1337/
$ curl 127.0.0.1:1337
$ curl 127.0.0.1:1337/hi
request to /
sending hello
request to /hi
sending hello

We can also buffer, filter, merge and apply many more operations to events, thanks to the large number of well-tested and performant operators in the RxJS library.

More importantly, we can handle errors and memory deallocation easily. Each stream subscription can take three callbacks: one for success (called with each new event), one for an error, and one for stream completion. Using these callbacks you can make sure the errors are handled and the stream is disposed of properly.

// handle any errors in the stream
const subscription = requests_
  .tap(e => console.log('request to', e.req.url))
  .subscribe(
    sendHello,
    console.error,
    () => {
      console.log('stream is done')
      // nicely frees the stream
      subscription.dispose()
    }
  )
process.on('exit', () => subscription.dispose())

Notice an interesting fact about the variables in the examples above: all variables (streams, etc) were declared using const. This is very common in reactive programming - we set up our streams only once - it is the events that move inside the streams, invisible to us; we don't have to declare any variables for the events themselves, except inside the little callback functions!

Rate limiting

Reactive streams help us manage time-related complexity very well. For example, suppose we want to print an array of numbers, but only one number per second. We create two streams - one where each event is a number, and another one where each event is a timer tick, one tick per second. To output one number per second, we just zip the two streams, then subscribe and print each event.

const Rx = require('rx')
const timeEvents = Rx.Observable
  .interval(1000)
const numberEvents = Rx.Observable
  .fromArray([3, 1, 7]);
Rx.Observable.zip(timeEvents, numberEvents, function pickValue(t, n) { return n; })
  .subscribe(console.log);
// prints 3 1 7 with 1 second intervals
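
One way to see why zip produces one number per second: the i-th zipped event can only be emitted once both inputs have produced their i-th event, so it fires at the later of the two timestamps. The sketch below models that timing with plain arrays. zipTimeline and the { at, value } event shape are hypothetical names for this illustration, not part of RxJS.

```javascript
// Models zip on two timelines of { at: milliseconds, value } events.
// Hypothetical helper for illustration; it is not an RxJS API.
function zipTimeline(a, b, pick) {
  const length = Math.min(a.length, b.length);
  const zipped = [];
  for (let i = 0; i < length; i += 1) {
    zipped.push({
      // the pair is complete only when the slower stream delivers
      at: Math.max(a[i].at, b[i].at),
      value: pick(a[i].value, b[i].value)
    });
  }
  return zipped;
}

// interval(1000) emits 0, 1, 2 at 1000 ms, 2000 ms, 3000 ms
const timeEvents = [0, 1, 2].map(i => ({ at: (i + 1) * 1000, value: i }));
// the array is emitted right away, modeled here as time 0
const numberEvents = [3, 1, 7].map(n => ({ at: 0, value: n }));

const result = zipTimeline(timeEvents, numberEvents, (t, n) => n);
console.log(result.map(e => `${e.value}@${e.at}`).join(' '));
// prints: 3@1000 1@2000 7@3000
```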

Let us see how to apply a similar time operation to our stream. I want to rate limit the server: it should receive all the requests, but respond at most once per second, keeping the other requests in a queue. If 10 requests arrive at the same time, the first response goes out right away, the second one a second later, etc. Here is how we could describe the rate limit stream operation using the marble diagrams commonly used in reactive programming.

---r1-r2-r3-----r4--------r5---->  // requests_
          rateLimit
---r1---r2---r3---r4------r5---->  // rateLimited_ = rateLimit(requests_)
   |    |    |    |    |           // 1 second intervals

The above stream of requests has several requests arriving in close proximity. We create a new stream rateLimited_ using rateLimit(requests_) call. The new stream rateLimited_ will output every request event, but will make sure no events are closer than 1 second. I added 1 second interval marks to the bottom of the diagram for clarity.
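
The timing rule can be written down as a small pure function: an event goes out either when it arrives or one interval after the previous output, whichever is later. This is only a model of the marble diagram above under that assumption. rateLimitSchedule is a hypothetical helper for this illustration, not an RxJS operator.

```javascript
// Given arrival times (ms, ascending) and a minimum gap, returns the times
// at which a rate-limited stream would emit each event.
// Hypothetical helper for illustration; it is not an RxJS operator.
function rateLimitSchedule(arrivals, interval) {
  const emitted = [];
  let previous = -Infinity;
  arrivals.forEach(arrival => {
    // wait for the event itself, and for the gap since the last output
    const at = Math.max(arrival, previous + interval);
    emitted.push(at);
    previous = at;
  });
  return emitted;
}

// three requests within 25 ms, then a quiet period, then one more
const schedule = rateLimitSchedule([0, 20, 25, 5000], 1000);
console.log(schedule); // [ 0, 1000, 2000, 5000 ]
```

The last event is not delayed at all, because more than one interval has passed since the previous output.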

It is probably possible to write rateLimit operation using the standard RxJS methods, but I did not know how, so I wrote one myself. You can find it in the file rate-limit.js. For example, let us rate limit a stream with interval events 200ms apart.

const Rx = require('rx')
const in_ = Rx.Observable
  .interval(200)
  .take(5)
const rateLimit = require('./rate-limit')
const limited_ = rateLimit(in_, 1000)
limited_
  .timeInterval()
  .subscribe(
    console.log,
    console.error,
    console.log.bind(console, 'limited completed')
  )
// output
// { value: 0, interval: 203 }
// { value: 1, interval: 1005 }
// { value: 2, interval: 1001 }
// { value: 3, interval: 996 }
// { value: 4, interval: 999 }
// limited completed

Let us apply rate limit to the requests received. For clarity, the console log includes the timestamp relative to the program start time.

const started = +(new Date) // program start time, for relative timestamps
const interval = 1000
const rateLimit = require('./rate-limit')
const limited_ = rateLimit(
  requests_.tap(e => console.log(`request to ${e.req.url} at`, +(new Date) - started)),
  interval
)
limited_.subscribe(sendHello)

Notice that we have rate limited the requests_.tap(...) expression. Every method call on a reactive stream creates a new stream. Thus we could give a name to every intermediate step in the fluent chain (and fork it, subscribe to it, etc).

const requests_ = ...
const logged_ = requests_.tap(e => console.log(e.req.url))
const limited_ = rateLimit(logged_, 1000)
limited_.subscribe(sendHello)

Back to the program (you can find it in src/02-1fps.js). We can test it by sending multiple requests at once.

$ node src/02-1fps.js 
Server running at http://127.0.0.1:1337/
request to / at 10505
sending hello at 10506
request to /hi at 10527
request to / at 10528
sending hello at 11510
sending hello at 12506

Notice that the requests arrived at timestamps 10505, 10527 and 10528 (within a 25 ms period), but the responses were sent at 10506, 11510 and 12506 (1 second apart).

Reactive programming makes it simpler to reason about asynchronous events, control their timing, and combine them. It is much simpler than using callbacks, and even simpler than using Promises. Since the server logic naturally deals with multiple identical request events, a stream where the events can flow and be processed is a better mental model than the single-execution Promise abstraction.

Now let us deal with side effects by applying Cycle.js principle.

What if the network was a function?

The gist of Cycle.js, in my understanding, is moving everything that is outside of our control to inputs or outputs, leaving our main logic inside a pure function. User actions like mouse clicks and text inputs, Ajax requests and responses, updating DOM markup - these are abstracted away from our code, leaving only pure logic that is easy to reason about and test.

A good idea before reading the rest of this blog is to watch these videos

Let us look at the reactive implementation again, and for simplicity I will use the non rate-limited implementation.

const requests_ = new Rx.Subject();
function sendHello(e) {
  e.res.writeHead(200, { 'Content-Type': 'text/plain' });
  e.res.end('Hello World\n');
}
requests_
  .tap(e => console.log('request to', e.req.url))
  .subscribe(sendHello)

const http = require('http');
const hostname = '127.0.0.1';
const port = 1337;
http.createServer((req, res) => {
  requests_.onNext({ req: req, res: res });
})

Notice how the middle of our program is pretty simple (from requests_... to .subscribe() lines). Once the request event enters the stream, the result is only dependent on the input - the very definition of "pure function".

Outside this little block, however, things are much less clear. For example, the sendHello() function interacts with the network by using the res.end method - and that is prone to lots of problems and side effects.

Similarly, the http.createServer code is the very definition of code with side-effects - like binding to a socket and listening to the incoming requests.

Can we isolate the pure part of the code from the network code? Even better, can we treat the output operation (setting the response header and writing to res.end) as a stream operation? We already have converted the input operation to a stream in the code below, so we are half-way there!

(req, res) => {
  requests_.onNext({ req: req, res: res });
}

Let us do this!

Separate "main"

Our first refactoring will be similar to lesson 2, and it will separate logic from the http effect. The logic returns a stream, and the http effect takes that stream as input. The relevant code is below.

const requests_ = new Rx.Subject();

function main() {
  return requests_
    .tap(e => console.log('request to', e.req.url))
}

function httpEffect(model_) {
  model_.subscribe(e => {
    console.log('sending hello')
    e.res.writeHead(200, { 'Content-Type': 'text/plain' })
    e.res.end('Hello World\n')
  })
}

httpEffect(main())

Using more of a Cycle.js notation, the output of main is a "sink", and this sink is the input to one or more "effects" functions.

const sink = main()
httpEffect(sink)

For now, we have only a single type of "sink" - the sink that should go to the HTTP response. In a practical application we might return multiple sinks from the main() call. For example, we could also sink a log operation and a database operation - all are side effects, thus each needs a "sink" in the Cycle.js world.

function main() {
  return {
    HTTP: requests_,
    LOG: requests_.map(e => e.req.url),
    DB: requests_.map( .... /* something to go into DB */)
  }
}
const sink = main()
httpEffect(sink.HTTP)
logEffect(sink.LOG)
dbEffect(sink.DB)

Introduce "run"

Manually mapping sinks to effects is tiresome. Let us connect things automatically: just loop through the effects and find a sink with the same name. We only need a function to do this; let us call it "run".

function main() {
  return {
    HTTP: requests_.tap(e => console.log('request to', e.req.url))
  }
}
function run(main, effects) {
  const sinks = main()
  Object.keys(effects).forEach(key => {
    effects[key](sinks[key])
  })
}
run(main, {
  HTTP: httpEffect
})

Same result, but the code in src/04-run.js is a little more generic. The effect functions are called "drivers" in Cycle, because they connect pure logic with the outside interfaces, just like drivers connect software and the operating system to the hardware.

const drivers = {
  HTTP: httpEffect
}
run(main, drivers)

Note: printing to the console is also a write effect, so we should NOT include this in the main() function. But for the sake of simplicity in the demo, I am leaving it there.
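
With more than one driver, run has to cope with a main that does not return a sink for every effect. Here is a minimal sketch of that variation, assuming we simply skip effects that have no matching sink. Plain arrays stand in for streams so the snippet runs without RxJS. The effect bodies and the written array are hypothetical and exist only for this illustration.

```javascript
// Records what each effect receives, so we can inspect it afterwards.
const written = [];

// Hypothetical effects; arrays stand in for streams in this sketch.
function httpEffect(sink) {
  sink.forEach(url => written.push(`HTTP ${url}`));
}
function logEffect(sink) {
  sink.forEach(line => written.push(`LOG ${line}`));
}
function dbEffect(sink) {
  sink.forEach(row => written.push(`DB ${row}`));
}

function main() {
  const requests = ['/', '/hi'];
  return {
    HTTP: requests,
    LOG: requests.map(url => `request to ${url}`)
    // no DB sink on purpose
  };
}

function run(main, effects) {
  const sinks = main();
  Object.keys(effects).forEach(key => {
    // assumption: an effect without a sink is skipped instead of being
    // called with undefined
    if (sinks[key] !== undefined) {
      effects[key](sinks[key]);
    }
  });
}

run(main, { HTTP: httpEffect, LOG: logEffect, DB: dbEffect });
console.log(written);
```

Whether a missing sink should be skipped or reported as an error is a design choice; skipping keeps the demo short.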

Abstract the input request better

Finally, let us take another look at the "read" effects - the http requests that arrive over the network. We need to package them up - they should be the inputs to our main function. Right now, the requests_ object that main uses is outside its lexical scope, a big "no-no" in pure functions.

We need to connect the "read" effects to the main function. Let us just give it an object, just like the drivers above, with HTTP being a source of effects.

function main(sources) {
  return {
    HTTP: sources.HTTP.tap(e => console.log('request to', e.req.url))
  }
}

Where is the sources object coming from? It has to come from the run function, of course - the run function grabs all the sources (the "read" effects) and calls the "drivers" on the result of main. Here it is:

const httpEffect = makeHttpEffect()
const drivers = {
  HTTP: httpEffect
}
function run(main, drivers) {
  const sources = {
    HTTP: drivers.HTTP.readEffect
  }
  const sinks = main(sources)
  Object.keys(drivers).forEach(key => {
    drivers[key].writeEffect(sinks[key])
  })
}

Remember an important point when looking at this code: the "read" effect and the output from main are reactive streams! The "write" effect functions expect streams as inputs too. The run function is executed only once, which is why the inputs and outputs are constant references. This is the setup step, not the runtime data flow; the flow will happen inside the connected streams.

Finally, we need to see how to create the httpEffect object. It is the glue that connects the input events from the HTTP socket all the way through the main to the response written back into the socket. It will even have a callback for the http.createServer to use!

function makeHttpEffect() {
  const requests_ = new Rx.Subject();
  return {
    writeEffect: function (model_) {
      model_.subscribe(e => {
        console.log('sending hello')
        e.res.writeHead(200, { 'Content-Type': 'text/plain' })
        e.res.end('Hello World\n')
      })
      return requests_
    },
    serverCallback: (req, res) => {
      requests_.onNext({ req: req, res: res })
    },
    readEffect: requests_
  }
}

The 3 properties are:

  • readEffect - the input request stream, notice how it is private to the function makeHttpEffect.
  • writeEffect - that subscribes to the model stream and can write responses back
  • serverCallback - the input to the stream, used by the HTTP server function

const httpEffect = makeHttpEffect()
http.createServer(httpEffect.serverCallback)
  .listen(...)

You can find the full code in 05-sources.js

All together

Let me mark pure functions vs non-pure in the code above.

const Rx = require('rx');
// pure
function main(sources) {
  return {
    HTTP: sources.HTTP.tap(e => console.log('request to', e.req.url))
  }
}
// pure
function makeHttpEffect() {
  const requests_ = new Rx.Subject();
  return {
    // effects
    writeEffect: function (model_) {
      model_.subscribe(e => {
        console.log('sending hello')
        e.res.writeHead(200, { 'Content-Type': 'text/plain' })
        e.res.end('Hello World\n')
      })
      return requests_
    },
    // effects
    serverCallback: (req, res) => {
      requests_.onNext({ req: req, res: res })
    },
    // effects
    readEffect: requests_
  }
}
// pure
const httpEffect = makeHttpEffect()
const drivers = {
  HTTP: httpEffect
}
// pure
function run(main, drivers) {
  const sources = {
    HTTP: drivers.HTTP.readEffect
  }
  const sinks = main(sources)
  Object.keys(drivers).forEach(key => {
    drivers[key].writeEffect(sinks[key])
  })
}
run(main, drivers)
// side effects
const http = require('http')
const hostname = '127.0.0.1'
const port = 1337
http.createServer(httpEffect.serverCallback)
  .listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`)
  });

We have been able to separate the logic (our main function, run, etc.) from functions that can have side effects (readEffect and writeEffect, createServer functions). This separation helps when designing software, writing code and testing it.
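
As a sketch of the testing benefit: main can be fed a fake source and its sink inspected, with no server involved. FakeStream below is a hypothetical stand-in that implements only tap, subscribe and onNext. It is an assumption made so the snippet runs without RxJS, and real Rx streams behave in richer ways (for example, they are lazy until subscribed).

```javascript
// Hypothetical stand-in for an Rx stream: supports onNext, subscribe and tap.
class FakeStream {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext) {
    this.observers.push(onNext);
  }
  onNext(event) {
    this.observers.forEach(observer => observer(event));
  }
  // returns a new stream that passes events through after calling fn
  tap(fn) {
    const tapped = new FakeStream();
    this.subscribe(event => {
      fn(event);
      tapped.onNext(event);
    });
    return tapped;
  }
}

// main from the listing above, unchanged
function main(sources) {
  return {
    HTTP: sources.HTTP.tap(e => console.log('request to', e.req.url))
  };
}

// Feed fake requests in, collect what comes out of the HTTP sink.
const fakeRequests_ = new FakeStream();
const sinks = main({ HTTP: fakeRequests_ });
const received = [];
sinks.HTTP.subscribe(e => received.push(e.req.url));

fakeRequests_.onNext({ req: { url: '/' }, res: {} });
fakeRequests_.onNext({ req: { url: '/hi' }, res: {} });
console.log(received); // [ '/', '/hi' ]
```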

If you compare this code with the canonical Cycle.js code, like the example shown in lesson 5, you will notice a major difference. We do not have to wrap the global "document" object in read and write effects. Instead, the write effect has all the information available right away (the response object), and it does not feed back into the "read" effect object - only the HTTP server feeds new events into it.

This means the above example does not close the "network -> computer -> network" loop (or does not form the cycle, if you want a pun), like the DOM examples show. Different use cases lead to different code!