Orchestrating Callbacks


This is the second article in a new series about Node.js. We began by covering the fundamental pattern for controlling flow in Node.js: the callback. In this article we’ll discuss how to build on this basic pattern to coordinate asynchronous operations. I hope you enjoy it!

You can find some of the code samples from this article in this GitHub repo.

- Pedro Teixeira, CTO, YLD!

Orchestrating callbacks

It’s not only the Node core API that uses the callback pattern: a multitude of third-party libraries on npm use it too. This allows you to use a flow-control library like async to compose them in almost any way you want.

To use async you must install it in the root directory of your code using:

$ npm install async

Series

The simplest case of I/O orchestration is the case where you want to chain a series of I/O calls together, one after another, and interrupt them if an error occurs.

This example runs two operations, each of them removing one file:

async_series_1.js:

const async = require('async');    
const fs = require('fs');
var work = [removeFile1, removeFile2];
function removeFile1(cb) {    
  fs.unlink('./file1.txt', cb);  
}
function removeFile2(cb) {    
  fs.unlink('./file2.txt', cb);  
}
async.series(work, done);
function done(err) {    
  if (err) throw err;  
  console.log('done');  
}

The async.series function takes two arguments. The first is an array of functions to be invoked in sequence. These functions (our removeFile1 and removeFile2) must all have the same signature: a callback function as the first and sole argument. The second argument is a final callback that gets called when all of the functions have completed, or as soon as one of them calls back with an error.
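To make the sequencing more concrete, here is a minimal sketch of how a series-style helper could be written in plain Node.js. It is not the async library's actual implementation; the names seriesSketch, first and second are made up for illustration.

```javascript
// Simplified sketch of series-style flow control; NOT the async library's code.
function seriesSketch(tasks, done) {
  let index = 0;

  function next(err) {
    // Stop at the first error, or once every task has run.
    if (err || index === tasks.length) {
      return done(err || null);
    }
    const task = tasks[index++];
    task(next); // each task receives `next` as its sole callback argument
  }

  next();
}

// Hypothetical tasks that record the order in which they complete.
const order = [];
function first(cb) {
  setImmediate(function () { order.push('first'); cb(); });
}
function second(cb) {
  setImmediate(function () { order.push('second'); cb(); });
}

seriesSketch([first, second], function (err) {
  if (err) throw err;
  console.log(order.join(', ')); // first, second
});
```

The second task only starts after the first one has called back, which is the property that makes series suitable for operations that must not overlap.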

If you place two files named file1.txt and file2.txt in the current directory, you will be able to run this without an error:

$ node async_series_1.js

If you run this again you should get an error in the done callback, because file1.txt has already been removed:

if (err) throw err;  
                 ^  
Error: ENOENT, unlink './file1.txt'

Here you can see that the series flow breaks the first time an error occurs; in this case the final callback function (done) gets called with the error as its first argument.

Parallel

If the I/O you’re doing is independent, you can reduce the total time a set of operations takes by doing them in parallel.

const async = require('async');    
const fs = require('fs');
var work = [removeFile1, removeFile2];
function removeFile1(cb) {    
  fs.unlink('./file1.txt', cb);  
}
function removeFile2(cb) {    
  fs.unlink('./file2.txt', cb);  
}  

async.parallel(work, done);
function done(err) {    
  if (err) throw err;  
  console.log('done');  
}

The only thing we changed here was to replace async.series with async.parallel to cause these two I/O operations to be performed in parallel.

If one of these operations fails, the first one to fail triggers the done callback with an error. When any of the remaining pending calls finish later, the done callback doesn’t get called again. (Remember: a callback function is never called twice; this is part of the contract.)
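The "first error wins, and the final callback fires only once" behaviour can be sketched in a few lines of plain Node.js. This is an illustrative approximation, not the async library's code; parallelSketch, failsFast and succeedsLate are hypothetical names.

```javascript
// Simplified sketch of parallel-style flow control; NOT the async library's code.
function parallelSketch(tasks, done) {
  let pending = tasks.length;
  let finished = false;

  if (pending === 0) return done(null);

  tasks.forEach(function (task) {
    task(function (err) {
      if (finished) return;   // the final callback already ran; ignore late results
      if (err) {
        finished = true;
        return done(err);     // the first error wins
      }
      pending -= 1;
      if (pending === 0) {
        finished = true;
        done(null);
      }
    });
  });
}

// Hypothetical tasks: one fails quickly, the other succeeds later.
function failsFast(cb) {
  setTimeout(function () { cb(new Error('failed fast')); }, 10);
}
function succeedsLate(cb) {
  setTimeout(function () { cb(); }, 30);
}

let calls = 0;
parallelSketch([failsFast, succeedsLate], function (err) {
  calls += 1;
  console.log('done called', calls, 'time(s):', err && err.message);
});
```

Note that succeedsLate still runs to completion; its result is simply discarded because the final callback has already been invoked.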

This is one of the drawbacks of using I/O in parallel instead of in series: you’re only able to handle the first error that happens. If you need to handle each one of them, either avoid parallel calls using async.series or have one custom callback to handle the error on each individual operation.
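If you do need to see every failure from a set of parallel operations, one option is to record each error individually instead of short-circuiting. The sketch below shows the idea in plain Node.js; parallelCollectErrors, ok and bad are hypothetical names, and the final callback receiving either null or an array of failures is a design choice made for this example.

```javascript
// Sketch: run callback-style tasks in parallel and report every failure.
function parallelCollectErrors(tasks, done) {
  const errors = [];
  let pending = tasks.length;

  if (pending === 0) return done(null);

  tasks.forEach(function (task, i) {
    let called = false;
    task(function (err) {
      if (called) return;   // guard against a task calling back twice
      called = true;
      if (err) errors.push({ index: i, error: err });
      pending -= 1;
      // Only report once every task has finished, successfully or not.
      if (pending === 0) done(errors.length ? errors : null);
    });
  });
}

// Hypothetical tasks for demonstration.
function ok(cb) { setImmediate(cb); }
function bad(name) {
  return function (cb) {
    setImmediate(function () { cb(new Error(name + ' failed')); });
  };
}

parallelCollectErrors([bad('first'), ok, bad('third')], function (errs) {
  if (errs) {
    errs.forEach(function (e) {
      console.error('task', e.index, '->', e.error.message);
    });
  }
});
```

The trade-off is that the caller only hears about problems after the slowest operation has finished.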

Waterfall

The reason for using async.series would be that the I/O calls are dependent. Imagine that you first need to fetch a value from a remote system, like a database server, before you can create the next request. Here is such an example:

function befriend(user1, user2, cb) {
  // shared between the two steps below
  var alreadyFriends = false;
  async.series([findIfAlreadyFriended, friend], cb);
  function findIfAlreadyFriended(cb) {
    Friends.exists({from: user1, to: user2}, function(err, areFriends) {
      if (err) {
        cb(err);
      }
      else {
        alreadyFriends = areFriends;
        cb(); // always call back, otherwise the series never continues
      }
    });
  }
  function friend(cb) {
    if (alreadyFriends) {
      return cb();
    }
    Friends.insert({from: user1, to: user2}, cb);
  }
}

Notice that we’re passing the third argument of befriend, a callback function, directly as the last argument of the async.series call. Here you can already see a big advantage of the callback pattern: you don’t have to adapt between different function signature patterns – you can pass the callback function directly as an argument.

This befriend function creates a “friend” link in a remote database. It’s using async.series to compose these two functions: the first finds out whether the link already exists, and the second creates the friendship link, but only if that link doesn’t already exist.

The findIfAlreadyFriended function then has to check the remote system to find out if such a link exists, and update the shared alreadyFriends Boolean variable, which lives in the scope of befriend. This Boolean variable is then used in the next function in the list, the friend function, to decide whether to do the link insertion or to call back immediately.

We designed this befriend function to not error out if such a friend link already exists. You could also design it to call back with an error, which would break the async.series flow, as intended.

You can entirely avoid keeping a shared variable between functions by using the async.waterfall method:

function befriend(user1, user2, cb) {
  async.waterfall([findIfAlreadyFriended, friend], cb);
  function findIfAlreadyFriended(cb) {  
    Friends.exists({from: user1, to: user2}, cb);  
  }
  function friend(alreadyExists, cb) {  
    if (alreadyExists) return cb();
    Friends.insert({from: user1, to: user2}, cb);  
  }  
}

Knowing that the Friends.exists function already calls back with a signature of (err, exists), we use that to our advantage: we let async.waterfall pass whatever the results of the callback are into the next function, keeping the state you need in the call stack instead of the outer scope.

Like all the async flow-control functions, async.waterfall breaks on the first error, handing it off to the final callback.
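The way results flow from one step into the next can be illustrated with a small plain Node.js sketch. It is an approximation for explanation purposes, not the async library's implementation; waterfallSketch is a made-up name, and the Friends object here is a hypothetical in-memory stand-in for the remote database.

```javascript
// Simplified sketch of waterfall-style flow control; NOT the async library's code.
function waterfallSketch(tasks, done) {
  let index = 0;

  function next(err, ...results) {
    if (err) return done(err);                               // break on first error
    if (index === tasks.length) return done(null, ...results);
    const task = tasks[index++];
    task(...results, next); // previous results become the next task's arguments
  }

  next(null);
}

// Hypothetical in-memory stand-in for the remote Friends store.
const friendships = new Set();
const Friends = {
  exists: function (link, cb) {
    setImmediate(function () {
      cb(null, friendships.has(link.from + '>' + link.to));
    });
  },
  insert: function (link, cb) {
    setImmediate(function () {
      friendships.add(link.from + '>' + link.to);
      cb();
    });
  }
};

function befriend(user1, user2, cb) {
  waterfallSketch([findIfAlreadyFriended, friend], cb);
  function findIfAlreadyFriended(cb) {
    Friends.exists({from: user1, to: user2}, cb);
  }
  function friend(alreadyExists, cb) {
    if (alreadyExists) return cb();
    Friends.insert({from: user1, to: user2}, cb);
  }
}

befriend('ana', 'bob', function (err) {
  if (err) throw err;
  console.log('friend link present:', friendships.has('ana>bob'));
});
```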

Watch here that the friend function always has to invoke the cb callback no matter what. If we don’t, the befriend callback will not get invoked in some cases. This is one of the hardest parts of using callbacks: making sure they always get invoked exactly once, no matter what.
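One defensive technique for the "at most once" half of that contract is to wrap a callback in a guard that ignores repeat invocations. The sketch below is a hand-rolled illustration; once and buggyOperation are hypothetical names. It cannot help with a callback that is never invoked at all, which still requires care on every code path.

```javascript
// Sketch: wrap a callback so that it can run at most once.
function once(cb) {
  let called = false;
  return function (...args) {
    if (called) return; // silently drop any repeat invocation
    called = true;
    cb(...args);
  };
}

// Hypothetical buggy function: on the error path it calls back twice,
// because the `return` after the first cb() call is missing.
function buggyOperation(shouldFail, cb) {
  if (shouldFail) {
    cb(new Error('operation failed'));
  }
  cb(null, 'ok');
}

let invocations = 0;
buggyOperation(true, once(function (err, result) {
  invocations += 1;
  console.log('callback ran', invocations, 'time(s), error:', err && err.message);
}));
```

A guard like this hides the symptom rather than fixing the bug, so in practice it may be worth logging the dropped call instead of ignoring it silently.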

Collections

If, instead of having a set of different functions, you want to apply the same function to a set of objects, you can use async’s collection iteration utilities.

Parallel iteration

As an example, let’s say that, given a list of documents, you want to insert all of the documents into a remote database:

var docs = [/* an array of documents */];
async.each(docs, insertDocument, done);
function insertDocument(doc, cb) {    
  db.insert(doc, cb);  
}
function done(err) {    
  if (err) {  
    handleError(err);  
  }  
  else {  
    console.log('inserted all documents');  
  }  
}

Here we see that async.each accepts three arguments:

  1. The collection (a JavaScript array)
  2. The iterator function
  3. A final callback function, called when all iterations are complete, or on the first occasion that an error occurs.

Our iterator function insertDocument will then be called by async for each document, in parallel. This function must accept one object as the first argument and a callback function as the last argument; the callback is to be called when there is an error, or when this particular operation completes.

Given that our insertDocument is only calling one function, we could use JavaScript’s Function#bind and reduce our example to:

var docs = [/* an array of documents */];
async.each(docs, db.insert.bind(db), done);
function done(err) {    
  if (err) {  
    handleError(err);  
  }  
  else {  
    console.log('inserted all documents');  
  }  
}
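The bind call in db.insert.bind(db) matters because a method passed around as a plain function loses its receiver. The following sketch demonstrates this with a hypothetical in-memory db object, which stands in for a real database client and assumes that insert relies on this.

```javascript
// Hypothetical in-memory stand-in for a database client.
const db = {
  docs: [],
  insert: function (doc, cb) {
    this.docs.push(doc); // relies on `this` being the db object
    setImmediate(cb);
  }
};

// Passing the method by reference detaches it from `db`.
const unbound = db.insert;
try {
  unbound({ id: 1 }, function () {});
} catch (err) {
  // `this` is no longer db, so `this.docs` is undefined
  console.log('unbound call failed with TypeError:', err instanceof TypeError);
}

// bind() returns a new function whose `this` is fixed to db.
const bound = db.insert.bind(db);
bound({ id: 2 }, function () {
  console.log('stored documents:', db.docs.length);
});
```

If the client's methods do not use this internally, passing them unbound happens to work, but binding is the safer default.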

Parallel, but with limit

The async.each function is quite handy, but is only useful if we know that the maximum length of the collection is relatively small. If the collection is too large, we risk:

  • overloading the receiving system. For instance if, as in this example, we’re invoking this procedure on a remote system, too many of those in parallel will risk overburdening that system. Worse still, if many of these operations are done, each one when answering a client request, we multiply this effect by the number of parallel client requests.
  • overloading Node.js memory. Each of these calls inevitably allocates some resources on our Node.js process: some structures, closures and file descriptors. These resources will eventually be released once the operation finishes, or will be garbage-collected in the near future, but with too many of these in parallel we risk putting too much pressure on memory, eventually leading to resource exhaustion.
  • blocking the event loop. Each one of the calls to start one of these operations blocks the event loop for a short period of time. Multiply that period of time by the number of objects you’re iterating on, and you may be blocking the event loop for a long time, possibly degrading the experience for other users currently being served by your Node.js process.

To prevent this you can either cap the collection, thus losing documents, or you can simply use async.eachLimit, which allows you to impose a limit on the number of outstanding operations:

var docs = [/* an array of documents */];
var parallelLimit = 5;
async.eachLimit(docs, parallelLimit, db.insert.bind(db), done);
function done(err) {    
  if (err) {  
    handleError(err);
  }  
  else {  
    console.log('inserted all documents');  
  }  
}

By using async.eachLimit here, we’re allowing a maximum of five outstanding operations: that is, at any time, there is a maximum of five ongoing document insert calls. In this way you can help reduce the pressure on an external service, as well as helping to reduce the pressure on memory and other resources.
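To show what "a maximum of five outstanding operations" means in practice, here is a simplified plain Node.js sketch of a limited parallel iteration. It is not the async library's implementation; eachLimitSketch and fakeInsert are hypothetical names, and the sketch assumes the limit is at least 1.

```javascript
// Simplified sketch of limited parallel iteration; NOT the async library's code.
// Assumes `limit` >= 1.
function eachLimitSketch(items, limit, iterator, done) {
  let nextIndex = 0;
  let running = 0;
  let finished = false;

  if (items.length === 0) return done(null);

  function launch() {
    // Start work until the limit is reached or the collection is exhausted.
    while (!finished && running < limit && nextIndex < items.length) {
      const item = items[nextIndex++];
      running += 1;
      iterator(item, onItemDone);
    }
  }

  function onItemDone(err) {
    running -= 1;
    if (finished) return;
    if (err) {
      finished = true;
      return done(err);
    }
    if (nextIndex === items.length && running === 0) {
      finished = true;
      return done(null);
    }
    launch(); // a slot was freed, so start the next item if there is one
  }

  launch();
}

// Hypothetical insert that tracks how many calls are in flight at once.
let inFlight = 0;
let maxInFlight = 0;
function fakeInsert(doc, cb) {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  setTimeout(function () { inFlight -= 1; cb(); }, 5);
}

const docs = [];
for (let i = 0; i < 20; i++) docs.push({ id: i });

eachLimitSketch(docs, 5, fakeInsert, function (err) {
  if (err) throw err;
  console.log('max concurrent inserts:', maxInFlight); // 5
});
```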

Bear in mind, though, that this technique doesn’t reduce the overall pressure that your Node.js process puts on an external service. If, for instance, you’re going to perform this operation for every user request, even a local parallel limit of five like this one may not be enough, since the effect must be calculated by multiplying the parallel limit by the number of requests being served at any given time. (To reduce that, you can use other patterns such as a global queue or a connection pool).

Note that the time it takes for a client request to complete may increase as you lower the maximum number of outstanding parallel operations, which in turn may increase the number of pending parallel requests. This number can be fine-tuned according to your application’s load patterns.

Next Article

In the next article of this series we’ll analyze how, still leveraging the callback pattern, you could use a work queue to further tighten and control the flow of operations.

You can find the previous post on Flow Control here:

Written by Pedro Teixeira

Originally published at blog.yld.io on October 30, 2015.


Written by YLD, October 5th, 2015

