If you are not careful with a readable stream in paused mode, you can read data out of order. This post discusses how to properly read data from a readable stream in paused mode.

First, a quick lesson on readable streams in paused mode. By default, streams are in paused mode. They transition into flowing mode in one of three waysref:

  • when a data event listener is attached
  • when the stream is piped
  • when stream.resume() is called

Otherwise the stream will be in paused mode and will require a consumer to explicitly read from the stream using the stream.read() method.

A paused mode stream will also trigger a readable event when new data is available on the stream. A short example of paused mode stream looks like:

const server = new net.Server(socket => {  
  socket.on('readable', () => {
    let buf = socket.read();
    // do something with data
  });
});

The above example does a few things:

  • obtains a read stream socket
  • attaches a readable listener to the stream
  • reads the data from the stream when the event is triggered with socket.read()

This all looks good.

The stream.read([size]) method takes an optional size parameter. In paused mode and when not in object mode, the size value will cause read to return null if the data is not available on the read buffer yet.

This property is extremely useful for things like wire protocols where a message will likely include a header that indicates the amount of data to read. To process a header, we will need to explicitly read chunks of data to construct the header.

So lets implement a small wire protocol that looks like this:

  • len UInt32BE
  • body data of length len

We will use this wire protocol to send messages of length len.

Because len is a 32bit UInt, it will be 4-bytes. The message will be arbitrary length defined by len.

Mapping this to our stream, in order to read the body, we need to first read the len. A naive implementation would look like this:

const server = new net.Server(socket => {  
  socket.on('readable', () => {
    // read 4-bytes for the len
    let lenBuf = socket.read(4);

    // convert the 4-bytes into number
    len len = lenBuf.readUInt32BE();

    // read the body of length len
    let body = socket.read(len);

    // do something with data
  });
});

We do a few things now

  • read 4-bytes for the len
  • convert the 4-bytes into a number
  • use the length number to read the remaining bytes

Now this implementation is naive, because we may not have received all the data required to perform these operations. A full example would need to handle that, but that is outside the scope of this article.

Problem with Asynchronous Code

Now that we have an understanding of how to read multiple pieces of information from a read stream we will introduce a problem. What happens if we do not read all information synchronously?

Take the code below:

const net = require('net');

let server = new net.Server(socket => {
  let id = 0;
  socket.on('readable', async () => {
    try {
      // capture the current id used when processing the message
      let myid = ++id;

      // log the current id and 1-byte of data
      console.log(myid, socket.read(1).toString());

      // wait 10s to simulate some very slow operation
      await wait(10000);

      // read the rest of the data here
      console.log(myid, socket.read(4).toString());
    } catch (ex) {
      console.error(ex);
    }
  });
});
server.listen(9000);

// helper to pause for a while
function wait(ms) {
  return new Promise(resolve => {
    setTimeout(resolve, ms);
  });
}

This code uses the identifier id to track each unique call to readable.

We capture myid as the value of the start of the call to readable so we use it for the duration of the execution of that function.

We then read 1-byte from the stream and output the results.

Next we put the problematic code into play. We perform an asynchronous operation by waiting for 10s. This code is before our next read. Unfortunately it allows other readable events to fire in the interim!!!

Finally, we read 4-bytes from the stream again. Unfortunately, we have no idea where we are in the stream now.

If, we have a client that sends the message hello every 7 seconds. The output looks like:

1 'h'
2 'e'
1 'lloh'
3 'e'
2 'lloh'

The first h is read by in call 1 at t-0. We now have to wait 10s before we can read the remaining 4-bytes.

Before the 10s happens, call 2 comes in at t-7 and reads a e. Now things are alllll out of order. Call 2 has to wait 10s, until t-17 before it can read the rest of its message.

At t-10 call 1 will read the remaining 4-bytes. Instead of reading ello to spell hello, it will read lloh because call 2 has already read off the e. Crap.

At t-14 call 3 happens and it reads an e. It will have to wait until t-24 to read the rest of its message.

Finally, at t-17, call 2 reads the remaining 4-bytes of its message but incorrectly reads lloh.

Clearly things did not go as planned.

Conclusion

So the lesson here is that we need be extremely careful about introducing asynchronous code into readable events. We need to be sure that we read all data from the stream before executing asynchronous code. Otherwise we will end out with out of sync data.