Engineering

Turning a stream into an RPC channel

Further developing our refrigerator project, you may want to add not only new sensors, but also some new functions like changing the fridge target temperature, starting a defrost cycle, or even turning it on or off. We could expose some of these functions as events to be sent by the client into the server, but events are not really a good suit for this type of remote access function. Instead, we should use another abstraction: a remote procedure call.

The server

Let’s then expose an RPC channel on our refrigerator server, using a Mux-Demux sub-stream and the dnode module. Let's first instal dnode:

$ npm install dnode

We can then add some commands to our fake refrigerator server, encapsulated in the commands.js local module:

commands.js:

var commands = exports;
commands.setTargetTemperature = randomTimeout(null, 'command accepted, target temperature set');  
commands.defrost = randomTimeout(null, 'command accepted, defrosting');  
commands.powerOn = randomTimeout(null, 'command accepted, fridge is on now');  
commands.powerOff = randomTimeout(null, 'command accepted, fridge is off now');
function randomTimeout() {  
  var replyArgs = Array.prototype.slice.call(arguments);
  return function() {
    var callback = arguments[arguments.length - 1];
    if (typeof callback == 'function') {
      var timeout = Math.floor(Math.random() * 1000);
      var args = [callback, timeout].concat(replyArgs);
      setTimeout.apply(null, args);
    }
  }
}

This module exposes a set of commands (setTargetTemperature, defrost, powerOn and powerOff) as functions. Here we're taking a shortcut to easily define a function that calls back with the given arguments somewhere before one second has elapsed.

Now we need to change our refrigerator server to expose these methods remotely:

fridge_server.js:

var net = require('net');  
var DuplexEmitter = require('duplex-emitter');  
var Mux = require('mux-demux');  
var dnode = require('dnode');  
var commands = require('./commands');
var server = net.createServer();
server.on('connection', handleConnection);
server.listen(8000, function() {  
  console.log('door server listening on %j', server.address());
});
// sensors
var sensors = [  
  {
    name: 'door',
    events: ['open', 'close'],
    emitter: require('./door'),
    remotes: {},
    nextId: 0,
    lastEvent: undefined
  },
  {
    name: 'temperature',
    events: ['reading'],
    emitter: require('./thermometer'),
    remotes: {},
    nextId: 0,
    lastEvent: undefined
  },
];
// handle connections
function handleConnection(conn) {  
  var mx = Mux(handleConnection);
  conn.on('error', onError);
  mx.on('error', onError);
  conn.pipe(mx).pipe(conn);
  sensors.forEach(attachSensor);
  function attachSensor(sensor) {
    var stream = mx.createWriteStream(sensor.name);
    var remoteEmitter = DuplexEmitter(stream);
    stream.once('close', onClose);
    stream.on('error', onError);
    mx.on('error', onError);
    // add remote to sensor remotes
    var id = ++ sensor.nextId;
    sensor.remotes[id] = remoteEmitter;
    if (sensor.lastEvent) {
      remoteEmitter.emit.apply(remoteEmitter, sensor.lastEvent);
    }
    function onClose() {
      delete sensor.remotes[id];
    }
  }
  /// RPC
  function handleConnection(conn) {
    if (conn.meta != 'rpc') {
      onError(new Error('Invalid stream name: ' + conn.meta));
    }
    else {
      var d = dnode(commands);
      conn.pipe(d).pipe(conn);
    }
  }
  function onError(err) {
    conn.destroy();
    console.error('Error on connection: ' + err.message);
  }
}
/// broadcast all sensor events to connections
sensors.forEach(function(sensor) {  
  sensor.events.forEach(function(event) {
    // broadcast all events of type `event`
    sensor.emitter.on(event, broadcast(event, sensor.remotes));
    // store last event on `sensor.lastEvent`
    sensor.emitter.on(event, function() {
      var args = Array.prototype.slice.call(arguments);
      args.unshift(event);
      sensor.lastEvent = args;
    });
  });
});
function broadcast(event, remotes) {  
  return function() {
    var args = Array.prototype.slice.call(arguments);
    args.unshift(event);
    Object.keys(remotes).forEach(function(emitterId) {
      var remote = remotes[emitterId];
      remote.emit.apply(remote, args);
    });
  };
}

Besides all the sensor set-up we already had in the previous version of our server, now we’re listening for new Mux-Demux streams by passing a function to the Mux-Demux constructor:

var mx = Mux(handleConnection);
/// ...
function handleConnection(conn) {  
    if (conn.meta != 'rpc') {
      onError(new Error('Invalid stream name: ' + conn.meta));
    }
    else {
      var d = dnode(commands);
      conn.pipe(d).pipe(conn);
    }
  }

This connection handler handles the RPC stream by creating a dnode endpoint with the given commands. This dnode endpoint is a stream, which we attach to and from our Mux-Demux stream:

conn.pipe(d).pipe(conn);

The client

Those should be all the changes we need to perform on the server. Now we need to create a new client that accepts command-line arguments and dynamically invokes our RPC service.

send_command.js:

#!/usr/bin/env node
var net = require('net');  
var Mux = require('mux-demux');  
var dnode = require('dnode');
var hostname = process.argv[2];  
var port = Number(process.argv[3]);  
var args = process.argv.slice(4);  
var method = args.shift();  
if (! method) throw new Error('please provide a method to call');
console.log('command: %s (%j)', method, args.join(', '));
var conn = net.connect(port, hostname);
var mx = Mux(onConnection);  
conn.pipe(mx).pipe(conn);
var stream = mx.createStream('rpc');
var d = dnode();  
stream.pipe(d).pipe(stream);
d.on('remote', onRemote);
function onRemote(remote) {  
  // call the method
  args.push(callback);
  var fn = remote[method];
  if (! fn) throw new Error('No such method: ' + method);
  fn.apply(remote, args);
}
function callback(err, result) {  
  if (err) throw err;
  console.log('result: %j', result);
  conn.end();
}
function onConnection(conn) {  
}

Besides command-line argument parsing and the usual connection set-up, we now create a Mux-Demux stream named rpc:

var stream = mx.createStream('rpc');

This stream is a duplex one: both readable and writable — the client writes the RPC calls and reads the results. Now we can create a dnode endpoint and pipe it from and to this stream:

var d = dnode();  
stream.pipe(d).pipe(stream);

Having the new version of the server up and running, we can now use this send_command client to send commands:

$ node send_command localhost 8000 setTargetTemperature 10
command: setTargetTemperature ("10")  
result: "command accepted, target temperature set"

You can now try sending it any of the other valid commands, defrost, powerOn or powerOff.

Next article

So far our TCP server has communicated with clients without any kind of security: it lets any client connect to it and it communicates with them through a clear-text channel, making it prone to someone eavesdropping on the network. These may not be big problems for your office refrigerator application, but it can surely be if your data is sensitive, or you can’t let just every and any one use the service. In our next article we are going to address these issues.

You can find all the previous posts on Networking Patterns here:

Turning a stream into an RPC channel
was originally published in YLD Blog on Medium.
Share this article: