Extending TCP Socket in Node.js

One of the things we frequently do, but probably don't think about, is sending and receiving TCP data. Many of the packages we use every day rely on TCP sockets under the hood to connect to databases, remote services, and other devices on the network.

Working directly with a TCP socket is something you may find yourself needing to do. Creating a TCP socket to connect to a server is fairly straightforward thanks to how net.Socket is implemented.

But what if you want to make your code reusable? How do you create a "custom" TCP socket that wraps the communication with some remote service? How do you define that language (protocol) and make your TCP socket as easy to use as net.Socket?

This article will share some of the techniques I used to build @lntools/noise, a Noise Protocol socket library.

For this article, we'll keep things simple and create a protocol that sends and receives JSON objects.

First let's start by brushing up on a few concepts.

EventEmitter Basics

EventEmitters are an awesome feature of Node.js. They are the core of what makes sockets work.

EventEmitters allow you to easily make objects that broadcast events. Each event is named and can include data. Events can be multicast, meaning multiple subscribers can listen to these events.

The quick example below emits an event called greet and passes a greeting message along with the event. To make a class an EventEmitter, you simply extend events.EventEmitter.

// example EventEmitter
const { EventEmitter } = require('events')  
class Greeter extends EventEmitter {  
  sayHi() {
    this.emit('greet', 'hello!');
  }
}

A consumer of these events attaches a listener with the .on method to listen for specific events that are emitted.

In this example, we subscribe to the greet event and log the greeting.

// example consumer
let greeter = new Greeter();  
greeter.on('greet', greeting => console.log(greeting));  
greeter.sayHi();  
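Events are multicast: every listener registered for an event receives it, in the order the listeners were added, and emit() calls them synchronously. The sketch below repeats the Greeter class so it runs on its own; the received array is only there to record what the listeners saw.

```javascript
const { EventEmitter } = require('events');

class Greeter extends EventEmitter {
  sayHi() {
    this.emit('greet', 'hello!');
  }
}

const greeter = new Greeter();
const received = [];

// Two independent subscribers to the same event.
greeter.on('greet', greeting => received.push(`first: ${greeting}`));
greeter.on('greet', greeting => received.push(`second: ${greeting}`));

// once() removes the listener after its first invocation.
greeter.once('greet', greeting => received.push(`once: ${greeting}`));

greeter.sayHi();
greeter.sayHi();

console.log(received);
// [ 'first: hello!', 'second: hello!', 'once: hello!',
//   'first: hello!', 'second: hello!' ]
```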

Stream Basics

The next refresher is streams. Streams are EventEmitters under the covers and emit events pertaining to reading and writing data.

Streams are a more complex topic because of the myriad ways you can use them. This won't be a comprehensive guide, but will discuss some of the details on how we're going to implement our custom socket.

The Node.js documentation says:

A stream is an abstract interface for working with streaming data in Node.js.

So now that we've gotten the self-referential definitions out of the way, let's put it in simpler terms with an example.

If you wanted to retrieve 1000 database records and write them to a file, you have a few ways of doing this:

Option 1: Retrieve all of the records from the database and buffer them into memory. Then write all of the records from memory into the file.

While simple to implement, this approach buffers everything in memory. That puts an upper bound on the number of records you can operate on: the amount of allocatable memory in your system. This technique may work for 1000 or 10000 records but will likely break down if you have 1 million or 1 billion records.

Option 2: Leave a connection open and read a record off the connection. Then write that record to disk. Then pluck off the next record. Wash, rinse, repeat until there are no more records.

This is a simple demonstration of streaming. Streaming is about minimizing the memory footprint to only the information that is being actively worked upon. We don't buffer huge amounts of information to memory.

As a metaphor, you can think of a stream as a bucket brigade. Each person passes a bucket of water to the next person to extinguish a fire. In this metaphor, the buckets are the data and each person is a stream object that may be interested in the data.

Backpressure

One byproduct of this design is that streams handle backpressure efficiently.

Backpressure is what happens when you have a slow person in your bucket brigade.

If the person on the left just drops buckets at the feet of the slow person a huge pile up happens.

In computer terms, we eventually buffer all of the data into memory waiting for the slow consumer to pass the data.

Fortunately, streams have a mechanism for handling this. The consumer can signal to the producer that it is overtaxed. The producer stops pushing data until the consumer signals that it is ready for more. This signal propagates all the way back up the chain, preventing any stream in the system from over-buffering data in memory.
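Here is a small sketch of that signal on the writing side. The slow consumer is hypothetical: a Writable whose write function never reports completion, with a deliberately tiny highWaterMark of 2 objects so the effect shows up immediately.

```javascript
const { Writable } = require('stream');

// A deliberately slow consumer: it accepts a chunk but never calls its
// callback, so every later write stays queued in the stream's buffer.
const heldCallbacks = [];
const slowConsumer = new Writable({
  objectMode: true,
  highWaterMark: 2, // signal backpressure once 2 objects are buffered
  write(chunk, encoding, callback) {
    heldCallbacks.push(callback); // "still working on it"
  },
});

// write() returns false once the buffer reaches highWaterMark: the
// producer's cue to stop and wait for the 'drain' event.
const results = [];
for (let bucket = 1; bucket <= 4; bucket++) {
  results.push(slowConsumer.write({ bucket }));
}

console.log(results); // [ true, false, false, false ]
console.log(slowConsumer.writableLength); // 4
```

Note that write() still accepted the extra objects; a false return is advisory. It is up to the producer to stop and wait for drain, which is the contract the bucket brigade relies on.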

Types of Streams

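There are four fundamental types of streams:

  • Readable: streams from which data can be read
  • Writable: streams to which data can be written
  • Duplex: streams that are both Readable and Writable
  • Transform: Duplex streams that can modify or transform the data as it is written and read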

In this example we'll be working with a Duplex stream since we'll need both read and write support.

Streaming Modes

There are two streaming modes. These modes determine how data is passed into or out of the stream.

  1. Object mode
  2. Buffering

Object mode means that the stream reads and writes JavaScript objects. Yes, you can actually stream objects in Node.js, and it's pretty cool.

Buffering is the more commonly used method and is the default for file system, http, and socket streams. Data is emitted as a Buffer, or if the stream has an encoding, a string after decoding the data.
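The difference is easy to see by pushing data into two Readable streams by hand. In this sketch the no-op read() functions are an assumption for illustration: we push all data manually instead of fetching it from a real source.

```javascript
const { Readable } = require('stream');

// Buffering mode (the default): chunks are bytes, and read() with no size
// returns everything buffered so far, concatenated into one Buffer.
const bytes = new Readable({ read() {} }); // no-op: data is pushed manually
bytes.push(Buffer.from('ab'));
bytes.push(Buffer.from('cd'));
const allBytes = bytes.read();
console.log(allBytes.toString()); // abcd

// Object mode: each push() is one discrete JavaScript value, and read()
// returns a single object at a time (a size argument is ignored).
const objects = new Readable({ objectMode: true, read() {} });
objects.push({ id: 1 });
objects.push({ id: 2 });
const first = objects.read(100);
const second = objects.read();
console.log(first, second); // { id: 1 } { id: 2 }
```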

Reading Modes

When working with a Readable stream there are two reading modes. These reading modes determine how the stream emits read events.

  1. Flowing mode
  2. Paused mode

Most Node.js developers are familiar with the flowing mode of Readable streams. Flowing mode is most commonly used when piping data or attaching a data event handler. In flowing mode, data is read automatically and provided to the application as fast as possible.

// flowing mode with data event
readStream.on('data', data => { /* do something */ })  
// flowing mode via pipe
let rs = someReadStream();  
let ws = someWriteStream();  
rs.pipe(ws);  

The alternative is paused mode.

Paused mode is different. The consumer must explicitly call stream.read(len) to retrieve data. The consumer can choose to either read all buffered data (by omitting the length parameter) or read a specific number of bytes. If there is no data or not enough data, the call to stream.read(len) returns null, unless the stream has ended, in which case all of the remaining buffered data is returned.

Paused mode is enabled by adding a handler for the readable event. This event is used to notify the consumer that new data is available. This gives the consumer an opportunity to call stream.read() and retrieve that data.

let rs = someReadStream();

rs.on('readable', () => {  
  let data = rs.read(1024); // read 1024 bytes
  if(!data) return;
  // do something with data
});
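We can observe which mode a stream is in through its readableFlowing property: null before any consumer attaches, true when flowing, false when paused. A sketch with Readable streams whose no-op read() functions never produce data; only the handlers matter here:

```javascript
const { Readable } = require('stream');

const flowing = new Readable({ read() {} });
console.log(flowing.readableFlowing); // null (no consumer yet)
flowing.on('data', () => {}); // a 'data' handler switches to flowing mode
console.log(flowing.readableFlowing); // true

const paused = new Readable({ read() {} });
paused.on('readable', () => {}); // a 'readable' handler switches to paused mode
console.log(paused.readableFlowing); // false

// While a 'readable' handler is attached, adding a 'data' handler does
// not switch the stream back to flowing mode.
paused.on('data', () => {});
console.log(paused.readableFlowing); // false
```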

Socket Basics

Lastly, we'll discuss sockets.

TCP sockets are part of the net core module of Node.js.

net.Socket is implemented as a Duplex stream and is used in many core modules, http and tls to name a few.

net.Socket is used in two ways:

  • as a client that connects to a remote server via socket.connect(), net.createConnection(), or net.connect().
  • created by a server and emitted in the connection event when a remote client connects to the server.

net.Socket has a few events that are emitted pertaining to connections:

  • connect is triggered when the socket connection is successfully established
  • ready is triggered when the socket is ready for use and is emitted directly after the connect event.

However you end up with a net.Socket, it has a few capabilities that come from being a Duplex stream.

  • When data is received by the socket, it follows the same event emitting patterns used by any Readable stream. Both reading modes (flowing and paused) are supported, so you can use data or readable events, or pipe to a write stream.
  • Data can be written to the socket via the write and end methods, using the same patterns as Writable streams. If there is backpressure, write returns false and you should wait for drain to be emitted before writing additional data.

That's most of the interesting stuff. Sockets are basically a nice wrapper for a stream.

Implementing a Custom Socket

Now that we've covered the basics, let's build our custom socket implementation.

A review of how others have implemented custom sockets shows two main techniques:

  1. Implementing an EventEmitter
  2. Implementing a Duplex stream

When custom socket functionality uses an EventEmitter (websocket, nssocket) the data event fired by the net.Socket is passed through as another event.

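While functional, this technique does not support paused mode reading, nor does it respect backpressure. Unfortunately, this can open the door to remote denial of service attacks, since nothing stops a fast or malicious sender from flooding the consumer with more data than it can process.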

I don't think an EventEmitter is a very good approach, so we won't do it that way.

That leaves us with using a Duplex stream. This makes sense since it supports backpressure and is how net.Socket is implemented in the first place!

If we swing our attention to TLSSocket, it is implemented as a Duplex stream and is a subclass of net.Socket.

Most interesting is that the constructor accepts a net.Socket as an argument. The supplied socket is wrapped with the logic for TLS.

We're going to use a similar approach with our custom protocol socket: JsonSocket.

JsonSocket Wire Protocol

We'll start by defining our simple wire protocol.

The goal is to read and write objects and serialize the objects into JSON strings to be sent over the underlying TCP socket.

The wire protocol is formally defined as follows.

A message consists of two parts:

  1. 4 bytes: 32-bit unsigned integer len, big-endian - describes the length of the JSON body
  2. len bytes: string - JSON encoded object with minimal white spacing.

Additionally:

  • The max supported len is going to be 2^18 or 256KiB. This is done to simplify the read/write buffering.

  • If len exceeds 2^18, the connection should be terminated.

  • If there is an error deserializing JSON data, the connection should be terminated.
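The rules above can be pinned down in two small pure functions. This is a sketch, not part of JsonSocket itself: encodeMessage, decodeMessages, and MAX_LEN are hypothetical names. decodeMessages pulls every complete message out of a buffer and hands back whatever bytes are left over for later.

```javascript
const MAX_LEN = 2 ** 18; // 256 KiB, the protocol's maximum body length

// Serialize an object as: 4-byte big-endian length + UTF-8 JSON body.
function encodeMessage(obj) {
  const body = Buffer.from(JSON.stringify(obj), 'utf8');
  if (body.length > MAX_LEN) {
    throw new RangeError(`body of ${body.length} bytes exceeds ${MAX_LEN}`);
  }
  const frame = Buffer.alloc(4 + body.length);
  frame.writeUInt32BE(body.length, 0);
  body.copy(frame, 4);
  return frame;
}

// Extract every complete message from buf. Returns the decoded objects and
// the unconsumed remainder (a partial message waiting for more bytes).
// Throws if a length exceeds MAX_LEN or a body is not valid JSON; per the
// rules above, the caller should treat either as fatal and close the connection.
function decodeMessages(buf) {
  const messages = [];
  let offset = 0;
  while (buf.length - offset >= 4) {
    const len = buf.readUInt32BE(offset);
    if (len > MAX_LEN) throw new RangeError(`length ${len} exceeds ${MAX_LEN}`);
    if (buf.length - offset - 4 < len) break; // body not fully received yet
    messages.push(JSON.parse(buf.toString('utf8', offset + 4, offset + 4 + len)));
    offset += 4 + len;
  }
  return { messages, rest: buf.subarray(offset) };
}

// Two whole messages arrive in one chunk, followed by 3 bytes of a third.
const incoming = Buffer.concat([
  encodeMessage({ a: 1 }),
  encodeMessage(['x']),
  encodeMessage('tail').subarray(0, 3),
]);
const { messages, rest } = decodeMessages(incoming);
console.log(messages); // [ { a: 1 }, [ 'x' ] ]
console.log(rest.length); // 3
```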

Coding JsonSocket

The general architecture is that we implement a Duplex stream and wrap a net.Socket with some custom functionality.

The documentation for streams includes a great guide for how to implement read and write streams. In short, we will need to implement 3 methods to make this work: _read, _write, and _final.

We're also going to make our stream operate in object mode so that we can support reading and writing of plain JavaScript objects. Internally, we will convert these objects into Buffers to be sent across the TCP socket.

So we know that we're going to use a Duplex stream, but how do we add TCP socket functionality? Just like the TLSSocket mentioned above, we pass an existing net.Socket into the constructor and wrap its functionality.

With an existing net.Socket, all we have to do is capture its events. For some of these events we perform custom actions specific to our wire protocol. Others we simply re-emit as-is.

So let's get started. We will first take a look at the constructor. The constructor configures the Duplex stream so that it operates in object mode. We also initialize a private property used to control reading. Lastly, we wrap the socket with a special method we will write.

const { Duplex } = require('stream');

class JsonSocket extends Duplex {
  constructor(socket) {
    super({ objectMode: true });

    // used to control reading
    this._readingPaused = false;

    // wrap the socket if one was provided
    if (socket) this._wrapSocket(socket);
  }

  // more code...
}

You'll notice that we extend Duplex instead of net.Socket. Extending net.Socket directly could probably be done, but I don't think it is a good idea for two reasons:

  1. We wouldn't be able to have JsonSocket operate in object mode while the underlying net.Socket operates in buffering mode. This is really a symptom of point two.

  2. We would end up mixing concerns: JsonSocket deals with our wire protocol, while the underlying net.Socket is concerned with buffered reads and writes to and from the network.

In reality, JsonSocket consumes a net.Socket and adds functionality on top of it. I think composition works better than inheritance in this scenario. In effect, JsonSocket is a decorator for a standard net.Socket.

We will duck-type JsonSocket so that it maintains similar eventing and method signatures to net.Socket. A good portion of this is already handled since both net.Socket and JsonSocket implement Duplex streams.

The next method we'll look at is _wrapSocket.

  _wrapSocket(socket) {
    this._socket = socket;

    // these are simply passed through
    this._socket.on('close', hadError => this.emit('close', hadError));
    this._socket.on('connect', () => this.emit('connect'));
    this._socket.on('drain', () => this.emit('drain'));
    this._socket.on('end', () => this.emit('end'));
    this._socket.on('error', err => this.emit('error', err));
    this._socket.on('lookup', (err, address, family, host) => this.emit('lookup', err, address, family, host)); // prettier-ignore
    this._socket.on('ready', () => this.emit('ready'));
    this._socket.on('timeout', () => this.emit('timeout'));

    // we customize data events!
    this._socket.on('readable', this._onReadable.bind(this));
  }

That's a lot of code, but it's not doing much. Astute readers will notice that almost all of the events are simply emitted again.

JsonSocket doesn't require much customization, so we'll only intercept the readable event and perform custom actions with it.

Recall that the readable event puts the stream into paused mode. This means we'll be required to read from the net.Socket manually instead of data freely flowing.

This functionality is perfect for implementing a protocol because we know exactly how we want to read data from the socket. If you recall, we first read 4 bytes to learn the length of the subsequent JSON data. We then read the JSON data of the specified length.

Let's examine how this works:

  _onReadable() {
    while (!this._readingPaused) {

      // read raw len
      let lenBuf = this._socket.read(4);
      if (!lenBuf) return;

      // convert raw len to integer
      let len = lenBuf.readUInt32BE();

      // the protocol caps len at 2^18; terminate the connection if exceeded
      if (len > 2 ** 18) {
        this._socket.destroy(new Error('message length exceeds 2^18 bytes'));
        return;
      }

      // read json data
      let body = this._socket.read(len);
      if (!body) {
        this._socket.unshift(lenBuf);
        return;
      }

      // convert raw json to js object
      let json;
      try {
        json = JSON.parse(body);
      } catch (ex) {
        this._socket.destroy(ex);
        return;
      }

      // add object to read buffer
      let pushOk = this.push(json);

      // pause reading if consumer is slow
      if (!pushOk) this._readingPaused = true;
    }
  }

We create a loop to read data from the socket. This is necessary because the remote socket could have sent a single packet with multiple messages in it.

Our read loop executes until:

  1. There is no more data to read off the socket
  2. The consumer pushes back (yay for backpressure support)

We read data by first reading the length:

let lenBuf = this._socket.read(4);  
if (!lenBuf) return;  
let len = lenBuf.readUInt32BE();  

If we were unable to read 4 bytes, we exit the loop and return from the function.

Otherwise, we decode the 4 bytes as a big-endian 32-bit unsigned integer. This integer represents the length of the JSON we need to deserialize.

Now that we know the length, we can read the bytes representing the JSON body.

let body = this._socket.read(len);  
if (!body) {  
  this._socket.unshift(lenBuf);
  return;
}

We read the body, and if we don't have all of the data in the message, we call unshift to push the length buffer back onto the socket. This allows us to try reading the message again when the socket receives additional data. This can happen if the message is split across multiple TCP packets.
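The read/unshift dance can be watched without a network. In this sketch a plain Readable with a no-op read() stands in for the socket (an assumption for illustration), and tryReadMessage is a hypothetical helper that mirrors one pass of the read loop. The message arrives in two chunks, and the length prefix is put back with unshift after the first, incomplete attempt:

```javascript
const { Readable } = require('stream');

// Stand-in for the TCP socket: bytes are pushed into it by hand.
const socket = new Readable({ read() {} });

// One framed message: 4-byte big-endian length followed by the JSON body.
const body = Buffer.from(JSON.stringify({ hello: 'world' }));
const header = Buffer.alloc(4);
header.writeUInt32BE(body.length, 0);

// Try to read one whole message; return null if it has not fully arrived.
function tryReadMessage() {
  const lenBuf = socket.read(4);
  if (!lenBuf) return null;
  const len = lenBuf.readUInt32BE(0);
  const data = socket.read(len);
  if (!data) {
    socket.unshift(lenBuf); // put the header back for the next attempt
    return null;
  }
  return JSON.parse(data.toString('utf8'));
}

// First "packet": the header plus only the first 5 bytes of the body.
socket.push(Buffer.concat([header, body.subarray(0, 5)]));
const firstAttempt = tryReadMessage();
const lengthAfterUnshift = socket.readableLength;
console.log(firstAttempt, lengthAfterUnshift); // null 9

// Second "packet": the rest of the body completes the message.
socket.push(body.subarray(5));
const secondAttempt = tryReadMessage();
console.log(secondAttempt); // { hello: 'world' }
```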

If we successfully read the data we now need to parse it.

let json;  
try {  
  json = JSON.parse(body);
} catch (ex) {
  this._socket.destroy(ex);
  return;
}

If we fail to parse the JSON, some invalid data was sent to us. We need to immediately terminate the connection because the socket is no longer in a valid state. We do this by calling socket.destroy. Bye.

If we successfully parse it we're almost done. The last step is to use push to add it to the read buffer. This method is part of a Readable stream implementation provided to us by extending Duplex.

let pushOk = this.push(json);  
if (!pushOk) this._readingPaused = true;  

Calling push will return a boolean indicating if additional data can be added to the read buffer. When it returns false it means that the read buffer is full and we should stop pushing data. This is the built-in support for read backpressure.

If the read buffer is full, we set the _readingPaused flag and break the loop. Once this flag is set, additional readable events emitted by the socket will not push onto the read buffer until the consumer actually consumes some data.

If the consumer doesn't read data, the socket's internal buffer will back up and eventually cause TCP backpressure to propagate to the sender. Pretty cool.
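The push() contract can also be observed in isolation. In this sketch, a hypothetical object-mode Readable with a highWaterMark of 2 stands in for JsonSocket's read buffer; nothing is lost when push() returns false, which is why _onReadable only uses the return value to decide whether to keep going:

```javascript
const { Readable } = require('stream');

// Stand-in for JsonSocket's read buffer: object mode, "full" at 2 objects.
// read() is a no-op because data is pushed manually, as _onReadable does.
const readable = new Readable({ objectMode: true, highWaterMark: 2, read() {} });

const pushResults = [];
for (let i = 1; i <= 3; i++) {
  pushResults.push(readable.push({ message: i }));
}
console.log(pushResults); // [ true, false, false ]

// A false return is advisory: the third object was still buffered.
const bufferedAfterPushes = readable.readableLength;
console.log(bufferedAfterPushes); // 3

// Once the consumer drains the buffer, push() reports room again.
readable.read();
readable.read();
readable.read();
const afterDrain = readable.push({ message: 4 });
console.log(afterDrain); // true
```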

So what happens when the consumer does trigger a read? Because a Duplex stream implements Readable, we need to implement a _read method.

The stream machinery calls this method under the covers when the consumer wants more data, for example when it calls read() and the read buffer is below its high-water mark. Our _read method is pretty simple:

_read() {  
  this._readingPaused = false;
  setImmediate(this._onReadable.bind(this));
}

We mark the _readingPaused flag as false so that we can resume reading data off the TCP socket.

We then call _onReadable to read any data from the socket and push it onto the read buffer. We do this asynchronously to support a consumer that reads in paused mode.

You can skip the next few paragraphs if you don't want to get into the weeds on why we make this call asynchronous. You were warned.

If you dig into how readable streams are implemented, you can see that _read can be either synchronous or asynchronous. Our code ends up in a hung state when _onReadable executes synchronously, JsonSocket is in paused mode, and push returns false (pushOk is false).

Why is this? Assume that we're already inside of a readable event. When _read gets called synchronously, it immediately pushes data to the read buffer, which is passed to the consumer. No new readable event is triggered. This works as long as we don't encounter backpressure.

When there is backpressure, the consumer gets some data but not all of it, and the _readingPaused flag gets set to true, but no new readable event is emitted because we're already inside of that event. Since there is no new readable event, the consumer never calls read() again to clear the _readingPaused flag. The next time data arrives on the socket, the _readingPaused flag is still set to true, which means no new data is pushed and no readable event gets emitted. We could certainly code around this condition by always emitting a readable event when the socket triggers its readable event, but I think it's better to make the call to _onReadable asynchronous. Either approach could probably work, though.

Lastly, let's take a look at what happens when you want to write data.

_write(obj, encoding, cb) {  
  let json = JSON.stringify(obj);
  let jsonBytes = Buffer.byteLength(json);
  let buffer = Buffer.alloc(4 + jsonBytes);
  buffer.writeUInt32BE(jsonBytes);
  buffer.write(json, 4);
  this._socket.write(buffer, cb);
}

This code will accept a JavaScript object obj as a parameter. We convert the object into JSON using the standard JSON.stringify method.

let json = JSON.stringify(obj);  

With the JSON string, we can calculate the total byte length used by the JSON data.

let jsonBytes = Buffer.byteLength(json);  

Armed with this data, we can construct our output buffer, which holds the length prefix followed by the JSON payload. Remember that the length is encoded as a 32-bit unsigned integer, which takes 4 bytes.

let buffer = Buffer.alloc(4 + jsonBytes);  

With our output buffer allocated, we just need to write the data into it. First we write the length into the first four bytes, in big-endian order. Then we write the JSON string into the remainder of the buffer, starting at index 4.

buffer.writeUInt32BE(jsonBytes);  
buffer.write(json, 4);  

Finally, with our output buffer ready to rock and roll, we write it to the socket! We pass along the completion callback supplied to the original write method.

this._socket.write(buffer, cb);

With that, we've implemented the hard parts of JsonSocket. There are a couple of other complexities that we won't get into. Additionally, to make JsonSocket a 1:1 stand-in for net.Socket, the remaining methods and properties of net.Socket should be implemented. Most of these will be straight pass-throughs to the underlying socket.
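For reference, here is one way the pieces might be assembled into a single module, including the connect method used by the client example below and the _final method mentioned earlier. Treat it as a sketch under stated assumptions rather than the code from @lntools/noise: connect, _final, _destroy, the length and truncation checks, and the choice to end the readable side with push(null) and to destroy JsonSocket on socket errors and close (instead of re-emitting those events) are all assumptions made for this sketch.

```javascript
const net = require('net');
const { Duplex } = require('stream');

const MAX_LEN = 2 ** 18; // maximum JSON body length allowed by the wire protocol

// Sketch: JsonSocket assembled from the snippets above. connect(), _final(),
// _destroy(), the length checks, and the end/error/close handling are assumptions.
class JsonSocket extends Duplex {
  constructor(socket) {
    super({ objectMode: true });
    this._readingPaused = false;
    if (socket) this._wrapSocket(socket);
  }

  // Client-side helper: create a net.Socket, wrap it, and forward the
  // arguments to net.Socket's connect(), e.g. { host, port }.
  connect(...args) {
    this._wrapSocket(new net.Socket());
    this._socket.connect(...args);
    return this;
  }

  _wrapSocket(socket) {
    this._socket = socket;
    socket.on('connect', () => this.emit('connect'));
    socket.on('ready', () => this.emit('ready'));
    socket.on('error', err => this.destroy(err)); // JsonSocket emits 'error', then 'close'
    socket.on('end', () => this.push(null)); // remote side finished sending
    socket.on('close', () => this.destroy()); // no-op if already destroyed
    socket.on('readable', () => this._onReadable());
  }

  _onReadable() {
    while (this._socket && !this._readingPaused) {
      const lenBuf = this._socket.read(4);
      if (!lenBuf) return;
      // After the peer ends, read(n) may return fewer than n bytes.
      if (lenBuf.length < 4) {
        this._socket.destroy(new Error('truncated message'));
        return;
      }
      const len = lenBuf.readUInt32BE(0);
      if (len > MAX_LEN) {
        this._socket.destroy(new Error(`message length ${len} exceeds ${MAX_LEN}`));
        return;
      }
      const body = this._socket.read(len);
      if (!body) {
        this._socket.unshift(lenBuf); // wait for the rest of the message
        return;
      }
      if (body.length < len) {
        this._socket.destroy(new Error('truncated message'));
        return;
      }
      let obj;
      try {
        obj = JSON.parse(body.toString('utf8'));
      } catch (ex) {
        this._socket.destroy(ex); // invalid JSON: terminate the connection
        return;
      }
      if (!this.push(obj)) this._readingPaused = true; // consumer is slow
    }
  }

  _read() {
    this._readingPaused = false;
    setImmediate(() => this._onReadable());
  }

  _write(obj, encoding, cb) {
    const json = JSON.stringify(obj);
    const jsonBytes = Buffer.byteLength(json);
    if (jsonBytes > MAX_LEN) {
      cb(new Error(`message length ${jsonBytes} exceeds ${MAX_LEN}`));
      return;
    }
    const buffer = Buffer.alloc(4 + jsonBytes);
    buffer.writeUInt32BE(jsonBytes, 0);
    buffer.write(json, 4);
    this._socket.write(buffer, cb);
  }

  // Called after end() once all queued writes are done: half-close TCP.
  _final(cb) {
    if (this._socket.writableEnded) return cb();
    this._socket.end(cb);
  }

  // Destroying JsonSocket also tears down the TCP socket.
  _destroy(err, cb) {
    if (this._socket) this._socket.destroy();
    cb(err);
  }
}

module.exports = JsonSocket;
```

Saved as json-socket.js, a module like this could be loaded with require('./json-socket') as in the server and client examples that follow.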

Example Server

Now that we have completed JsonSocket we want to create a server that can accept requests. The easiest thing to do is create a new server and on connection events, wrap the underlying Socket with our JsonSocket.

const net = require('net');  
const JsonSocket = require('./json-socket');

// construct a server
let server = new net.Server(socket => {  
  socket = new JsonSocket(socket);
  socket.on('error', console.error);
  socket.on('data', data => {
    console.log(data);
  });
});
server.listen(9000);  

The above code starts a new TCP server and attaches a connection listener. When a new socket connects we simply wrap the standard TCP socket with JsonSocket.

  socket = new JsonSocket(socket);

Then we just need to attach an event handler to capture messages when they are received. We can do this by attaching to the socket's data event. This places the socket in flowing mode, where messages are consumed as fast as possible by the data event handler. Each time the data event is called, it passes an object as the data, since JsonSocket is operating in object mode.

  socket.on('data', data => {
    console.log(data);
  });

Alternatively, we can subscribe to the readable event, which places the socket in paused mode. The readable event handler then has to call read on the socket directly to obtain data. Because JsonSocket is in object mode, read will only ever return a single object (the size argument is ignored).

  // paused mode
  socket.on('readable', () => {
    let data;
    while ((data = socket.read())) {
      console.log(data);
    }
  });

The while loop keeps calling read until it returns null, so we read all data from the buffer until there is none left.

Example Client

Now that we have a server, we just need a client. We can use JsonSocket directly. We glossed over implementing its connect method, but it's basically a wrapper for the underlying net.Socket connect method.

With this method, you can connect to a socket server that speaks the same language. Then all you have to do is send some messages!

const JsonSocket = require('./json-socket');

async function run() {  
  let socket = new JsonSocket();
  socket.connect({ host: 'localhost', port: 9000 });

  let interval = setInterval(() => {
    socket.write({ hello: 'world' });
  }, 1000);

  socket.on('close', () => clearInterval(interval));
  socket.on('error', console.error);
  socket.on('data', d => console.log(d));
}
run().catch(console.error);  

The code above sends a message every second. As you can see, the message it sends is an object, since JsonSocket operates in object mode:

  socket.write({ hello: 'world' });

The write method will return false if there is backpressure. In that case, the client should wait until the drain event is emitted before sending more messages.

Conclusion

Hopefully you've learned a bit about creating a custom TCP socket. Feel free to check out the full example and fire up the client and server to see messages passed from the client to the server.

Similar code is also being used in the @lntools/noise project which implements the Noise Protocol used by the Bitcoin Lightning Network.

If you have any thoughts or suggestions please leave a comment!
