One of the things we frequently do, but probably don't think about, is sending and receiving TCP data. Many of the packages we frequently use are using TCP sockets under the hood to connect to databases, remote services, and other devices on the network.
Working directly with a TCP socket is something you may find yourself needing to do. Creating a TCP socket to connect to a server is fairly straightforward thanks to how net.Socket is implemented.
But what if you want to make your code reusable? How do you create a "custom" TCP socket that wraps the communication to some remote service? How do you define that language (protocol) and make your TCP socket as easy to use as net.Socket?
This article will share some of the techniques I used to build @lntools/noise, a Noise Protocol socket library.
For this article, we'll keep things simple and will create a simple protocol that sends and receives JSON objects.
First let's start by brushing up on a few concepts.
EventEmitter Basics
EventEmitters are an awesome feature of Node.js. They are the core of what makes sockets work.
EventEmitters allow you to easily make objects that broadcast events. Each event is named and can include data. Events can be multicast, meaning multiple subscribers can listen to these events.
The quick example below emits an event called greet and passes a greeting message with the event. To make an object an EventEmitter, you simply extend events.EventEmitter.
// example EventEmitter
const { EventEmitter } = require('events');
class Greeter extends EventEmitter {
  sayHi() {
    this.emit('greet', 'hello!');
  }
}
A consumer of these events attaches with the .on method to listen for specific events that are emitted.
In this example, we subscribe to the greet event and log the greeting.
// example consumer
let greeter = new Greeter();
greeter.on('greet', greeting => console.log(greeting));
greeter.sayHi();
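To illustrate the multicast behavior mentioned earlier, here is a small sketch in which two subscribers listen to the same event. The listener labels and the seen array are made up for this example. Listeners are called synchronously, in the order they were registered.

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
const seen = [];

// two independent subscribers for the same event
emitter.on('greet', greeting => seen.push(`first: ${greeting}`));
emitter.on('greet', greeting => seen.push(`second: ${greeting}`));

// emit returns true when the event had at least one listener
const hadListeners = emitter.emit('greet', 'hello!');

console.log(seen); // [ 'first: hello!', 'second: hello!' ]
console.log(hadListeners); // true
```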
Stream Basics
The next refresher is streams. Streams are EventEmitters under the covers and emit events pertaining to reading and writing data.
Streams are a more complex topic because of the myriad ways you can use them. This won't be a comprehensive guide, but will discuss some of the details on how we're going to implement our custom socket.
The Node.js documentation says:
A stream is an abstract interface for working with streaming data in Node.js.
So now that we've gotten the self-referential definitions out of the way, let's put it in simpler terms via an example.
If you wanted to retrieve 1000 database records and write them to a file you have a few ways of doing this:
Option 1: Retrieve all of the records from the database and buffer them into memory. Then write all of the records from memory into the file.
While simple to implement, we're buffering everything to memory. This puts an upper bound on the number of records you can operate on: the amount of allocatable memory in your system. This technique may work for 1000 or 10000 records but will likely break down if you have 1 million or 1 billion records.
Option 2: Leave a connection open and read a record off the connection. Then write that record to disk. Then pluck off the next record. Wash, rinse, repeat until there are no more records.
This is a simple demonstration of streaming. Streaming is about minimizing the memory footprint to only the information that is being actively worked upon. We don't buffer huge amounts of information to memory.
As a metaphor, you can think of a stream as a bucket brigade. Each person passes a bucket of water to the next person to extinguish a fire. In this metaphor, the buckets are the data and each person is a stream object that may be interested in the data.
Backpressure
One of the byproducts is that streams are efficient at handling backpressure.
Backpressure is what happens when you have a slow person in your bucket brigade.
If the person on the left just drops buckets at the feet of the slow person a huge pile up happens.
In computer terms, we eventually buffer all of the data into memory waiting for the slow consumer to pass the data.
Fortunately, streams have a mechanism for handling this. The consumer can signal to the producer that it is overtaxed. The producer will stop pushing data until the consumer signals that it is ok to do so. This will go all the way back and prevent any stream in the system from over buffering memory.
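As a rough illustration of that signal, the sketch below builds a deliberately slow Writable with a tiny buffer. The names slowConsumer, pendingCallbacks, and catchUp are invented for this example. The return value of write is how the consumer tells the producer to stop, and the drain event is how it says "go ahead".

```javascript
const { Writable } = require('stream');

// A deliberately slow consumer: it never finishes a write on its own.
// We keep the callbacks so that we decide when a chunk counts as handled.
const pendingCallbacks = [];
const slowConsumer = new Writable({
  highWaterMark: 4, // tiny 4 byte buffer so backpressure shows up quickly
  write(chunk, encoding, callback) {
    pendingCallbacks.push(callback);
  }
});

slowConsumer.on('drain', () => console.log('drained, safe to write again'));

const first = slowConsumer.write('ab'); // 2 bytes buffered, below the limit
const second = slowConsumer.write('cd'); // 4 bytes buffered, limit reached

console.log(first, second); // true false

// The consumer catches up: completing the outstanding writes empties the
// buffer, which allows the 'drain' event to fire.
function catchUp() {
  const callback = pendingCallbacks.shift();
  if (callback) {
    callback();
    setImmediate(catchUp);
  }
}
setImmediate(catchUp);
```

A well-behaved producer stops calling write once it sees false and resumes from a drain handler.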
Types of Streams
There are several types of streams:
- Readable stream - read data from a source
- Writable stream - write data to a destination
- Duplex stream - read and write data
In this example we'll be working with a Duplex stream since we'll need both read and write support.
Streaming Modes
There are two types of streaming modes. These modes determine how data is passed into or out of the stream.
- Object mode
- Buffering
Object mode means that the stream reads and writes JavaScript objects. Yes, you can actually stream objects in Node.js, it's pretty cool.
Buffering is the more commonly used method and is the default for file system, http, and socket streams. Data is emitted as a Buffer or, if the stream has an encoding, as a string after decoding the data.
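The difference between the modes can be seen with a quick sketch. It uses PassThrough streams purely as stand-ins (they hand back whatever is written to them), and the variable names are made up for this example.

```javascript
const { PassThrough } = require('stream');

// object mode: whatever is written comes out as the same JavaScript value
const objects = new PassThrough({ objectMode: true });
objects.write({ id: 1 });
const obj = objects.read();

// default (buffering) mode: strings are converted to Buffers
const bytes = new PassThrough();
bytes.write('hi');
const buf = bytes.read();

// buffering mode with an encoding: data is decoded back into strings
const text = new PassThrough();
text.setEncoding('utf8');
text.write('hi');
const str = text.read();

console.log(obj); // { id: 1 }
console.log(Buffer.isBuffer(buf)); // true
console.log(typeof str, str); // string hi
```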
Reading Modes
When working with a Readable stream there are two reading modes. These reading modes determine how the stream emits read events.
- Flowing mode
- Paused mode
Most Node.js developers are familiar with the flowing mode of Readable streams. Most commonly, flowing mode is used when piping data or attaching a data event handler. Flowing mode sends data automatically and provides it to the application as fast as possible.
// flowing mode with data event
readStream.on('data', data => { /* do something */ })
// flowing mode via pipe
let rs = someReadStream();
let ws = someWriteStream();
rs.pipe(ws);
The alternative is paused mode.
Paused mode is different. The consumer must explicitly call stream.read(len) to retrieve data. The consumer can choose to either read all buffered data (by omitting the length parameter) or read a specific byte length of data. If there is no data or not enough data, the call to stream.read(len) returns null.
Paused mode is enabled by adding a handler for the readable event. This event is used to notify the consumer that new data is available. This gives the consumer an opportunity to call stream.read() and retrieve that data.
let rs = someReadStream();
rs.on('readable', () => {
  let data = rs.read(1024); // read 1024 bytes
  if (!data) return;
  // do something with data
});
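The behavior of read(len) is easy to try out without a network connection. In the sketch below a PassThrough stream stands in for any Readable, and the variable names are chosen for this example only.

```javascript
const { PassThrough } = require('stream');

// PassThrough stands in for any Readable. No 'data' handler is attached,
// so the stream stays paused and we pull data out by hand.
const rs = new PassThrough();
rs.write(Buffer.from('abc')); // 3 bytes are now buffered

const tooMuch = rs.read(5); // only 3 bytes available, so this is null
const firstTwo = rs.read(2); // exactly 2 bytes: 'ab'
const rest = rs.read(); // no length: everything that is left, 'c'

console.log(tooMuch); // null
console.log(firstTwo.toString()); // ab
console.log(rest.toString()); // c
```

Note that the failed read(5) does not consume anything: the three buffered bytes are still there for the following calls. This all-or-nothing behavior is what makes paused mode convenient for length-prefixed protocols.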
Socket Basics
Lastly, we'll discuss sockets.
TCP Sockets are part of the net core package of Node.js.
net.Socket is implemented as a Duplex stream and is used in many core packages, http and tls to name a few.
net.Socket is used in two ways:
- as a client that connects to a remote server via socket.connect, net.createConnection, or net.connect.
- created by a server and emitted in the connection event when a remote client connects to the server.
net.Socket has a few events that are emitted pertaining to connections:
- connect is triggered when the socket connection is successfully established
- ready is triggered when the socket is ready for use and is emitted directly after the connect event.
Either way that you end up with a net.Socket, it has a few capabilities that are enabled by being a Duplex stream.
- When data is received by the socket, it follows the same event emitting patterns used by any Readable stream. This means that both reading modes (flowing and paused) are supported: you can use data or readable events, or pipe to a write stream.
- Data can be written to the socket via the write and end methods using the same patterns as Writable streams. If there is backpressure, write returns false and you should wait for drain to be emitted before writing additional data.
That's most of the interesting stuff. Sockets are basically a nice wrapper for a stream.
Implementing a Custom Socket
Now that we've covered the basics, let's build our custom socket implementation.
A review of how others have implemented custom sockets shows two main techniques:
- Implementing an EventEmitter
- Implementing a Duplex stream
When custom socket functionality uses an EventEmitter (websocket, nssocket), the data event fired by the net.Socket is passed through as another event.
While functional, this technique does not support paused mode reading, nor does it respect backpressure. Unfortunately this can open the door for remote denial of service attacks.
I don't think an EventEmitter is a very good approach, so we won't do it that way.
That leaves us with using a Duplex stream. This makes sense since it supports backpressure and is how net.Socket is implemented in the first place!
If we swing our attention to TLSSocket, it is implemented as a Duplex stream and is a subclass of net.Socket.
Most interesting is that the constructor accepts a net.Socket as an argument. The supplied socket is wrapped with the logic for TLS.
We're going to use a similar approach with our custom protocol socket: JsonSocket.
JsonSocket Wire Protocol
We'll start by defining our simple wire protocol.
The goal is to read and write objects and serialize the objects into JSON strings to be sent over the underlying TCP socket.
The wire protocol is formally defined as follows.
A message consists of two parts:
- 4 bytes: 32-bit unsigned integer (big endian) len - describes the length of the JSON body
- len bytes: string - JSON encoded object with minimal white spacing.
Additionally:
- The max supported len is going to be 2^18 or 256KiB. This is done to simplify the read/write buffering.
- If len exceeds 2^18, the connection should be terminated.
- If there is an error deserializing JSON data, the connection should be terminated.
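To make the framing concrete, here is a minimal sketch of what a single message looks like on the wire. The helper name encodeFrame and the constant MAX_LEN are invented for this example, and the length prefix is written big endian to match the implementation later in the article.

```javascript
const MAX_LEN = 2 ** 18; // 256 KiB limit from the protocol description

// Hypothetical helper: turn one object into a length-prefixed frame.
function encodeFrame(obj) {
  const json = JSON.stringify(obj); // no extra whitespace by default
  const len = Buffer.byteLength(json); // bytes, not characters
  if (len > MAX_LEN) {
    throw new RangeError(`body of ${len} bytes exceeds ${MAX_LEN}`);
  }
  const frame = Buffer.alloc(4 + len);
  frame.writeUInt32BE(len, 0); // 4 byte big-endian length prefix
  frame.write(json, 4); // UTF-8 JSON body
  return frame;
}

const frame = encodeFrame({ hello: 'world' });
console.log(frame.length); // 21
console.log(frame.subarray(0, 4)); // <Buffer 00 00 00 11>
console.log(frame.subarray(4).toString()); // {"hello":"world"}
```

The JSON body here is 17 bytes (0x11), so the whole frame is 21 bytes.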
Coding JsonSocket
The general architecture is that we implement a Duplex stream and wrap a net.Socket with some custom functionality.
The documentation for streams includes a great guide for how to implement read and write streams. In short, we will need to implement 3 methods to make this work: _read, _write, and _final.
We're also going to make our stream operate in object mode so that we can support reading and writing of plain JavaScript objects. Internally we will convert these objects into Buffers to be sent across the TCP socket.
So we know that we're going to use a Duplex stream, but how do we add TCP socket functionality? Just like the TLSSocket mentioned above, we pass an existing net.Socket into the constructor and wrap its functionality.
With an existing net.Socket, all we have to do is capture its events. For some of these events we perform custom actions specific to our wire protocol. Others we simply emit as-is.
So let's get started. We will first take a look at the constructor. The constructor configures the Duplex stream so that it operates in object mode. We also create a few private properties. Lastly, we wrap the socket with a special method we will write.
const { Duplex } = require('stream');
class JsonSocket extends Duplex {
  constructor(socket) {
    super({ objectMode: true });
    // used to control reading
    this._readingPaused = false;
    // wrap the socket if one was provided
    if (socket) this._wrapSocket(socket);
  }
  // more code...
}
You'll notice that we extend Duplex instead of net.Socket. Extending net.Socket directly probably could be done, but I don't think it is a good idea for a few reasons:
- We won't be able to have JsonSocket operate in object mode while the underlying net.Socket operates in buffering mode. This is really a symptom of point two.
- We end up mixing concerns: JsonSocket deals with our wire protocol, while the underlying net.Socket is concerned with buffered reads/writes to/from the network.
In reality, JsonSocket is consuming a net.Socket and adding additional functionality. I think composition works better than inheritance in this scenario. Coincidentally, our JsonSocket is really a decorator for a standard net.Socket.
We will duck-type JsonSocket so that it maintains similar eventing and method signatures to net.Socket. A good portion of this is already handled since both net.Socket and JsonSocket implement Duplex streams.
The next method we'll look at is _wrapSocket.
_wrapSocket(socket) {
  this._socket = socket;
  // these are simply passed through
  this._socket.on('close', hadError => this.emit('close', hadError));
  this._socket.on('connect', () => this.emit('connect'));
  this._socket.on('drain', () => this.emit('drain'));
  this._socket.on('end', () => this.emit('end'));
  this._socket.on('error', err => this.emit('error', err));
  this._socket.on('lookup', (err, address, family, host) => this.emit('lookup', err, address, family, host)); // prettier-ignore
  this._socket.on('ready', () => this.emit('ready'));
  this._socket.on('timeout', () => this.emit('timeout'));
  // we customize data events!
  this._socket.on('readable', this._onReadable.bind(this));
}
That's a lot of code, but it's not doing much. Astute readers will notice that almost all of the events are simply emitted again.
JsonSocket doesn't require much customization, so we'll only intercept the readable event and perform custom actions with it.
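Since the pass-through handlers all have the same shape, they could also be generated from a list of event names. The sketch below is one possible way to do that, not how the article's code is written. The helper forwardEvents is hypothetical, and plain EventEmitters stand in for the two sockets.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: re-emit the named events of `source` on `target`,
// passing along whatever arguments the original event carried.
function forwardEvents(source, target, names) {
  for (const name of names) {
    source.on(name, (...args) => target.emit(name, ...args));
  }
}

const inner = new EventEmitter(); // stands in for the wrapped net.Socket
const outer = new EventEmitter(); // stands in for JsonSocket
forwardEvents(inner, outer, ['connect', 'timeout', 'lookup']);

const received = [];
outer.on('lookup', (err, address, family, host) => {
  received.push({ address, family, host });
});
inner.emit('lookup', null, '127.0.0.1', 4, 'localhost');

console.log(received); // [ { address: '127.0.0.1', family: 4, host: 'localhost' } ]
```

The explicit version in the article has the advantage that each forwarded event and its arguments are visible at a glance.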
Recall that the readable event puts the stream into paused mode. This means we'll be required to read from the net.Socket manually instead of data freely flowing.
This functionality is perfect for implementing a protocol because we know exactly how we want to read data from the socket. If you recall, we first read 4 bytes to learn the length of the subsequent JSON data. We then read the JSON data of the length specified.
Let's examine how this works:
_onReadable() {
  while (!this._readingPaused) {
    // read raw len
    let lenBuf = this._socket.read(4);
    if (!lenBuf) return;
    // convert raw len to integer
    let len = lenBuf.readUInt32BE();
    // read json data
    let body = this._socket.read(len);
    if (!body) {
      this._socket.unshift(lenBuf);
      return;
    }
    // convert raw json to js object
    let json;
    try {
      json = JSON.parse(body);
    } catch (ex) {
      this._socket.destroy(ex);
      return;
    }
    // add object to read buffer
    let pushOk = this.push(json);
    // pause reading if consumer is slow
    if (!pushOk) this._readingPaused = true;
  }
}
We create a loop to read data from the socket. This is necessary because the remote socket could have sent a single packet with multiple messages in it.
Our read loop executes until:
- There is no more data to read off the socket
- The consumer pushes back (yay for backpressure support)
We read data by first reading the length
let lenBuf = this._socket.read(4);
if (!lenBuf) return;
let len = lenBuf.readUInt32BE();
If fewer than 4 bytes are available, read returns null, so we exit the function and wait for more data.
Otherwise, we convert the 4 bytes into a 32-bit unsigned integer. This integer represents the length of the JSON we need to deserialize.
Now that we know the length, we can read the bytes representing the JSON body.
let body = this._socket.read(len);
if (!body) {
  this._socket.unshift(lenBuf);
  return;
}
We read the body, and if we don't have all of the data in the message, we call unshift to push the length buffer back onto the socket. This allows us to try reading the message again when additional data is received by the socket. This can happen if the underlying TCP connection splits the message across multiple packets.
If we successfully read the data we now need to parse it.
let json;
try {
  json = JSON.parse(body);
} catch (ex) {
  this._socket.destroy(ex);
  return;
}
If we fail to parse the JSON, some invalid data was sent to us. We need to immediately terminate the connection because the socket is no longer in a valid state. We do this by calling destroy on the underlying socket. Bye.
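The wire protocol also says the connection should be terminated when len exceeds 2^18, which the read loop above does not check. A minimal sketch of a single read step with that check added might look like the following. The helper tryReadMessage, its return shape, and MAX_LEN are all invented for this example, and rejecting a zero length is my own assumption rather than part of the protocol description.

```javascript
const { PassThrough } = require('stream');

const MAX_LEN = 2 ** 18; // limit from the wire protocol section

// Hypothetical helper. Returns { status: 'ok', value }, { status: 'wait' }
// when a full message has not arrived yet, or { status: 'invalid', error }
// when the caller should destroy the connection.
function tryReadMessage(source) {
  const lenBuf = source.read(4);
  if (!lenBuf) return { status: 'wait' };

  const len = lenBuf.readUInt32BE(0);
  // Assumption: a zero length is rejected too, since an empty body is not JSON.
  if (len === 0 || len > MAX_LEN) {
    return { status: 'invalid', error: new RangeError(`invalid length ${len}`) };
  }

  const body = source.read(len);
  if (!body) {
    source.unshift(lenBuf); // put the prefix back and retry later
    return { status: 'wait' };
  }

  try {
    return { status: 'ok', value: JSON.parse(body.toString('utf8')) };
  } catch (error) {
    return { status: 'invalid', error };
  }
}

// PassThrough streams stand in for the paused net.Socket.
const tooLong = new PassThrough();
tooLong.write(Buffer.from([0x00, 0x04, 0x00, 0x01])); // len = 2^18 + 1
const rejected = tryReadMessage(tooLong);

const badJson = new PassThrough();
badJson.write(Buffer.from([0, 0, 0, 2, 0x7b, 0x21])); // body is "{!"
const unparsable = tryReadMessage(badJson);

console.log(rejected.status, unparsable.status); // invalid invalid
```

In a JsonSocket, the 'invalid' case would map to destroying the underlying socket with the error, and 'wait' would map to returning from the readable handler.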
If we successfully parse it, we're almost done. The last step is to use push to add it to the read buffer. This method is part of the Readable stream implementation provided to us by extending Duplex.
let pushOk = this.push(json);
if (!pushOk) this._readingPaused = true;
Calling push will return a boolean indicating if additional data can be added to the read buffer. When it returns false it means that the read buffer is full and we should stop pushing data. This is the built-in support for read backpressure.
If the read buffer is full, we set the _readingPaused flag and break the loop. Once this flag is set, additional readable events emitted by the socket will not push onto the read buffer until the consumer actually consumes some data.
If the consumer doesn't read data, the socket's internal buffer will backup and eventually cause TCP backpressure to propagate to the sender. Pretty cool.
So what happens when the consumer does trigger a read? Because a Duplex stream implements Readable, we need to implement a _read method.
This method gets called under the covers when the consumer calls the read() method. Our _read method is pretty simple:
_read() {
  this._readingPaused = false;
  setImmediate(this._onReadable.bind(this));
}
We mark the _readingPaused flag as false so that we can resume reading data off the TCP socket.
We then call _onReadable to read any data from the socket and push it onto the read buffer. We do this asynchronously to support an upstream consumer in paused mode.
You can skip this next paragraph if you don't want to get into the weeds on why we make this call asynchronous. You were warned. If you dig into how readable streams are implemented, you can see that _read can be either synchronous or asynchronous. Our code ends up in a hung state when _onReadable executes synchronously, JsonSocket is in paused mode, and a push fails (pushOk is false). Why is this? Assume that we're already inside of a readable event handler. When _read gets called synchronously, it immediately pushes data to the read buffer, which is passed to the consumer. No new readable event is triggered. This works as long as we don't encounter backpressure. When there is backpressure, the consumer gets some data but not all of it and the _readingPaused flag gets set to true, but no new readable event is emitted because we're already inside of that event. Since there is no new readable event, the consumer never calls read() again to clear the _readingPaused flag. The next time data arrives on the socket, the _readingPaused flag is still set to true, which means no new data is pushed and no readable event gets emitted. We could certainly code around this condition by always emitting a readable event when the socket triggers its readable event, but I think it's better to make the call to _onReadable asynchronous. Either could probably work though.
Lastly, let's take a look at what happens when you want to write data.
_write(obj, encoding, cb) {
  let json = JSON.stringify(obj);
  let jsonBytes = Buffer.byteLength(json);
  let buffer = Buffer.alloc(4 + jsonBytes);
  buffer.writeUInt32BE(jsonBytes);
  buffer.write(json, 4);
  this._socket.write(buffer, encoding, cb);
}
This code will accept a JavaScript object obj as a parameter. We convert the object into JSON using the standard JSON.stringify method.
let json = JSON.stringify(obj);
With the JSON string, we can calculate the total byte length used by the JSON data.
let jsonBytes = Buffer.byteLength(json);
Armed with this data, we can construct our output buffer, sized to hold both the length prefix and the JSON payload. Remember that the length is encoded as a 32-bit unsigned integer, which means it will be 4 bytes.
let buffer = Buffer.alloc(4 + jsonBytes);
With our output buffer, we just need to write the data into it. First we write the length as the first four bytes, in big endian. Then we write the JSON string to the remainder of the buffer, starting at index 4.
buffer.writeUInt32BE(jsonBytes);
buffer.write(json, 4);
Finally, with our output buffer ready to rock and roll, we write it to the socket! We pass along the completion callback supplied to the original write method:
this._socket.write(buffer, encoding, cb);
With that we've implemented the hard parts of the JsonSocket. There are a couple other complexities that we won't get into. Additionally, to make sure that JsonSocket is 1:1 with Socket, the remaining methods and properties on Socket should be implemented. Most of these will be straight pass-throughs to the underlying socket.
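Two of the pieces not shown above are _final, which was listed earlier as one of the methods to implement, and the connect pass-through used by the client. The following is only a sketch of how they might look, under the assumption that ending the JsonSocket should end the wrapped socket. The class name JsonSocketSketch is made up, and the read and write sides are stubbed out to keep it short.

```javascript
const { Duplex } = require('stream');
const net = require('net');

// A stripped-down stand-in for JsonSocket that only shows the two methods.
class JsonSocketSketch extends Duplex {
  constructor(socket) {
    super({ objectMode: true });
    this._socket = socket || null;
  }

  // Create the underlying socket on demand, then delegate to its connect
  // method. The full class would wire up events via _wrapSocket here.
  connect(...args) {
    if (!this._socket) this._socket = new net.Socket();
    this._socket.connect(...args);
    return this;
  }

  // Called by the stream machinery after end(), once pending writes are
  // done: close our half of the TCP connection.
  _final(callback) {
    this._socket.end(callback);
  }

  _read() {} // omitted in this sketch

  _write(obj, encoding, callback) {
    callback(); // omitted in this sketch
  }
}
```

Because the constructor accepts any socket-like object, the class can be exercised with a fake socket that only implements connect and end, without opening a real connection.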
Example Server
Now that we have completed JsonSocket, we want to create a server that can accept requests. The easiest thing to do is create a new server and, on connection events, wrap the underlying Socket with our JsonSocket.
const net = require('net');
const JsonSocket = require('./json-socket');
// construct a server
let server = new net.Server(socket => {
  socket = new JsonSocket(socket);
  socket.on('error', console.error);
  socket.on('data', data => {
    console.log(data);
  });
});
server.listen(9000);
The above code starts a new TCP server and attaches a connection listener. When a new socket connects we simply wrap the standard TCP socket with JsonSocket.
socket = new JsonSocket(socket);
Then we just need to attach an event handler to capture messages as they are received. We can do this by attaching to the socket's data event. This will place the socket in flowing mode where messages are consumed as fast as possible by the data event handler. Each time the data event fires, the handler is passed an object, since JsonSocket is operating in object mode.
socket.on('data', data => {
  console.log(data);
});
Alternatively, we can subscribe to the readable event, which places the socket in paused mode. The readable event handler will then have to call read on the socket directly to obtain data. Because JsonSocket is in object mode, read will only ever return a single object (the number argument is ignored).
// paused mode
socket.on('readable', () => {
  let data;
  while ((data = socket.read())) {
    console.log(data);
  }
});
We wrap the call to read in a loop so that we read all data from the buffer until there is none left.
Example Client
Now that we have a server, we just need a client. We can use JsonSocket directly by calling its connect method. We glossed over implementing this method, but it's basically a wrapper for the underlying net.Socket connect method.
With this method, you can connect to a socket server that speaks the same language. Then all you have to do is send some messages!
const JsonSocket = require('./json-socket');
async function run() {
  let socket = new JsonSocket();
  socket.connect({ host: 'localhost', port: 9000 });
  let interval = setInterval(() => {
    socket.write({ hello: 'world' });
  }, 1000);
  socket.on('close', () => clearInterval(interval));
  socket.on('error', console.error);
  socket.on('data', d => console.log(d));
}
run().catch(console.error);
The code above will send a message every second. As you can see, the message it sends is an object, since JsonSocket operates in object mode:
socket.write({ hello: 'world' });
The write method will return false if there is backpressure. The client should wait to send more messages until the drain event is emitted.
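One way a client could honor that signal is sketched below. The helper writeAll is hypothetical and works with any Writable, so a small object-mode Writable named sink stands in for the JsonSocket here.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: write messages in order, pausing whenever write()
// reports backpressure and resuming on the next 'drain' event.
function writeAll(stream, messages, done) {
  let index = 0;
  function next() {
    while (index < messages.length) {
      const ok = stream.write(messages[index++]);
      if (!ok) {
        stream.once('drain', next); // wait for the consumer to catch up
        return;
      }
    }
    if (done) done();
  }
  next();
}

// Stand-in for a JsonSocket: an object-mode sink that can only buffer a
// single object, so every write reports backpressure.
const received = [];
const sink = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(obj, encoding, callback) {
    received.push(obj);
    callback();
  }
});

writeAll(sink, [{ n: 1 }, { n: 2 }, { n: 3 }], () => {
  console.log('all messages written:', received.length);
});
```

With the interval-based client above, the same idea would mean skipping or queueing a tick while the socket is waiting for drain.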
Conclusion
Hopefully you've learned a bit about creating a custom TCP socket. Feel free to check out the full example and fire up the client and server to see messages passed from the client to the server.
Similar code is also being used in the @lntools/noise project which implements the Noise Protocol used by the Bitcoin Lightning Network.
If you have any thoughts or suggestions please leave a comment!