Using a Remote Emitter


In a previous article we covered the Event Emitter pattern, which allows you to somewhat detach the producer of events from the consumer. This pattern may also be useful for providing a means of communication between two processes: process A connects to process B, and they can both send events to each other. When one of the sides receives an event, it may choose to simply ignore it (by not listening to that event), or to handle it by attaching an event handler — much like the Event Emitter pattern.

Let’s see this in action. Here is a use case:

Your office has a refrigerator that everyone uses. But now and then someone forgets to close the refrigerator door, leaving it to warm up, thus endangering the safety of all the contained food. This just can’t be tolerated. As the smart person that you are, you attach a sensor to the door and a small computer (let’s say a Raspberry Pi) connected to the office network. This computer can run Node.js and you take that chance to create a Node.js-powered TCP server that notifies of changes in the door state.

The Server

Let’s then create the door server:

door_server.js:

var net = require('net');    
var DuplexEmitter = require('duplex-emitter');    
var door = require('./door');
var server = net.createServer();
server.on('connection', handleConnection);
server.listen(8000, function() {    
  console.log('door server listening on %j', server.address());  
});  

/// store door state
var open = false;    
var lastEventTime;
door.on('open', onOpen);    
door.on('close', onClose);
function onOpen(time) {    
  open = true;  
  lastEventTime = time;  
}
function onClose(time) {    
  open = false;  
  lastEventTime = time;  
}  

// handle connections
var nextId = 0;    
var emitters = {};
function handleConnection(conn) {    
  var remoteEmitter = DuplexEmitter(conn);  
  var id = ++ nextId;  
  emitters\[id\] = remoteEmitter;
  conn.once('close', onClose);  
  conn.on('error', onError);
  if (lastEventTime) {  
    remoteEmitter.emit(open ? 'open' : 'close', lastEventTime);  
  }
  function onClose() {  
    delete emitters\[id\];  
  }
  function onError(err) {  
    console.error('Error on connection: ' + err.message);  
  }  
}  

/// broadcast door events
door.on('open', broadcast('open'));    
door.on('close', broadcast('close'));
function broadcast(event) {    
  return function() {  
    var args = Array.prototype.slice.call(arguments);  
    args.unshift(event);
    Object.keys(emitters).forEach(function(emitterId) {  
      var emitter = emitters\[emitterId\];  
      emitter.emit.apply(emitter, args);  
    });
  };  
}

Here, as in previous examples, we instantiate a server and make it listen on TCP port 8000. The handleConnection function is now very different from what we’ve seen so far.

We start by instantiating a remote emitter by passing the connection to the duplex-emitter constructor. This gives us an object which, when you emit events on it, it transmits that event to the connection instead of emitting it locally. It’s also duplex, which means that it will emit events that are coming from the other end of the connection.

We’re using a local module (which we haven’t introduced yet) named door. This module exposes a Singleton object that represents the door sensor and is itself an event emitter. This door event emitter then emits events of the types open and close. Every time the door emits one of these events, we need to transmit it to each and every one of the connected clients. This means that we then need to track each one of the remote emitters in a global object so that we can later send it messages.

The broadcast function is then used to broadcast each one of the event types to all registered emitters.

When a connection closes we remove that emitter from the list of emitters:

function onClose() {    
  delete emitters\[id\];  
}

We still need one missing piece, the door:

door.js:

var EventEmitter = require('events').EventEmitter;
var door = new EventEmitter();
module.exports = door;
var open = false;
function emitLater() {    
  setTimeout(function() {
    open = ! open; // flip state  
    var event = open ? 'open' : 'close';  
    door.emit(event, Date.now());
    emitLater();
  }, Math.floor(Math.random() \* 5000));  
}
emitLater();

As I mentioned, this local door module exports an event emitter that emits close and open events. In this implementation these events are fake, randomly emitted every five seconds or less.

Let’s then test this server by starting it up:

$ node door\_server  
door server listening on {"address":"0.0.0.0","family":"IPv4","port":8000}

Let’s then connect to it using a command-line tool like netcat or telnet:

$ nc localhost 8000

You should start seeing these JSON strings being printed:

\["open",1407334342639\]  
\["close",1407334347577\]  
\["open",1407334347694\]  
\["close",1407334350693\]  
\["open",1407334352930\]  
\["close",1407334353093\]  
\["open",1407334356678\]  
\["close",1407334357292\]  
\["open",1407334360026\]  
\["close",1407334362703\]  
\["open",1407334362930\]  
\["close",1407334364680\]  
\["open",1407334369361\]

These are newline-separated JSON objects, each representing an event. In our case, we can see that the server is emitting the open and close events, passing along a timestamp representing when the respective event happened as the first and sole event argument.

The client

We can then create a Node.js client for the server. This client should be invoked by command line, accepting the server hostname and port. It should also accept a third option stating the maximum number of seconds the refrigerator door can stay open before it emits a warning.

door_client.js:

var net = require('net');    
var DuplexEmitter = require('duplex-emitter');
var hostname = process.argv\[2\];    
var port = Number(process.argv\[3\]);    
var timeoutSecs = Number(process.argv\[4\]);
var timeout;    
var warned = false;
var conn = net.connect(port, hostname);    
var remoteEmitter = DuplexEmitter(conn);
remoteEmitter.on('open', onOpen);    
remoteEmitter.on('close', onClose);
function onOpen() {    
  timeout = setTimeout(onTimeout, timeoutSecs \* 1e3);  
}
function onClose() {    
  if (warned) {  
    warned = false;  
    console.log('closed now');  
  }  
  if (timeout) {  
    clearTimeout(timeout);  
  }  
}
function onTimeout() {    
  warned = true;  
  console.error(  
    'DOOR OPEN FOR MORE THAN %d SECONDS, GO CLOSE IT!!!',  
    timeoutSecs);  
}

The door client connects to the server with the given hostname and port, and instantiates a duplex emitter from that connection. It listens for the open and close events. When the door opens, it sets a timeout for when to emit a warning. When the door closes, it clears the timeout. If the door doesn’t close soon enough, the timeout triggers and the client prints a DOOR OPEN FOR MORE THAN x SECONDS, GO CLOSE IT!!! to the console.

Now you can, with the server running on another window, start the door client:

$ node door\_client localhost 8000 1

Here we’re starting the door client and telling it that it should connect to the door server on localhost port 8000. We’re also telling it that the door should remain open for a maximum of one second — just for testing purposes, since our fake door on the server is emitting open and close events less than five seconds apart.

You should then start seeing, printed in the client console:

DOOR OPEN FOR MORE THAN 1 SECONDS, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECONDS, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECONDS, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECONDS, GO CLOSE IT!!!    
closed now

Making the client resilient to disconnects

There’s at least one problem with this client: if the server disconnects for some reason (e.g. a restart), the client exits, and its exiting can go unnoticed. You can, of course, create a shell script that keeps the client running, but there’s another option if you don’t want to lose the client state: reconnect.

To reconnect we can use a package named reconnect-net, which is based on the generic reconnect-core, but specifically reconnects TCP connections. Let’s install it:

$ npm install reconnect-net

Here is the modified door client that reconnects:

door_client_reconnect.js:

var Reconnect = require('reconnect-net');    
var DuplexEmitter = require('duplex-emitter');
var hostname = process.argv\[2\];    
var port = Number(process.argv\[3\]);    
var timeoutSecs = Number(process.argv\[4\]);
var timeout;    
var warned = false;
var reconnect = Reconnect(onConnect).connect(port, hostname);
reconnect.on('disconnect', function() {    
  console.log('disconnected');  
});
function onConnect(conn) {    
  console.log('connected');  
  var remoteEmitter = DuplexEmitter(conn);
  remoteEmitter.on('open', onOpen);  
  remoteEmitter.on('close', onClose);  
}
function onOpen() {    
  timeout = setTimeout(onTimeout, timeoutSecs \* 1e3);  
}
function onClose() {    
  if (warned) {  
    warned = false;  
    console.log('closed now');  
  }  
  if (timeout) {  
    clearTimeout(timeout);  
  }  
}
function onTimeout() {    
  warned = true;  
  console.error(  
    'DOOR OPEN FOR MORE THAN %d SECONDS, GO CLOSE IT!!!',  
    timeoutSecs);  
}

Here you can see that we’re using reconnect-net to create a reconnect object, passing in the connection handler, and also asking it to connect to the given port and hostname. When the connection is established, our onConnect function gets called, and this is when we instantiate the remote emitter and start listening to the interesting remote events.

When this connection fails, the reconnect object emits a disconnect event which we listen to, printing out a warning. The rest of the logic we’ve already seen in the previous version of the client.

You can test this client by starting it as we did previously:

$ node door\_client\_reconnect.js localhost 8000 1

Everything should behave as previously. You can now close down the server and restart it. You should then see a series of disconnect events, where reconnect is failing to connect.

Notice that the disconnect events are quite close together in time at the beginning, but the interval between them starts to grow with time. This is because, by default, reconnect-core uses exponential backoff when reconnecting and failing. You can configure several aspects of this backoff by supplying reconnect-net with some options.

Once the server is back up, the client should reconnect successfully and resume operations:

connected    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
disconnected    
disconnected    
disconnected    
disconnected    
disconnected    
disconnected    
connected    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!    
closed now    
DOOR OPEN FOR MORE THAN 1 SECOND, GO CLOSE IT!!!

Next article

Let’s say now that your refrigerator door sensor server has been proved a success, and that you want to extend it. For instance, you would like to sense the refrigerator temperature and make that data available to the clients connecting to the server.

But there’s a problem: you will need to add more event types and also change the ones that exist: openand close would have to be changed to door open and door close and you will also have to add temperature reading or something similar.

In the next article of this series we’ll show you can create different channels inside one stream and treat them like individual streams.

You can find the previous post on Networking Patterns here:

This article was extracted from the Networking Patterns, a book from the Node Patterns series.

Written by Pedro Teixeira — published for YLD.

You may also like:


Written by YLDMarch 1st, 2016


Share this article