0
0
mirror of https://github.com/nodejs/node.git synced 2024-12-01 16:10:02 +01:00

stream: use lazy registration for drain for fast destinations

PR-URL: https://github.com/nodejs/node/pull/29095
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2019-08-12 14:39:18 +02:00 committed by Rich Trott
parent 4111c57f7c
commit 7195cd6fb3
3 changed files with 36 additions and 15 deletions

View File

@ -676,12 +676,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.end();
}
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
const ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
let ondrain;
var cleanedUp = false;
function cleanup() {
@ -689,7 +684,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// Cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
dest.removeListener('drain', ondrain);
if (ondrain) {
dest.removeListener('drain', ondrain);
}
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
@ -703,7 +700,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (state.awaitDrain &&
if (ondrain && state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
@ -722,6 +719,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
}
src.pause();
}
}

View File

@ -1,5 +1,6 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');
{
@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream');
wrapper.resume();
wrapper.on('end', common.mustCall());
}
{
// Only register drain if there is backpressure.
const rs = new Readable({ read() {} });
const pt = rs
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
assert.strictEqual(pt.listenerCount('drain'), 0);
pt.on('finish', () => {
assert.strictEqual(pt.listenerCount('drain'), 0);
});
rs.push('asd');
assert.strictEqual(pt.listenerCount('drain'), 0);
process.nextTick(() => {
rs.push('asd');
assert.strictEqual(pt.listenerCount('drain'), 0);
rs.push(null);
assert.strictEqual(pt.listenerCount('drain'), 0);
});
}

View File

@ -52,11 +52,4 @@ function drain() {
w.end = common.mustCall();
// Just for kicks, let's mess with the drain count.
// This verifies that even if it gets negative in the
// pipe() cleanup function, we'll still function properly.
r.on('readable', function() {
w.emit('drain');
});
r.pipe(w);