0
0
mirror of https://github.com/nodejs/node.git synced 2024-11-30 07:27:22 +01:00
nodejs/test/simple/test-stream-unshift-read-race.js
isaacs b0de1e4a41 stream: Fix unshift() race conditions
Fix #5272

The consumption of a readable stream is a dance with 3 partners.

1. The specific stream Author (A)
2. The Stream Base class (B), and
3. The Consumer of the stream (C)

When B calls the _read() method that A implements, it sets a 'reading'
flag, so that parallel calls to _read() can be avoided.  When A calls
stream.push(), B knows that it's safe to start calling _read() again.

If the consumer C is some kind of parser that wants in some cases to
pass the source stream off to some other party, but not before "putting
back" some bit of previously consumed data (as in the case of Node's
websocket http upgrade implementation).  So, stream.unshift() will
generally *never* be called by A, but *only* called by C.

Prior to this patch, stream.unshift() *also* unset the state.reading
flag, meaning that C could indicate the end of a read, and B would
dutifully fire off another _read() call to A.  This is inappropriate.
In the case of fs streams, and other variably-laggy streams that don't
tolerate overlapped _read() calls, this causes big problems.

Also, calling stream.shift() after the 'end' event did not raise any
kind of error, but would cause very strange behavior indeed.  Calling it
after the EOF chunk was seen, but before the 'end' event was fired would
also cause weird behavior, and could lead to data being lost, since it
would not emit another 'readable' event.

This change makes it so that:

1. stream.unshift() does *not* set state.reading = false
2. stream.unshift() is allowed up until the 'end' event.
3. unshifting onto a EOF-encountered and zero-length (but not yet
end-emitted) stream will defer the 'end' event until the new data is
consumed.
4. pushing onto a EOF-encountered stream is now an error.

So, if you read(), you have that single tick to safely unshift() data
back into the stream, even if the null chunk was pushed, and the length
was 0.
2013-04-11 16:12:48 -07:00

134 lines
3.8 KiB
JavaScript

// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
// This test verifies that:
// 1. unshift() does not cause colliding _read() calls.
// 2. unshift() after the 'end' event is an error, but after the EOF
// signalling null, it is ok, and just creates a new readable chunk.
// 3. push() after the EOF signaling null is an error.
// 4. _read() is not called after pushing the EOF null chunk.
var stream = require('stream');
var hwm = 10;
var r = stream.Readable({ highWaterMark: hwm });
var chunks = 10;
var t = (chunks * 5);
var data = new Buffer(chunks * hwm + Math.ceil(hwm / 2));
for (var i = 0; i < data.length; i++) {
var c = 'asdf'.charCodeAt(i % 4);
data[i] = c;
}
var pos = 0;
var pushedNull = false;
r._read = function(n) {
assert(!pushedNull, '_read after null push');
// every third chunk is fast
push(!(chunks % 3));
function push(fast) {
assert(!pushedNull, 'push() after null push');
var c = pos >= data.length ? null : data.slice(pos, pos + n);
pushedNull = c === null;
if (fast) {
pos += n;
r.push(c);
if (c === null) pushError();
} else {
setTimeout(function() {
pos += n;
r.push(c);
if (c === null) pushError();
});
}
}
};
function pushError() {
assert.throws(function() {
r.push(new Buffer(1));
});
}
var w = stream.Writable();
var written = [];
w._write = function(chunk, encoding, cb) {
written.push(chunk.toString());
cb();
};
var ended = false;
r.on('end', function() {
assert(!ended, 'end emitted more than once');
assert.throws(function() {
r.unshift(new Buffer(1));
});
ended = true;
w.end();
});
r.on('readable', function() {
var chunk;
while (null !== (chunk = r.read(10))) {
w.write(chunk);
if (chunk.length > 4)
r.unshift(new Buffer('1234'));
}
});
var finished = false;
w.on('finish', function() {
finished = true;
// each chunk should start with 1234, and then be asfdasdfasdf...
// The first got pulled out before the first unshift('1234'), so it's
// lacking that piece.
assert.equal(written[0], 'asdfasdfas');
var asdf = 'd';
console.error('0: %s', written[0]);
for (var i = 1; i < written.length; i++) {
console.error('%s: %s', i.toString(32), written[i]);
assert.equal(written[i].slice(0, 4), '1234');
for (var j = 4; j < written[i].length; j++) {
var c = written[i].charAt(j);
assert.equal(c, asdf);
switch (asdf) {
case 'a': asdf = 's'; break;
case 's': asdf = 'd'; break;
case 'd': asdf = 'f'; break;
case 'f': asdf = 'a'; break;
}
}
}
});
process.on('exit', function() {
assert.equal(written.length, 18);
assert(ended, 'stream ended');
assert(finished, 'stream finished');
console.log('ok');
});