Multiplexing Streams


In the previous article of this series “Using a Remote Emitter”, we showed you how we can easily propagate events between Node.js processes.

This next article builds on this, showing you how you to apply a duplex stream to it, inside it, transport several streams, and create independent channels that use only one connection.

You can find the code for this article in this Github repo.

Enjoy!

Let’s say now that your refrigerator door sensor server has been proved a success, and that you want to extend it. For instance, you would like to sense the refrigerator temperature and make that data available to the clients connecting to the server.

But there’s a problem: you will need to add more event types and also change the ones that exist: openand close would have to be changed to door open and door close and you will also have to add temperature reading or something similar.

These events should be separated, i.e. transported in different streams, to make the server and client implementations easier and more adaptable to change. We could create different sub-streams on the server -> client stream, one for each device: one for the door sensor, another for the thermometer, and yet another should you choose to add yet another sensor to the refrigerator.

Fortunately this can be easily accomplished by using the mux-demux npm package: it lets us:

  • on the server side, multiplex several streams into one stream;
  • on the client side, do the inverse: de-multiplex that stream into separate streams to treat them individually

Example: The fridge server

Let’s then introduce this change onto our server, which now gets properly named fridge_server:

fridge_server.js:

var net = require('net');    
var DuplexEmitter = require('duplex-emitter');    
var Mux = require('mux-demux');
var server = net.createServer();
server.on('connection', handleConnection);
server.listen(8000, function() {    
  console.log('door server listening on %j', server.address());  
});  

// sensors
var sensors = \[    
  {  
    name: 'door',  
    events: \['open', 'close'\],  
    emitter: require('./door'),  
    remotes: {},  
    nextId: 0,  
    lastEvent: undefined  
  },  
  {  
    name: 'temperature',  
    events: \['reading'\],  
    emitter: require('./thermometer'),  
    remotes: {},  
    nextId: 0,  
    lastEvent: undefined  
  },  
\];  

// handle connections
function handleConnection(conn) {    
  var mx = Mux();
  conn.on('error', onError);  
  mx.on('error', onError);
  conn.pipe(mx).pipe(conn);
  sensors.forEach(attachSensor);
  function attachSensor(sensor) {  
    var stream = mx.createWriteStream(sensor.name);  
    var remoteEmitter = DuplexEmitter(stream);
    stream.once('close', onClose);  
    stream.on('error', onError);  
    mx.on('error', onError);
    // add remote to sensor remotes  
    var id = ++ sensor.nextId;  
    sensor.remotes\[id\] = remoteEmitter;
    if (sensor.lastEvent) {  
      remoteEmitter.emit.apply(remoteEmitter, sensor.lastEvent);  
    }
    function onClose() {  
      delete sensor.remotes\[id\];  
    }
  }
  function onError(err) {  
    conn.destroy();  
    console.error('Error on connection: ' + err.message);  
  }
}  

/// broadcast all sensor events to connections
sensors.forEach(function(sensor) {    
  sensor.events.forEach(function(event) {
    // broadcast all events of type \`event\`  
    sensor.emitter.on(event, broadcast(event, sensor.remotes));
    // store last event on \`sensor.lastEvent\`  
    sensor.emitter.on(event, function() {  
      var args = Array.prototype.slice.call(arguments);  
      args.unshift(event);  
      sensor.lastEvent = args;  
    });  
  });  
});
function broadcast(event, remotes) {    
  return function() {  
    var args = Array.prototype.slice.call(arguments);  
    args.unshift(event);
    Object.keys(remotes).forEach(function(emitterId) {  
      var remote = remotes\[emitterId\];  
      remote.emit.apply(remote, args);  
    });
  };  
}

The biggest change from the previous version is that we now have a sensor definition where we store:

  • name: the name of the sensor
  • events: which events the sensor will emit that we should propagate to the clients
  • emitter: which event emitter object emits the events for this sensor
  • remotes: the remote event emitters we should broadcast events to
  • nextId: the id we will assign to the next remote emitter
  • lastEvent: the last emitted event for this sensor, to be emitted once a client connects

Once the server gets a connection, we attach all of the defined sensors, storing and propagating all the interesting sensor events to the remote emitters. Each remote emitter has its own stream, which we create using mx.createWriteStream(name). This stream bears the same name as the sensor, which allows us to differentiate the streams on the client side.

The fridge client

The client now has to handle multiple streams inside the main one — let’s see what needs to change:

fridge_client.js:

var Mux = require('mux-demux');    
var Reconnect = require('reconnect-net');    
var DuplexEmitter = require('duplex-emitter');
var hostname = process.argv\[2\];    
var port = Number(process.argv\[3\]);    
var doorTimeoutSecs = Number(process.argv\[4\]);    
var maxTemperature = Number(process.argv\[5\]);
var reconnect = Reconnect(onConnect).connect(port, hostname);
var sensors = {    
  'door': handleDoor,  
  'temperature': handleTemperature  
};
function onConnect(conn) {    
  var mx = Mux(onStream);  
  conn.pipe(mx).pipe(conn);
  function onStream(stream) {  
    var handle = sensors\[stream.meta\];  
    if (! handle) {  
      throw new Error('Unknown stream: %j', stream.meta);  
    }  
    handle(DuplexEmitter(stream));  
  }  
}
/// Door
function handleDoor(door) {    
  var timeout;  
  var warned = false;
  door.on('open', onDoorOpen);  
  door.on('close', onDoorClose);
  function onDoorOpen() {  
    timeout = setTimeout(onDoorTimeout, doorTimeoutSecs \* 1e3);  
  }
  function onDoorClose() {  
    if (warned) {  
      warned = false;  
      console.log('closed now');  
    }  
    if (timeout) {  
      clearTimeout(timeout);  
    }  
  }
  function onDoorTimeout() {  
    warned = true;  
    console.error(  
      'DOOR OPEN FOR MORE THAN %d SECONDS, GO CLOSE IT!!!',  
      doorTimeoutSecs);  
  }  
}  

/// Temperature
function handleTemperature(temperature) {    
  temperature.on('reading', onTemperatureReading);
  function onTemperatureReading(temp, units) {  
    if (temp > maxTemperature) {  
      console.error('FRIDGE IS TOO HOT: %d %s', temp, units);  
    }  
  }  
}

This client now accepts an additional argument — the maximum temperature it allows before emitting a warning:

var maxTemperature = Number(process.argv\[5\]);

Also, it has a definition of all the expected sensors and their function handlers:

var sensors = {    
  'door': handleDoor,  
  'temperature': handleTemperature  
};

When we connect to the server we need to instantiate a Mux-Demux stream, setting a stream handler: Mux-Demux will emit a connection event on each sub-stream. Each one of the sub-streams has a new attribute named meta, bearing the same name we gave it on the server-side:

function onConnect(conn) {    
  var mx = Mux(onStream);  
  conn.pipe(mx).pipe(conn);
  function onStream(stream) {  
    var handle = sensors\[stream.meta\];  
    if (! handle) {  
      throw new Error('Unknown stream: %j', stream.meta);  
    }  
    handle(stream);  
  }  
}

Once we have the handler we can call create the duplex stream from the sub-stream and hand it off. The handleDoor function implements the same warning logic as before, and the new handleTemperatureemits a warning if the emitted temperature is higher than the given maximum temperature.

Let’s test this server and client now. First off, start the server:

$ node fridge\_server

In another shell window, start the client:

$ node fridge\_client localhost 8000 1 10

Given the arguments and the randomness of the server-side sensor emitters, you should soon start to see some warnings like this:

FRIDGE IS TOO HOT: 17.606447436846793 C    
DOOR OPEN FOR MORE THAN 1 SECONDS, GO CLOSE IT!!!    
FRIDGE IS TOO HOT: 13.070351709611714 C    
closed now    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
FRIDGE IS TOO HOT: 14.822801598347723 C    
closed now    
FRIDGE IS TOO HOT: 15.487561323679984 C    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now    
FRIDGE IS TOO HOT: 13.501591393724084 C    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now

Next article: Remote Procedure calls

In the next article for this series we’ll be taking a look at how you can perform remote procedure calls on top of a channel so that we can control our office fridge!

This article was extracted from the Networking Patterns, a book from the Node Patterns series.

Written for YLD.

You may also like:


Written by YLDMarch 3rd, 2016


Share this article