Kliesli composition

Connecting Kliesli compositions or the advantages of good pipe insulation.

Cable inside a cable

Please read Kliesli Compositions in JavaScript by Luis Atencio first. This blog post is just a refactoring of the code for clarity. While Luis does an admirable job showing the composition (a compliment), the final code is not as clear as it could be. We can even say that the code by putting type unwrapping and wrapping inside each composed function strays away from the functional principles of simplicity and composability. See last part for comparison, but hope you read my derivation before jumping to the end.

The example

Imagine you have a text file

1
2
$ cat text.txt
word word another word

Imagine you want to count words in this single line text file. Do not handle any errors while coding it. Instead concentrate on making the program from small easy to understand functions. You might write a function to read the file, then another function to decode it, then another function to split text into words. Final function could count the words and return a number, which the program can print. In code this is simple:

happy-path.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const fs = require('fs')

const read = path =>
fs.readFileSync(path)

const check = buffer =>
buffer.length > 0 ? buffer : null

const decode = encoding => buffer =>
buffer.toString(encoding)

const count = text =>
text.split(' ').length

const processFile = path =>
count(decode('utf8')(check(read(path))))

console.log(processFile('./text.txt'))

When we run this program with an existing, non-empty file, things are great

1
$ cat text.txt
word word another word
$ node happy-path.js
4

Yet, this program is extremely brittle. It does not handle any errors that might happen in the real world. The file might not exist or it might be empty. This program is also less than performant because it reads the file in blocking manner using fs.readFileSync instead of using a callback. But if we used a callback, we could not easily perform composition check(read(path)).

Composition of functions

Let us start with an observation that the function processFile is really a seriest of nested function calls. The only variable in the function is path and that goes inside the "deepest" nested call read(path). We could draw what happens to the data in processFile as a data processing pipeline

1
2
3
4
5
6
const processFile = path =>
count(decode('utf8')(check(read(path))))
//
// string => buffer => buffer|null => string => number
//
// read check decode count

Note the flow of data goes from the most nested function read that executes first all the way to count that executes last. Each function expected a single argument and returns a single result. Well, except for decode, but that one due to being split into a function returning a function is really making a function we really want on the fly:

1
2
3
const decodeUtf8 = decode('utf8')
const processFile = path =>
count(decodeUtf8(check(read(path))))

Function read takes a string argument, and function count returns a number. Thus the composed function processFile expects a string and returns a number.

We wrote processFile but really is is just a series of function calls, each grabbing the result of the previous step and calling the next function in the list. People have been writing functions to do this for us for a long time. Ramda for example has R.compose

1
2
3
4
5
6
7
8
const R = require('ramda')
const processFile = R.compose(
count,
decode('utf8'),
check,
read
)
console.log(processFile('./text.txt'))

Note that we have constructed processFile without even defining the variable path (this is called pointfree style). I personally prefer the R.pipe function that reverses the order of composed functions. I think it reads more naturally top to bottom. In addition, we can write type of the result at each step; types usually start with an upper case letter.

1
2
3
4
5
6
const processFile = R.pipe(
read, // String -> Buffer
check, // Buffer -> Buffer | Null
decode('utf8'), // Buffer -> String
count // String -> Number
)

By taking the input to first step read (that is string) and output of the last step count (that is of type number) we can write the input and output types for the composed function processFile. We could write it like this:

1
2
// processFile :: String -> Number
const processFile = ...

Ok we can create the program function out of small building blocks on the fly, but the program really cannot handle any errors! It will crash and burn badly in a very common case - if the input file does not exist.

Task

Let us kill two birds with one stone. Let us make the program asynchronous by reading a file using a callback, and let us handle any errors reading the file. Instead of passing "plain" values from one function to another, the read will return the contents of the file, but stored inside a "Task" object. I will use data.task for this. Almost verbatim from the data.task example

1
2
3
4
5
6
7
8
9
// read :: String -> Task(Error, Buffer)
function read(path) {
return new Task(function(reject, resolve) {
fs.readFile(path, function(error, data) {
if (error) reject(error)
else resolve(data)
})
})
}

Calling read with a string returns you a "Task". And that "Task" later will have either an Error or a Buffer. Due to its asynchronous nature, we can no longer run check or decode or even print the file contents right away. Instead we need to attach actions we want to executes as callbacks to the Task object.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// read :: String -> Task(Error, Buffer)
function read(path) {
return new Task(function(reject, resolve) {
fs.readFile(path, function(error, data) {
if (error) reject(error)
else resolve(data)
})
})
}
// note double error; decode is a function returning a function
// decode :: String -> Buffer -> String
const decode = encoding => buffer =>
buffer.toString(encoding)

read('./text.txt')
.map(decode('utf8'))
.fork(console.log, console.error)
// word word another word

This is kind of interesting: Task.map expects a function that knows nothing about working with a Task. Instead the callback function to Task.map receives the value from inside Task (whenever it becomes available). The "plain" value returned by the callback function decode('utf8') is automatically placed back into a Task object.

The result of Task.map(...) is another Task. We can keep "mapping" over previous result.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// read :: String -> Task(Error, Buffer)
function read(path) {
return new Task(function(reject, resolve) {
fs.readFile(path, function(error, data) {
if (error) reject(error)
else resolve(data)
})
})
}

// decode :: String -> Buffer -> String
const decode = encoding => buffer =>
buffer.toString(encoding)

// check :: Buffer -> Buffer | Null
const check = buffer =>
buffer.length > 0 ? buffer : null

// count :: String -> Number
const count = text =>
text.split(' ').length

read('./text.txt')
.map(decode('utf8'))
.map(check)
.map(count)
.fork(console.log, console.error)

Here is a cool thing about a Task. As you might have guessed each Task.map method call returns another Task instance, but nothing is executed until Task.fork is called.

We can demonstrate this by interspersing log statements with mapping calls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const t1 = read('./text.txt')
console.log('t1')
const t2 = t1.map(decode('utf8'))
console.log('t2')
const t3 = t2.map(check)
console.log('t3')
const t4 = t3.map(count)
console.log('t4')
t4.fork(console.log, console.error)
/*
t1
t2
t3
t4
4
*/

And we can demonstrate that no Tasks are executed by commenting out .fork() call and adding a log statement to read function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// read :: String -> Task(Error, Buffer)
function read(path) {
return new Task(function(reject, resolve) {
console.log('read task')
fs.readFile(path, function(error, data) {
if (error) reject(error)
else resolve(data)
})
})
}
...
console.log('t4')
// t4.fork(console.log, console.error)
/*
t1
t2
t3
t4
*/

Delayed execution is the main benefit of Tasks - you can keep adding more computations until you are happy with the chain, and only then call Task.fork to actually run it. Promises on the other hand are eager - as soon as you have created a Promise it starts running.

Composing Task with plain functions

Now that we got our very first function read returning a Task, but we lost the ability to compose functions using R.compose or R.pipe. How can we compose a Task - it is no longer a "plain" value we can pass to the next function in the chain. Luckily, Ramda has a composition function just for this case. If every function in the chain expects a "plain" input but returns a Task, all functions can be composed again using a library utility R.composeK or R.pipeK (the K stands for Kliesli composition, but I am not linking a reference url because the Wikipedia article will scare you away for good). Think of this as a composition for functions that all return same wrapped type like Task.

To compose, each individual function in the chain must return result of type Task. The simplest case to wrap a "plain" value in a Task is to call Task.of(x) factory function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// decode :: String -> Buffer -> Task(String)
const decode = encoding => buffer =>
Task.of(buffer.toString(encoding))

// check :: Buffer -> Task(Buffer | Null)
const check = buffer =>
Task.of(buffer.length > 0 ? buffer : null)

// count :: String -> Task(Number)
const count = text =>
Task.of(text.split(' ').length)

const processFile = R.pipeK(
read, // String -> Task(Error, Buffer)
check, // Buffer -> Task(Buffer | Null)
decode('utf8'), // Buffer -> Task(String)
count // String -> Task(Number)
)

processFile('./text.txt')
.fork(console.log, console.error)

If we decide to draw pipeline processFile, I would visualize it as a series of pipe segments. Each segment that we write expected "plain" value but outputs Task object. Yet except for read no other function we wrote deals with or needs an actual Task object! No other function is asynchronous, so hard-coding Task.of(result) inside each function is short sighed. It makes a function harder to read and harder to test. We only returned a Task from each so we could use these functions with R.pipeK.

I prefer adapting a function to each particular case, rather than changing it (see my favorite adaptors). Thus I will change the check, decode('utf8') and count functions back to their original "simple" form, and will convert the result into a Task on the fly.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// decode :: String -> Buffer -> String
const decode = encoding => buffer =>
buffer.toString(encoding)

// check :: Buffer -> Buffer | Null
const check = buffer =>
buffer.length > 0 ? buffer : null

// count :: String -> Number
const count = text =>
text.split(' ').length

const processFile = R.pipeK(
read, // String -> Task(Error, Buffer)
x => Task.of(check(x)), // Buffer -> Task(Buffer | Null)
x => Task.of(decode('utf8')(x)), // Buffer -> Task(String)
x => Task.of(count(x)) // String -> Task(Number)
)

Individual functions decode, check and count are simple again, but our pipeline is a little heavy. We notice that x => Task.of(check(x)) for example is functional composition itself!

1
2
3
4
5
6
const processFile = R.pipeK(
read, // String -> Task(Error, Buffer)
R.compose(Task.of, check), // Buffer -> Task(Buffer | Null)
R.compose(Task.of, decode('utf8')), // Buffer -> Task(String)
R.compose(Task.of, count) // String -> Task(Number)
)

Notice the repeated R.compose(Task.of, _) syntax. We could partially apply the first argument here to shorten it.

1
2
3
4
5
6
7
8
const asTask = R.partial(R.compose, [Task.of])

const processFile = R.pipeK(
read, // String -> Task(Error, Buffer)
asTask(check), // Buffer -> Task(Buffer | Null)
asTask(decode('utf8')), // Buffer -> Task(String)
asTask(count) // String -> Task(Number)
)

We could avoid even using R.partial if R.compose were curried, but it is hard to curry a function with unknown number of arguments. Luckily, Ramda includes R.o which is a curried compose! That is the function that makes our code tiny in this case.

1
2
3
4
5
6
7
8
const asTask = R.o(Task.of)

const processFile = R.pipeK(
read, // String -> Task(Error, Buffer)
asTask(check), // Buffer -> Task(Buffer | Null)
asTask(decode('utf8')), // Buffer -> Task(String)
asTask(count) // String -> Task(Number)
)

Perfect. We are composing functions, some of which return an actual Task, but most do not, and we are adapting the return value on the fly so the pipe line holds. The actual "flow" is still, aside from the read function, just "plain" values along the "happy" path.

1
---------------------------------------------------------
                  Task "pipe"

read -> check -> decode -> count       happy path "pipe"  ->


---------------------------------------------------------

What happens if there is a file read error? In that case, the control flow will skip the "happy path pipe", and will go directly to the error callback function in the .fork(onError, onSuccess) execution.

1
---------------------------------------------------------
                  Task "pipe"

read -> check -> decode -> count       happy path "pipe"   -> onSuccess
     \
      ->   ~~~~~~ to the error callback ~~~~~~~~~~~~~~~~~~ -> onError
---------------------------------------------------------

I like thinking of Task / Promises / Either making little railway tracks, and the data moving along the tracks like box cars. Sometimes due to an error, the box car jumps to an error track where it will keep rolling until someone handles the lost car. Watch Functional programming design patterns by Scott Wlaschin for a good talk using this analogy.

Notice that check, decode and count do not take advantage of the full Task pipeline, unlike read. Also, they probably should not - they are synchronous functions and their problem is a different one. Take check for example: it returns null value to indicate an empty file. But what if the null value was a legitimate one? Would it return -1 or some magic constant to indicate a problem? Or would it throw an Error? And how could we compose these functions safely in that case?

Either pipeline

Let us take a look at check function. Again, just like read, it should not return a "plain" special value to indicate a problem. Instead it should return an object that, just like Task, allows mapping over the inner value. This wrapper is called Either, it is commonly used to replace multiple if-else branches and there many libraries that implement it. I will use data.either.

1
2
3
4
5
6
const Either = require('data.either')
// check : Buffer -> Either(Buffer)
const check = buffer =>
buffer.length > 0
? Either.Right(buffer) // success ✅
: Either.Left('File is empty') // failure ☢️

Great, what about decode and count? They too could just return Either, and we could compose all 3 functions into single pipeline using R.pipeK, only this time the result would be an Either(...) object.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// decode : String -> Buffer -> Either(String)
const decode = encoding => buffer => Either.of(
buffer.toString(encoding)
)

// count :: String -> Either(Number)
const count = text => Either.of(
text.split(' ').length
)

// processBuffer :: Buffer -> Either(Number)
const processBuffer = R.pipeK(
check, // Either(Buffer)
decode('utf8'), // Either(String)
count // Either(Number)
)

The way to get the value from Either is to NOT ignore possible errors, and for example provide default value.

1
2
3
4
5
console.log(
processBuffer(Buffer.from('foo bar', 'utf8'))
.getOrElse('Hmm, an error')
)
// 2

An Either is especially useful here, because a function like decode might receive an invalid encoding; Either allows us to avoid crashing.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// decode : String -> Buffer -> Either(String)
const decode = encoding => buffer => {
try {
return Either.Right(buffer.toString(encoding))
} catch (err) {
return Either.Left('Could not decode using ' + encoding)
}
}
// processBuffer :: Buffer -> Either(Number)
const processBuffer = R.pipeK(
check, // Either(Buffer)
decode('ffff'), // Either(String)
count // Either(Number)
)

console.log(
processBuffer(Buffer.from('foo bar', 'utf8'))
.getOrElse('Hmm, an error')
)
// Hmm, an error

We could do the same trick and NOT hard code returned Either type in count

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// count :: String -> Number
const count = text =>
text.split(' ').length

const asEither = R.o(Either.of)

// processBuffer :: Buffer -> Either(Number)
const processBuffer = R.pipeK(
check, // Either(Buffer)
decode('utf8'), // Either(String)
asEither(count) // Either(Number)
)

console.log(
processBuffer(Buffer.from('foo bar', 'utf8'))
.getOrElse('Hmm, an error')
)

In a sense we have constructed a pipeline where each function (well, except count but that only gets called with a valid string input) is safe.

1
-----------------------------------------
            Either "pipe"
check -> decode -> count

-----------------------------------------

From check and from decode, if there is an error, the control will "jump" to the error track.

1
-----------------------------------------
            Either "pipe"
check -> decode -> count  -> Right(n)
    \      \
     \-> ~~~> ~~~~~~~~~~~ -> Left(error)
-----------------------------------------

Do you like my ASCII drawing skills?!

Combining pipes

Finally, let us connect the two pipelines we have composed. We cannot combine Task and Either segments of the pipeline sequentially unfortunately.

1
------------=====-----------------
            ---->            ---->
   Task        |     Either
            ~~~~>            ~~~~>
------------=====-----------------

To see why, think about when you are going to call Task.fork - is before creating the first Either? No, that cannot be right, we want the final object to be a Task, so we can call Task.fork on it when we are ready to use it. The opposite is not true - we could convert an Either into a Task, see Natural Transformation video for example.

Ok, back to our code.

We have to stick the Either pipeline inside the Task pipeline. The read function will start the outer Task pipe. The result of read will be passed to the inner Either pipe check -> decode -> count. The output of the outer Task pipe that Task.fork will pass to the callback function will be an Either object returned by the Either pipe. As a diagram it would look like this

1
---------------------------------------------------------
                   Task "pipe"
         ------------------------------------------------
read ->   check -> decode -> count ---- success Number ->
  \           \      \                Either "pipe"
   \           \-> ~~~> ~~~~~~~ buffer error ~~~~~~~~~ ->
    \    ------------------------------------------------
     -> ~~~~~~~~~~~~~~ file read error ~~~~~~~~~~~~~~~ ->
---------------------------------------------------------

The Either pipeline is the same as before (decode does not check the encoding here).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// check : Buffer -> Either(Buffer)
const check = buffer =>
buffer.length > 0
? Either.Right(buffer) // success!
: Either.Left('File is empty') // failure

// decode : String -> Buffer -> String
const decode = encoding => buffer =>
buffer.toString(encoding)

// count :: String -> Number
const count = text =>
text.split(' ').length

const asEither = R.o(Either.of)

// processBuffer :: Buffer -> Either(Number)
const processBuffer = R.pipeK(
check, // Either(Buffer)
asEither(decode('utf8')), // Either(String)
asEither(count) // Either(Number)
)

Now we need to combine read and processBuffer as a Task-returning functions, and we can use the same asTask approach, because processBuffer is a "regular" function we can adapt.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// read : String -> Task(Error, Buffer)
function read(path) {
return new Task(function(reject, resolve) {
fs.readFile(path, function(error, data) {
if (error) reject(error)
else resolve(data)
})
})
}

const asTask = R.o(Task.of)

// processFile :: String -> Task(Either(Number))
const processFile = R.pipeK(
read, // Task(Error, Buffer)
asTask(processBuffer) // Task(Either(Number))
)

How do we get result from the output of processFile which is Task(Either(Number))? In two steps: first calling .fork then .getOrElse

1
2
3
4
processFile('./text.txt')
.fork(err => console.error(err),
e => console.log(e.getOrElse(0))
)

The double-pipe insulation handles missing file, access errors, empty file and could be easily extended to cover invalid encodings and other errors. All we need is to return an Either - we are already making this the function's return type anyway in preparation for R.pipeK anyway.

Comparison to the original blog post example

To finish this blog post I want to go back to the Luis Atencio and his example. Because Luis only uses single composition to make the Task pipe, it looks deceptively simple (I will switch compose to pipe for easier comparison)

1
2
3
4
5
6
7
// processFile :: String -> Task(Error, Either)
const processFile = R.pipeK(
read,
check,
decode('utf8'),
count
)

Yet, and this is a big one - this forces each function check, decode and count to receive what an Either as argument, and return Task result. This makes the code inside the count function for example unwrap the argument to get to the plain string value.

1
2
// count :: String -> Task(_, String)
const count = text => Task.of(text.map(t => t.split(' ').length))

Note the type signature - it is incorrect, the function count receives a Either(String), not String. That is why it has to do text.map to get the actual string t. Similarly there is no reason (other than forcing the function to be compatible with a Task pipeline) for count to return a Task!

Let us compare the count that fits into a Task pipe with our version of the same function that goes into Either pipe, but adapts the return value type externally using asEither composition. I will keep the same variable names for honest comparison.

1
2
3
4
// count :: Either(String) -> Task(_, String)
const count = text => Task.of(text.map(t => t.split(' ').length))
// count :: String -> Number
const count = t => t.split(' ').length

First version has 65 characters, second has 38. We saved almost 50% of code (and a lot of complexity) by keeping count focused on what it should actually do - split a string into an array and return its length. Leave the marshaling of arguments and returns to others.

Final thoughts

Thanks to Luis Atencio for great examples. Go buy his book Functional Programming in JavaScript - it is excellent and practical.