If you are not careful with a readable stream in paused mode, you can read data out of order. This post discusses how to properly read data from a readable stream in paused mode.
First, a quick lesson on readable streams in paused mode. By default, streams are in paused mode. They transition into flowing mode in one of three ways (ref):
- when a `data` event listener is attached
- when the stream is piped
- when `stream.resume()` is called
Otherwise, the stream stays in paused mode, and a consumer must explicitly read from it using the `stream.read()` method.
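To see these transitions, here is a minimal sketch that inspects `readable.readableFlowing`. It assumes a reasonably recent Node.js version. The flag starts as `null`, becomes `true` after each of the three triggers, and becomes `false` once a `readable` listener is attached. The `makeStream` helper and the no-op `Writable` destination are hypothetical, for illustration only.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: a readable stream whose _read is a no-op,
// so it only ever holds data we push into it by hand.
function makeStream() {
  return new Readable({ read() {} });
}

const initial = makeStream().readableFlowing; // null: no consumer yet

// 1. attaching a 'data' listener switches to flowing mode
const withData = makeStream();
withData.on('data', () => {});

// 2. piping switches to flowing mode
const piped = makeStream();
piped.pipe(new Writable({ write(chunk, encoding, callback) { callback(); } }));

// 3. calling resume() switches to flowing mode
const resumed = makeStream();
resumed.resume();

// attaching a 'readable' listener keeps the stream in paused mode
const paused = makeStream();
paused.on('readable', () => {});

console.log(initial, withData.readableFlowing, piped.readableFlowing,
  resumed.readableFlowing, paused.readableFlowing);
// null true true true false
```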
A paused-mode stream also emits a `readable` event when new data is available on the stream. A short example of a paused-mode stream looks like:
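```javascript
const net = require('net');

const server = new net.Server(socket => {
  socket.on('readable', () => {
    // read() returns whatever is buffered, or null if nothing is
    let buf = socket.read();
    // do something with data
  });
});
```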
The above example does a few things:
- obtains a readable stream, `socket`
- attaches a `readable` listener to the stream
- reads the data from the stream with `socket.read()` when the event is triggered

This all looks good.
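One refinement worth knowing: the Node.js stream documentation notes that, in paused mode, `read()` should be called in a loop until it returns `null`, because only after that will the next `readable` event be emitted. A minimal sketch, where `drain` is a hypothetical helper name and a hand-fed `Readable` stands in for the socket:

```javascript
const { Readable } = require('stream');

// Hypothetical helper: consume everything currently buffered on a
// paused-mode stream. read() returns null once the buffer is empty.
function drain(stream) {
  const chunks = [];
  let chunk;
  while ((chunk = stream.read()) !== null) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

// In a server this would run inside the listener, e.g.
// socket.on('readable', () => { const data = drain(socket); /* ... */ });

const stream = new Readable({ read() {} }); // no-op _read: we push by hand
stream.push('ab');
stream.push('cd');
const first = drain(stream);  // everything buffered so far ("abcd")
const second = drain(stream); // nothing left, so an empty Buffer
```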
The `stream.read([size])` method takes an optional `size` parameter. In paused mode, and when not in object mode, passing `size` causes `read` to return `null` if `size` bytes are not yet available in the internal read buffer (unless the stream has ended, in which case the remaining data is returned).
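A small sketch of this behavior, using a hand-fed `Readable` in place of a socket (the no-op `read()` implementation exists only so we can push data manually):

```javascript
const { Readable } = require('stream');

const stream = new Readable({ read() {} });
stream.push('ab');

const tooEarly = stream.read(4);  // only 2 bytes buffered: returns null

stream.push('cd');
const whole = stream.read(4);     // 4 bytes now buffered: returns "abcd"

stream.push('e');
stream.push(null);                // signal the end of the stream
const remainder = stream.read(4); // ended: returns the last byte instead of null
```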
This property is extremely useful for things like wire protocols where a message will likely include a header that indicates the amount of data to read. To process a header, we will need to explicitly read chunks of data to construct the header.
So let's implement a small wire protocol that looks like this:

| Field | Format |
|---|---|
| `len` | UInt32BE |
| `body` | data of length `len` |
We will use this wire protocol to send messages of length `len`. Because `len` is a 32-bit unsigned integer, it occupies 4 bytes. The body can be of arbitrary length, as defined by `len`.
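To make the format concrete, here is a sketch of how a client might frame a message. `encodeMessage` is a hypothetical helper, not part of the original post: it writes `len` as a big-endian 32-bit unsigned integer and then appends the body bytes. Note that `len` counts bytes, not characters, which matters for multi-byte UTF-8 strings.

```javascript
// Hypothetical helper: frame `body` as [len: UInt32BE][body: len bytes].
function encodeMessage(body) {
  const bodyBuf = Buffer.from(body);          // accepts a string or a Buffer
  const frame = Buffer.alloc(4 + bodyBuf.length);
  frame.writeUInt32BE(bodyBuf.length, 0);     // 4-byte length header
  bodyBuf.copy(frame, 4);                     // body starts after the header
  return frame;
}

const hello = encodeMessage('hello');
// <Buffer 00 00 00 05 68 65 6c 6c 6f>: 4 header bytes + 5 body bytes
```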
Mapping this to our stream, in order to read the `body` we first need to read `len`. A naive implementation would look like this:
```javascript
const net = require('net');

const server = new net.Server(socket => {
  socket.on('readable', () => {
    // read 4 bytes for the len
    let lenBuf = socket.read(4);
    // convert the 4 bytes into a number
    let len = lenBuf.readUInt32BE(0);
    // read the body of length len
    let body = socket.read(len);
    // do something with data
  });
});
```
We now do a few things:
- read 4 bytes for `len`
- convert the 4 bytes into a number
- use that number to read the remaining bytes
This implementation is naive because we may not yet have received all the data required to perform these operations. A full example would need to handle that, but that is outside the scope of this article.
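For readers who want a starting point anyway, here is one possible approach, sketched under stated assumptions: messages use the framing above, the stream never ends in the middle of a message, and `createMessageReader` is a hypothetical name. The idea is to remember a header whose body has not fully arrived, rather than reading it again, and to rely on `read(size)` returning `null` until `size` bytes are buffered.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: returns a function that pulls every complete
// [len: UInt32BE][body] message currently buffered on `stream`.
// Assumes the stream never ends in the middle of a message.
function createMessageReader(stream) {
  let pendingLen = null; // length from a header whose body is still incomplete

  return function readAvailable() {
    const messages = [];
    for (;;) {
      if (pendingLen === null) {
        const lenBuf = stream.read(4);
        if (lenBuf === null) break;          // header not fully buffered yet
        pendingLen = lenBuf.readUInt32BE(0);
      }
      // read(0) returns null, so handle empty bodies explicitly
      const body = pendingLen === 0 ? Buffer.alloc(0) : stream.read(pendingLen);
      if (body === null) break;              // body incomplete: keep pendingLen
      messages.push(body);
      pendingLen = null;
    }
    return messages;
  };
}

// Usage: a hand-fed stream standing in for a socket.
const stream = new Readable({ read() {} });
const readAvailable = createMessageReader(stream);

const frame = Buffer.alloc(9);
frame.writeUInt32BE(5, 0);
frame.write('hello', 4);

stream.push(frame.subarray(0, 6)); // header plus "he"
const partial = readAvailable();   // no messages: body not complete yet
stream.push(frame.subarray(6));    // "llo"
const complete = readAvailable();  // one message: "hello"
```

In a server, one reader would be created per connection and `readAvailable()` called from the socket's `readable` listener.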
Problem with Asynchronous Code
Now that we understand how to read multiple pieces of information from a readable stream, we will introduce a problem. What happens if we do not read all the information synchronously?
Take the code below:
```javascript
const net = require('net');

let server = new net.Server(socket => {
  let id = 0;
  socket.on('readable', async () => {
    try {
      // capture the current id used when processing the message
      let myid = ++id;
      // log the current id and 1 byte of data
      console.log(myid, socket.read(1).toString());
      // wait 10s to simulate some very slow operation
      await wait(10000);
      // read the rest of the data here
      console.log(myid, socket.read(4).toString());
    } catch (ex) {
      console.error(ex);
    }
  });
});

server.listen(9000);

// helper to pause for a while
function wait(ms) {
  return new Promise(resolve => {
    setTimeout(resolve, ms);
  });
}
```
This code uses the counter `id` to track each invocation of the `readable` listener.
We capture `myid` at the start of each `readable` call so that we can use it for the duration of that function's execution.
We then read 1 byte from the stream and output the result.
Next comes the problematic code: before our next read, we perform an asynchronous operation by waiting 10s. Unfortunately, this allows other `readable` events to fire in the interim!
Finally, we read 4 bytes from the stream again. Unfortunately, by now we have no idea where we are in the stream.
Suppose we have a client that sends the message `hello` every 7 seconds. The output looks like:
```
1 'h'
2 'e'
1 'lloh'
3 'e'
2 'lloh'
```
The first `h` is read by call 1 at t-0. Call 1 now has to wait 10s before it can read the remaining 4 bytes.
Before the 10s are up, call 2 comes in at t-7 and reads an `e`. Now things are alllll out of order. Call 2 has to wait 10s, until t-17, before it can read the rest of its message.
At t-10, call 1 reads the remaining 4 bytes. Instead of reading `ello` to spell `hello`, it reads `lloh`, because call 2 has already read off the `e`. Crap.
At t-14, call 3 happens and reads an `e`. It will have to wait until t-24 to read the rest of its message.
Finally, at t-17, call 2 reads the remaining 4 bytes of its message but incorrectly reads `lloh`.
Clearly things did not go as planned.
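The interleaving can be replayed deterministically, without a network or timers, by feeding a hand-built `Readable` and calling `read()` in the same order as the timeline above. This is an illustrative sketch: each `push` stands in for a `hello` arriving from the client, and each `readAt` call (a hypothetical helper) stands in for one half of a `readable` call.

```javascript
const { Readable } = require('stream');

const stream = new Readable({ read() {} }); // stands in for the socket
const log = [];
// Hypothetical helper: record which call read which bytes.
const readAt = (id, size) => log.push([id, stream.read(size).toString()]);

stream.push('hello'); // t-0: first "hello" arrives
readAt(1, 1);         // call 1 reads its first byte, then waits
stream.push('hello'); // t-7: second "hello" arrives
readAt(2, 1);         // call 2 reads its first byte, then waits
readAt(1, 4);         // t-10: call 1 resumes
stream.push('hello'); // t-14: third "hello" arrives
readAt(3, 1);         // call 3 reads its first byte, then waits
readAt(2, 4);         // t-17: call 2 resumes

console.log(log);
// [ [ 1, 'h' ], [ 2, 'e' ], [ 1, 'lloh' ], [ 3, 'e' ], [ 2, 'lloh' ] ]
```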
Conclusion
So the lesson here is that we need to be extremely careful about introducing asynchronous code into `readable` event handlers. We need to be sure that we read all the data we need from the stream before executing asynchronous code. Otherwise, we will end up with out-of-sync data.
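A minimal sketch of that advice applied to the earlier server, assuming (as in the example) fixed 5-byte `hello` messages. Every `read()` happens synchronously at the top of the handler, and only then does the slow work start. `MESSAGE_SIZE` and `takeMessages` are hypothetical names, not the post's code.

```javascript
const net = require('net');
const { Readable } = require('stream');

const MESSAGE_SIZE = 5; // assumption: fixed-size "hello" messages

function wait(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Read every complete message synchronously; nothing in here awaits,
// so no other 'readable' event can run between these reads.
function takeMessages(stream) {
  const messages = [];
  let msg;
  while ((msg = stream.read(MESSAGE_SIZE)) !== null) {
    messages.push(msg.toString());
  }
  return messages;
}

const server = new net.Server(socket => {
  let id = 0;
  socket.on('readable', async () => {
    try {
      const myid = ++id;
      const messages = takeMessages(socket); // all reads first...
      await wait(10000);                     // ...then the slow operation
      for (const m of messages) console.log(myid, m);
    } catch (ex) {
      console.error(ex);
    }
  });
});
// server.listen(9000);

// Quick check with a hand-fed stream: a partial message stays buffered.
const demo = new Readable({ read() {} });
demo.push('hellohel');
const taken = takeMessages(demo); // ['hello']; 'hel' waits for more data
```

Because the partial `hel` stays in the buffer, it will be read together with the rest of its message on a later `readable` event.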