Disclaimer: this blog post is NOT about server-side rendering using Cycle.js; see this example instead.
This post explains how (and why) to apply reactive programming to a simple NodeJS server that responds to requests. Then I will show how to take the main idea behind Cycle.js and apply it to the server code.
You can find the companion source code in bahmutov/node-rx-cycle.
The NodeJS server
We will take the simplest example: the Hello World NodeJS server, used as the first example on the official website. I have copied the code to node-rx-cycle/src/00-node-server.js.
```javascript
const http = require('http');
```
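For reference, here is a minimal sketch modeled on the Hello World example from the Node.js website. The hostname and port match the terminal output below. Two choices are my own additions: the handler is a named function, and the listen call sits inside a start() function. Together they let the handler be exercised with a fake response object without binding a socket.

```javascript
const http = require('http');

const hostname = '127.0.0.1';
const port = 1337;

// Request handler: every incoming request gets the same plain-text reply.
function handler(req, res) {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  res.end('Hello World\n');
}

// Creating the server does not open a socket yet.
const server = http.createServer(handler);

// Assumption (my addition): binding is wrapped in start(); call it to serve.
function start() {
  server.listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`);
  });
}
```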
Running this code with Node v5 is simple, and we can try it from another terminal:
```
$ node src/00-node-server.js
Server running at http://127.0.0.1:1337/
$ curl 127.0.0.1:1337
Hello World
```
The server listens for HTTP requests, and every time one arrives, our callback is executed. The callback receives two arguments: the incoming request req, an instance of http.IncomingMessage, and the response object res, an instance of http.ServerResponse.
In a real application, the complexity of the server code grows quickly. As I see it, there are two problems:
- Code that uses callbacks quickly becomes hard to read and even harder to test, even when it follows a good architectural pattern like the one Express enforces. The asynchronous nature of arriving requests, database lookups, etc. requires better programming patterns (like Promises).
- The input and output operations (called read and write effects) are usually sprinkled all around our code. It is very common to see callbacks modifying the input request object, and writing to the response object is often even more tangled. Again, a good design pattern, like middleware callback stacks, helps but does not solve the problem completely.
Let us apply the reactive programming approach to the first problem, and then the Cycle.js principle (if not the library itself) to attack the second problem.
Reactive server
First, let us get rid of callbacks and replace them with one (and later multiple) reactive streams. Each request will be an event passed along the stream until the last step writes the response to the client. I will use the RxJS library, which provides a lot of ways to deal with streams. There are other stream libraries, but for our purposes the choice makes no difference.
```
npm install --save rx
```
The server is equivalent to the above example, but the server callback just emits an event on each HTTP request instead of handling it right away. You can find the server code in 01-rx.js.
```javascript
const Rx = require('rx');
```
While other people prefer stream variable names ending with a $ character, I prefer an underscore. First, I am fed up with dollar signs in variable names after Angular (just kidding). Second, I think a trailing line looks more like a stream of events than a dollar sign does.
The server callback just pushes an event into the start of the requests_ stream that we constructed.
```javascript
const requests_ = new Rx.Subject();
```
Because the server is very simple, we just subscribe to the requests_ stream. Every event will trigger the callback function sendHello, where we write the response.
Reactive benefits
We have not increased the line count by much, yet the stream gives us a number of benefits over callbacks (or even over middleware stacks of callbacks, like the one Express uses).
For example, we can add logging using the standard .tap method.
```javascript
requests_
```
```
$ node src/01-rx.js
Server running at http://127.0.0.1:1337/
$ curl 127.0.0.1:1337
$ curl 127.0.0.1:1337/hi
request to /
sending hello
request to /hi
sending hello
```
We can also buffer, filter and merge events, and apply many more operations, thanks to the large number of well-tested and performant operators in the RxJS library.
More importantly, we can handle errors and resource cleanup easily. Each stream subscription can take three callbacks: one for each new event, one for an error, and one for stream completion. Using these callbacks, you can make sure the errors are handled and the stream is disposed of properly.
```javascript
// handle any errors in the stream
```
Notice an interesting fact about the variables in the examples above: all variables (streams, etc.) were declared using const. This is very common in reactive programming. We set up our streams only once, and it is the events that move inside the streams, invisible to us. We don't have to declare any variables for the events themselves, except inside the little callback functions!
Rate limiting
Reactive streams help us manage time-related complexity very well. For example, suppose we want to print an array of numbers, but only one number per second. We create two streams: one where each event is a number, and another where each event is a timer tick, one tick per second. To output one number per second, we just zip the two streams, then subscribe and print each event.
```javascript
const Rx = require('rx')
```
Let us see how to apply a similar time operation to our stream. I want to rate limit the server: it should receive all the requests but respond only once per second, keeping the other requests in a queue. If 10 requests arrive at the same time, the first response goes out right away, the second one a second later, and so on. Here is how we could describe the rate limit stream operation using the marble diagrams commonly used in reactive programming.
```
---r1-r2-r3-----r4--------r5----> // requests_
```
The above stream has several requests arriving in close proximity. We create a new stream rateLimited_ using the rateLimit(requests_) call. The new stream rateLimited_ outputs every request event, but makes sure no two events are closer than 1 second apart. I added 1-second interval marks to the bottom of the diagram for clarity.
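The timing rule in the diagram can be stated directly: an event leaves at whichever is later, its arrival time or one interval after the previous departure. Here is a small sketch of that rule on plain timestamps, with no streams involved. The name rateLimitSchedule is hypothetical and does not come from the companion repo.

```javascript
// Hypothetical helper: given sorted arrival times (ms), compute when each event
// is emitted so that emissions are at least `interval` ms apart and none is early.
function rateLimitSchedule(arrivals, interval) {
  const departures = [];
  let last = -Infinity; // nothing emitted yet
  for (const t of arrivals) {
    const out = Math.max(t, last + interval);
    departures.push(out);
    last = out;
  }
  return departures;
}

// The three request arrival times from the log later in this post:
console.log(rateLimitSchedule([10505, 10527, 10528], 1000));
// prints [ 10505, 11505, 12505 ]
```

The real send times in that log (10506, 11510, 12506) are a few milliseconds later, because timers never fire exactly on time.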
It is probably possible to write the rateLimit operation using standard RxJS methods, but I did not know how, so I wrote one myself. You can find it in the file rate-limit.js. For example, let us rate limit a stream of interval events that are 200ms apart.
```javascript
const in_ = Rx.Observable
```
Let us apply the rate limit to the received requests. For clarity, the console log includes the timestamp relative to the program start time.
```javascript
const interval = 1000
```
Notice that we have rate limited the requests_.tap(...) expression. Every method call on a reactive stream creates a new stream. Thus we could write out every intermediate step of the fluent chain as its own variable (and fork it, subscribe to it, etc.).
```javascript
const requests_ = ...
```
Back to the program (you can find it in src/02-1fps.js). We can test it by sending multiple requests at once.
```
$ node src/02-1fps.js
Server running at http://127.0.0.1:1337/
request to / at 10505
sending hello at 10506
request to /hi at 10527
request to / at 10528
sending hello at 11510
sending hello at 12506
```
Notice that the requests arrived at timestamps 10505, 10527 and 10528 (within a 25 ms window), but the responses were sent at 10506, 11510 and 12506 (about 1 second apart).
Reactive programming makes it simpler to reason about asynchronous events, control their timing, and combine them. It is much simpler than using callbacks, and even simpler than using Promises. Since the server logic naturally deals with many identical request events, a stream in which events flow and get processed is a better mental model than the single-execution Promise abstraction.
Now let us deal with side effects by applying the Cycle.js principle.
What if the network was a function?
The gist of Cycle.js, as I understand it, is moving everything that is outside of our control to inputs or outputs, leaving our main logic inside a pure function. User actions like mouse clicks and text inputs, Ajax requests and responses, updating DOM markup: these are abstracted away from our code, leaving only pure logic that is easy to reason about and test.
Before reading the rest of this post, it is a good idea to watch these videos.
Let us look at the reactive implementation again. For simplicity, I will use the version without rate limiting.
```javascript
const requests_ = new Rx.Subject();
```
Notice how simple the middle of our program is (from the requests_... line to the .subscribe() line). Once a request event enters the stream, the result depends only on the input: the very definition of a "pure function".
Outside this little block, however, things are much less clear. For example, the sendHello() function interacts with the network through the res.end method, and that is prone to lots of problems and side effects. Similarly, the http.createServer code is the very definition of code with side effects, like binding to a socket and listening for incoming requests.
Can we isolate the pure part of the code from the network code? Even better, can we treat the output operation (setting the response header and calling res.end) as a stream operation? We have already converted the input operation to a stream in the code below, so we are halfway there!
```javascript
(req, res) => {
```
Let us do this!
Separate "main"
Our first refactoring will be similar to lesson 2, and it will separate the logic from the http effect. The logic returns a stream, and the http effect takes that stream as input. The relevant code is below.
```javascript
const requests_ = new Rx.Subject();
```
Using more of a Cycle.js notation, the output of main is a "sink", and this sink is the input to one or more "effect" functions.
```javascript
const sink = main()
```
For now, we have only a single type of "sink": the sink that should go to the HTTP response. In a practical application, we might return multiple sinks from the main() call. For example, we could also sink a log operation and a database operation. All of these are side effects, and thus need a "sink" in the Cycle.js world.
```javascript
function main() {
```
Introduce "run"
Manually mapping sinks to effects is tiresome. Let us connect things automatically: just loop through the effects and find a sink with the same name. We only need a function to do this; let us call it "run".
```javascript
function main() {
```
Same result, but the code in src/04-run.js is a little more generic. The object with all effects is called a "driver" in Cycle - because it connects pure logic with the outside interfaces, just like drivers connect software and the operating system to the hardware.
```javascript
const drivers = {
```
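Here is a sketch of such a run function. It assumes main() returns an object of sinks and drivers is an object of effect functions keyed by the same names. Throwing on a missing sink is my own choice, meant to surface typos early. Plain strings stand in for streams, because run only routes values and never looks inside them.

```javascript
// Connect each driver (write effect) to the sink with the same name.
function run(main, drivers) {
  const sinks = main();
  Object.keys(drivers).forEach(name => {
    // Assumption (my choice): a driver without a matching sink is an error.
    if (!(name in sinks)) {
      throw new Error('main() returned no sink named ' + name);
    }
    drivers[name](sinks[name]);
  });
}

// Usage with stand-in sinks: run only routes them, so any value works.
const received = {};
run(
  () => ({ HTTP: 'http sink', LOG: 'log sink' }),
  {
    HTTP: sink => { received.HTTP = sink; },
    LOG: sink => { received.LOG = sink; }
  }
);
console.log(received); // { HTTP: 'http sink', LOG: 'log sink' }
```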
Note: printing to the console is also a write effect, so we should NOT include it in the main() function. But for the sake of simplicity in the demo, I am leaving it there.
Abstract the input request better
Finally, let us take another look at the "read" effects: the HTTP requests that arrive over the network. We need to package them up, because they should be the inputs to our main function. Right now, the requests_ object that main uses is outside its lexical scope, a big "no-no" for pure functions.
To connect the "read" effects to the main function, let us just pass it an object. Just like the drivers above, this object has an HTTP property, here acting as a source of events.
```javascript
function main(sources) {
```
Where is the sources object coming from? It has to come from the run function, of course. The run function grabs all the sources (the "read" effects), passes them to main, and calls the "drivers" on the result of main. Here it is:
```javascript
const httpEffect = makeHttpEffect()
```
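Here is a sketch of a run that also feeds the sources. It assumes each effect object exposes readEffect and writeEffect, as in the makeHttpEffect property list described below. The exact signature of the original run may differ. Plain strings stand in for streams, because run only routes them.

```javascript
// Assumed effect shape: { readEffect, writeEffect } per name.
function run(main, effects) {
  // 1. Collect the "read" side of every effect into the sources object.
  const sources = {};
  Object.keys(effects).forEach(name => {
    sources[name] = effects[name].readEffect;
  });
  // 2. Call the pure logic exactly once to set up the streams.
  const sinks = main(sources);
  // 3. Hand each sink to the "write" side of the effect with the same name.
  Object.keys(effects).forEach(name => {
    effects[name].writeEffect(sinks[name]);
  });
}
```

Usage might look like run(main, { HTTP: makeHttpEffect() }). Since run is called once, it only wires things together; the events themselves flow later, inside the connected streams.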
Remember an important point when looking at this code: the "read" effect and the output from main are reactive streams! The "write" effect functions expect streams as inputs too. The run function is executed only once; that is why the inputs and outputs are constant references. This is the setup method, not the runtime data flow. The flow happens inside the connected streams.
Finally, we need to see how to create the httpEffect object. It is the glue that connects the input events from the HTTP socket, all the way through main, to the response written back into the socket. It even has a callback for http.createServer to use!
```javascript
function makeHttpEffect() {
```
The 3 properties are:
- readEffect: the input request stream; notice how the stream itself is private to the makeHttpEffect function.
- writeEffect: subscribes to the model stream and writes responses back.
- serverCallback: the input to the stream, used by the HTTP server function.
```javascript
const httpEffect = makeHttpEffect()
```
You can find the full code in 05-sources.js.
All together
Let me mark the pure and non-pure functions in the full code below.
```javascript
const Rx = require('rx');
```
We have been able to separate the logic (our main function, run, etc.) from the functions that can have side effects (readEffect, writeEffect and createServer).
This separation helps when designing software, writing code and testing it.
If you compare this code with the canonical Cycle.js code, like the example shown in lesson 5, you will notice a major difference. We do not have to wrap the global "document" object in read and write effects. Instead, the write effect has all the information available right away (the response object), and it does not feed back into the "read" effect object - only the HTTP server feeds new events into it.
This means the above example does not close the "network -> computer -> network" loop (or does not form the cycle, if you want a pun), like the DOM examples show. Different use cases lead to different code!