Using Streams — Readable, Writable, Transform Streams and Flow Control


This is the fifth article in a new series about Node.js. In the previous article we covered the Event Emitter, which is a convenient pattern for getting notifications of complex state changes in a decoupled manner. But some objects may be simpler than that: they’re a source of data or the target of data.

If you need to process a stream of data from a source, create such a stream yourself, or repeatedly transform or filter data, Node has an abstraction for you: Streams.

- Pedro Teixeira

(You can find some of the code samples from this article in this GitHub repo.)

Readable stream

In this article we’re going to continue building on our home automation system. Our system has a thermometer that frequently emits temperature readings. This thermometer acts like a data tap; it is what Node.js calls a ‘Readable Stream’.

Here is how we could listen to the temperature readings from that thermometer:

var thermometer = home.rooms.living_room.thermometer;
thermometer.on('data', function(temperature) {
  console.log('Living room temperature is: %d C', temperature);
});

Here you can see that a readable stream is a special case of an event emitter, emitting data events every time some new data is available.
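As a minimal sketch of that relationship, the snippet below checks that a stream built on Node's stream.Readable is also an instance of events.EventEmitter. The `source` stream and its three hard-coded readings are hypothetical stand-ins for the thermometer, and the simplified constructor options are assumed to be available in your Node.js version.

```javascript
const { Readable } = require('stream');
const { EventEmitter } = require('events');

// Hypothetical stand-in for the thermometer: an object-mode stream
// that produces three readings and then ends.
const readings = [21, 22, 23];
const source = new Readable({
  objectMode: true,
  read() {
    // Push one reading per call; pushing null signals the end of the stream.
    this.push(readings.length ? readings.shift() : null);
  }
});

// A readable stream inherits from EventEmitter.
const isEmitter = source instanceof EventEmitter;
console.log('is an event emitter:', isEmitter);

source.on('data', function(temperature) {
  console.log('Living room temperature is: %d C', temperature);
});
```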

A readable stream can emit any type of data: binary data in the form of buffers or strings, or even more complex data in the form of any JavaScript object.

In addition to emitting data, a readable stream can be paused and resumed:

thermometer.pause();
// in 5 seconds, resume monitoring temperature:  
setTimeout(function() {    
  thermometer.resume();  
}, 5000);

When paused, a readable stream will emit no more data until it has been resumed.
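The following sketch illustrates that behaviour with a hypothetical three-reading stream standing in for the thermometer. While the stream is paused, nothing reaches the data listener; readable.isPaused() reports the current state.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for the thermometer.
const readings = [20, 21, 22];
const thermometer = new Readable({
  objectMode: true,
  read() {
    this.push(readings.length ? readings.shift() : null);
  }
});

const received = [];
thermometer.on('data', function(temperature) {
  received.push(temperature);
});

// Pause right away: no 'data' events are delivered while paused.
thermometer.pause();
const pausedBefore = thermometer.isPaused();
console.log('paused:', pausedBefore);

// Resume shortly afterwards; the readings then start to flow.
setTimeout(function() {
  thermometer.resume();
}, 100);

thermometer.on('end', function() {
  console.log('received after resuming:', received);
});
```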

There are several examples of readable stream implementations, both in Node.js core and outside of it.

Here are some of them:

  • The contents of a file
  • A server HTTP request body
  • A server TCP connection
  • A client TCP connection
  • A client HTTP response body
  • The changes on a database
  • A video stream
  • An audio stream
  • The results of a database query
  • and many more…

Creating a readable stream

There are several ways of creating a readable stream. One way is to inherit from Node.js stream.Readable, implementing the _read method.

thermometer.js:

var Readable = require('stream').Readable;    
var util = require('util');
module.exports = Thermometer;
function Thermometer(options) {    
  if (! (this instanceof Thermometer)) return new Thermometer(options);  
  if (! options) options = {};  
  options.objectMode = true;  
  Readable.call(this, options);  
}
util.inherits(Thermometer, Readable);
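Thermometer.prototype._read = function read() {
  var self = this;
  // getTemperatureReadingFromThermometer is assumed to be provided by the
  // hardware layer; it calls back with (err, temperature).
  getTemperatureReadingFromThermometer(function(err, temperature) {
    if (err) self.emit('error', err);
    else self.push(temperature);
  });
};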

Our thermometer constructor initialises itself by calling the super-class constructor on itself, ensuring that options.objectMode is true. This enables the stream to handle data other than strings or buffers, which is what we need here.

Besides inheriting from stream.Readable, our readable stream has to implement the _read method, which is called when the stream is ready to pull in some data. (More about this data pulling later.) This method then fetches the needed data (in our case, from the hardware temperature sensor) and, once the data is available, pushes it into the stream using stream.push(data).
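For comparison, here is a sketch of the same idea written with ES2015 class syntax instead of util.inherits. This is an alternative spelling, not the article's original code. The sensor function is a hypothetical stub that returns a random reading after 10 ms, and the client stops after three readings so that the example terminates.

```javascript
const { Readable } = require('stream');

// Hypothetical sensor stub: calls back with a fake reading after 10 ms.
function getTemperatureReadingFromThermometer(callback) {
  setTimeout(function() {
    callback(null, 20 + Math.round(Math.random() * 5));
  }, 10);
}

class Thermometer extends Readable {
  constructor(options) {
    // Force object mode, whatever other options were given.
    super(Object.assign({}, options, { objectMode: true }));
  }

  // Called by the stream machinery whenever it wants more data.
  _read() {
    getTemperatureReadingFromThermometer((err, temperature) => {
      if (err) this.destroy(err);
      else this.push(temperature);
    });
  }
}

const thermometer = new Thermometer();
let count = 0;
thermometer.on('data', function(temperature) {
  console.log('temp:', temperature);
  if (++count === 3) thermometer.destroy(); // stop after three readings
});
```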

Pull stream vs. push stream

There are two main types of readable stream: one that you must pull data from, and one that pushes data to you. An illustration of a push stream is a water tap: once you open it, it keeps gushing water. An illustration of a pull stream can be a drinking straw: it pulls the water only when the straw user sucks on it.

Another, and perhaps a more apt, real-world analogy of a pull stream is a bucket brigade. If there is nobody there to take the bucket, the bucket holder is just left standing there waiting.

Node.js core streams have these two modes. If you simply listen for the data event, the push mode is activated and data flows as fast as the underlying resource can push it:

thermometer_client_push.js:

var Thermometer = require('./thermometer');
var thermometer = Thermometer();
thermometer.on('data', function(temp) {
  console.log('temp:', temp);
});

If, instead, you read from the stream, you’re using the default pull mode and reading at your own rate, as in this example:

var Thermometer = require('./thermometer');
var thermometer = Thermometer({highWaterMark: 1});
setInterval(function() {
  var temp = thermometer.read();
  // read() returns null when no reading is buffered yet
  if (temp !== null) console.log('temp:', temp);
}, 1000);

This client uses the stream.read() method to get the latest reading from the stream. Notice that we initialise the stream with a highWaterMark option value of 1. When the stream is in object mode, this setting defines the maximum number of objects that our stream buffers. As we want to minimise the possibility of getting stale temperature data, we only keep a maximum of one reading buffered.
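Note that stream.read() returns null when nothing is buffered. A common alternative to polling on a timer is to wait for the readable event and then read until null comes back. The sketch below shows that pattern with a hypothetical three-reading stream in place of the real thermometer.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for the thermometer, buffering at most one object.
const readings = [20, 21, 22];
const thermometer = new Readable({
  objectMode: true,
  highWaterMark: 1,
  read() {
    this.push(readings.length ? readings.shift() : null);
  }
});

const seen = [];
thermometer.on('readable', function() {
  let temp;
  // Keep pulling until the internal buffer is empty (read() returns null).
  while ((temp = thermometer.read()) !== null) {
    seen.push(temp);
    console.log('temp:', temp);
  }
});

thermometer.on('end', function() {
  console.log('no more readings, got', seen.length);
});
```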

When we introduce stream.pipe for hooking streams together, you will see where these values, and the way data flows, come into play.

Writable streams

There’s another type of stream that, instead of emitting data, can have data sent to it: the writable stream. Some examples of writable streams in Node are:

  • a writable file in append mode
  • a TCP connection
  • the process standard output stream
  • a server HTTP response body
  • a database bucket (or table)
  • an HTML parser
  • a remote logger
  • … and many more

To write into a writable stream you simply call stream.write(o), passing in the data you’re writing. You can also pass in a function that gets called once that payload gets flushed: stream.write(payload, callback).

To implement a writable stream you can inherit from Node.js stream.Writable and implement the protected stream._write method.

As an example, let’s say that you want to write every single thermometer reading into a database server, but the database layer module you have to use does not provide a streaming API. Let’s then implement one:

db_write_stream.js:

var Writable = require('stream').Writable;    
var util = require('util');
module.exports = DatabaseWriteStream;
function DatabaseWriteStream(options) {    
  if (! (this instanceof DatabaseWriteStream))  
    return new DatabaseWriteStream(options);  
  if (! options) options = {};  
  options.objectMode = true;  
  Writable.call(this, options);  
}
util.inherits(DatabaseWriteStream, Writable);
DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {
  insertIntoDatabase(JSON.stringify(doc), callback);  
};

The implementation of this write stream is really simple: besides the inheritance ceremony and enabling object mode, the only relevant thing here is to define the _write function that performs the actual write into the underlying resource (our document database). The _write function must accept three arguments:

  • chunk: the piece of data you’re writing. In our case, it’s the document we’re about to insert into the database.
  • encoding: if the data is a string (not our case), what is the encoding?
  • callback: a callback function to be called once the write finishes or errors.
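To see these three arguments in action, here is a self-contained sketch. The in-memory `rows` array and the `insertIntoDatabase` stub are hypothetical replacements for a real database layer, and the stream is created with the simplified constructor rather than by inheritance.

```javascript
const { Writable } = require('stream');

// Hypothetical in-memory stand-in for the database layer.
const rows = [];
function insertIntoDatabase(doc, callback) {
  rows.push(doc);
  setImmediate(callback); // report success asynchronously, as real I/O would
}

const db = new Writable({
  objectMode: true,
  // chunk (here: doc), encoding and callback, as described above
  write(doc, encoding, callback) {
    insertIntoDatabase(JSON.stringify(doc), callback);
  }
});

// The optional second argument to write() is called once this chunk is handled.
db.write({ when: Date.now(), temperature: 21 }, function(err) {
  if (err) throw err;
  console.log('stored %d row(s)', rows.length);
});
db.end();
```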

We can now write thermometer data into this stream:

var DbWriteStream = require('./db_write_stream');
var db = DbWriteStream();
var Thermometer = require('./thermometer');
var thermometer = Thermometer();
thermometer.on('data', function(temp) {
  db.write({when: Date.now(), temperature: temp});
});

OK, this is nice — but why did we bother creating a writable stream on top of our database? Why not just stick with the original database module? Because streams are composable by using stream.pipe().

Connecting streams using pipe

Instead of manually connecting the two streams, we can connect a readable stream into a writable stream using stream.pipe like this:

var DbWriteStream = require('./db_write_stream');
var db = DbWriteStream();
var Thermometer = require('./thermometer');
var thermometer = Thermometer();
thermometer.pipe(db);

This keeps the data flowing between our thermometer and our database. If we later want to stop the flow, we can disconnect the two streams using unpipe:

// Unpipe after 10 seconds
setTimeout(function() {    
  thermometer.unpipe(db);  
}, 10e3);
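Here is a small sketch of pipe and unpipe working together. Both streams are hypothetical stand-ins: the source only emits what we push into it by hand, and the destination collects documents in an array. The destination emits an unpipe event when it is disconnected.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical source that only emits what is pushed into it manually.
const source = new Readable({ objectMode: true, read() {} });

// Hypothetical destination that collects everything it receives.
const stored = [];
const db = new Writable({
  objectMode: true,
  write(doc, encoding, callback) {
    stored.push(doc);
    callback();
  }
});

let unpiped = false;
db.on('unpipe', function() {
  unpiped = true;
});

source.pipe(db);
source.push(21); // forwarded to db through the pipe

setTimeout(function() {
  source.unpipe(db);
  source.push(22); // stays in the source's buffer, db never sees it
  console.log('stored:', stored, 'unpiped:', unpiped);
}, 50);
```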

The advantage of pipe is not only that it saves us from writing code to transport the stream chunks from one stream into another: it also gives us flow control for free.

Flow control in streams

When you hook a readable and a writable stream together with readable.pipe(writable), the flow of data is adjusted to the consumer’s speed. Under the hood, pipe calls the writable’s .write() method and uses the return value to decide whether to pause the source stream. If writable.write() returns true, the writable stream’s internal buffer is still below its highWaterMark and it can take more data. If the call returns false, the buffer has reached that limit because data is arriving faster than it can be flushed into the underlying resource (our database in this specific case), which means that the source stream needs to be paused. Once the writable stream has drained its buffer, it emits the drain event, signalling the pipe to resume the source stream.
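To make the mechanism concrete, this sketch does by hand roughly what pipe does for us: it stops writing when write() returns false and continues on drain. The slow destination (10 ms per write, highWaterMark of 2) is a hypothetical stand-in for the database.

```javascript
const { Writable } = require('stream');

// Hypothetical slow destination: each write takes 10 ms to complete.
const slowDb = new Writable({
  objectMode: true,
  highWaterMark: 2, // signal back-pressure once two objects are queued
  write(doc, encoding, callback) {
    setTimeout(callback, 10);
  }
});

const returns = []; // what write() returned for each document
let next = 0;

function writeSome() {
  while (next < 6) {
    const ok = slowDb.write({ temperature: 20 + next++ });
    returns.push(ok);
    if (!ok) {
      // The buffer is full: wait for 'drain' before writing more.
      slowDb.once('drain', writeSome);
      return;
    }
  }
  slowDb.end();
}

slowDb.on('finish', function() {
  console.log('write() return values:', returns);
});

writeSome();
```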

Also, and as we’ve touched on briefly, you can control the maximum amount of buffering that a readable stream does by defining the options.highWaterMark value. If the stream is a binary one (also sometimes called a “raw” stream), this value represents the maximum amount of bytes that the stream is willing to buffer. If the stream is in object mode, it represents the maximum number of objects that it’s willing to buffer.
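As a rough illustration of the object-mode case, the sketch below counts how often _read is called on a stream that nobody consumes. With a highWaterMark of 2, the stream should stop pulling once two objects are buffered. The readings are made up, and read(0) is used only to start buffering without consuming anything.

```javascript
const { Readable } = require('stream');

let pulls = 0; // how many times the stream asked for data
const thermometer = new Readable({
  objectMode: true,
  highWaterMark: 2, // in object mode this counts objects, not bytes
  read() {
    pulls++;
    this.push(20 + pulls); // hypothetical reading
  }
});

// Trigger buffering without consuming any data.
thermometer.read(0);

setImmediate(function() {
  console.log('_read calls:', pulls);
  console.log('buffered objects:', thermometer.readableLength);
});
```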

As an implementor of any type of stream, you don’t have to worry about the implementation of this. When implementing a writable stream, you just have to call the callback once the given chunk or object has been sent. When creating a readable stream, the stream client can set the options.highWaterMark value to control the maximum amount of buffering. The rest is handled by the Node.js streams implementation.

Transform streams

We’ve looked at readable and writable streams, but there is a third type of stream that combines these two: it’s called a Through or Transform stream.

Streams are not necessarily meant to be used in pairs, with one readable stream piped into one writable stream. They can be composed into a pipeline where the data flows from a readable stream into one or more transform streams and ends up in a writable stream.

In our database writable stream example, we accept JavaScript objects and transform them into a JSON string before inserting them in the database. Instead we can create a generic transform stream that does that:

json_encode_stream.js:

var Transform = require('stream').Transform;    
var inherits = require('util').inherits;
module.exports = JSONEncode;
function JSONEncode(options) {    
  if ( ! (this instanceof JSONEncode))  
    return new JSONEncode(options);
  if (! options) options = {};  
  options.objectMode = true;  
  Transform.call(this, options);  
}
inherits(JSONEncode, Transform);
JSONEncode.prototype._transform = function _transform(obj, encoding, callback) {
  try {  
    obj = JSON.stringify(obj);  
  } catch(err) {  
    return callback(err);  
  }
  this.push(obj);  
  callback();  
};

To implement a transform stream we inherit from Node’s stream.Transform pseudo-class and implement the protected _transform method, which accepts the raw JavaScript object and pushes a JSON-encoded string representation of it.
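The sketch below exercises an equivalent transform on its own, without any pipes: we write an object into it and read the encoded string back out. It uses the simplified constructor instead of inheritance, and passes the result as the second argument of the callback, which Node forwards to push().

```javascript
const { Transform } = require('stream');

const jsonEncode = new Transform({
  objectMode: true,
  transform(obj, encoding, callback) {
    let json;
    try {
      json = JSON.stringify(obj);
    } catch (err) {
      return callback(err);
    }
    // Shorthand for this.push(json) followed by callback().
    callback(null, json);
  }
});

// A transform stream is writable on one side and readable on the other.
jsonEncode.write({ temperature: 21 });
const encoded = jsonEncode.read();
console.log(encoded); // {"temperature":21}
```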

Also, you may have noticed that, when we introduced pipes, we gave up on pushing documents with timestamps into the database; instead we pushed raw temperature readings. We can correct this now by introducing another transform stream that accepts a temperature reading and spits out a time-stamped document:

to_timestamped_document_stream.js:

var Transform = require('stream').Transform;    
var inherits = require('util').inherits;
module.exports = ToTimestampedDocTransform;
function ToTimestampedDocTransform(options) {    
  if ( ! (this instanceof ToTimestampedDocTransform))
    return new ToTimestampedDocTransform(options);
  if (! options) options = {};  
  options.objectMode = true;  
  Transform.call(this, options);  
}
inherits(ToTimestampedDocTransform, Transform);
ToTimestampedDocTransform.prototype._transform = function _transform(temperature, encoding, callback) {
  this.push({when: Date.now(), temperature: temperature});  
  callback();  
};

Since the database writable stream no longer needs to encode the document itself, we can simplify it:

db_write_stream.js:

var Writable = require('stream').Writable;    
var util = require('util');
module.exports = DatabaseWriteStream;
function DatabaseWriteStream(options) {    
  if (! (this instanceof DatabaseWriteStream))  
    return new DatabaseWriteStream(options);  
  if (! options) options = {};  
  options.objectMode = true;  
  Writable.call(this, options);  
}
util.inherits(DatabaseWriteStream, Writable);
DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {
  insertIntoDatabase(doc, callback);  
};
function insertIntoDatabase(doc, cb) {    
  setTimeout(cb, 10);  
}

We can finally instantiate and pipe these streams together:

client.js:

var DbWriteStream = require('./db_write_stream');
var db = DbWriteStream();
var JSONEncodeStream = require('./json_encode_stream');
var json = JSONEncodeStream();
var ToTimestampedDocumentStream = require('./to_timestamped_document_stream');
var doc = ToTimestampedDocumentStream();
var Thermometer = require('../thermometer');
var thermometer = Thermometer();
thermometer.pipe(doc).pipe(json).pipe(db);

Here we’re taking advantage of the fact that stream.pipe() returns the target stream, which we can use to pipe into the next stream, and so on.
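The following self-contained sketch shows the chaining step by step. All four streams are hypothetical, inline stand-ins for the modules above (two fixed readings, an array instead of a database); the point is only that each pipe() call hands back the stream it was given.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Hypothetical source with two fixed readings.
const readings = [20, 21];
const thermometer = new Readable({
  objectMode: true,
  read() {
    this.push(readings.length ? readings.shift() : null);
  }
});

// Reading -> time-stamped document.
const doc = new Transform({
  objectMode: true,
  transform(temperature, encoding, callback) {
    callback(null, { when: Date.now(), temperature: temperature });
  }
});

// Document -> JSON string.
const json = new Transform({
  objectMode: true,
  transform(obj, encoding, callback) {
    callback(null, JSON.stringify(obj));
  }
});

// Hypothetical database: just collects the rows.
const rows = [];
const db = new Writable({
  objectMode: true,
  write(row, encoding, callback) {
    rows.push(row);
    callback();
  }
});

// Each pipe() returns its destination, so the calls can be chained.
const afterDoc = thermometer.pipe(doc);
const afterJson = afterDoc.pipe(json);
const last = afterJson.pipe(db);

db.on('finish', function() {
  console.log('stored rows:', rows);
});
```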

Third-party Streams

Besides all the internal streams Node has, npm has a lot of packages that implement transform streams. You can use some of them to encode, decode, filter or otherwise transform your data.

npm also contains many packages that export a streaming API (like databases, websocket servers, websocket clients, etc.), which you can install, create and pipe together as if you were playing with Lego bricks.

Have fun!

Next Article

This article completes the series about flow control in Node.js.

You can find all the previous posts on this topic here:

The next article will be the first of a series of articles that talk about handling configuration data in Node.js.

Written by Pedro Teixeira — published for YLD.

Written by YLD, January 3rd, 2016