From 08577906569a4c2de70ad2a861e2f8456cd8fcdd Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 17 Apr 2018 17:24:22 +0200 Subject: [PATCH] stream: prevent 'end' to be emitted after 'error' This PR adds _readableState.errorEmitted and add the tracking of it. Fixes: https://github.com/nodejs/node/issues/6083 PR-URL: https://github.com/nodejs/node/pull/20104 Reviewed-By: James M Snell Reviewed-By: Luigi Pinca Reviewed-By: Ruben Bridgewater Reviewed-By: Trivikram Kamat --- lib/_stream_readable.js | 16 +++++++++---- lib/_stream_writable.js | 10 ++++++++ lib/internal/streams/destroy.js | 14 +++++++++-- test/parallel/test-http2-client-destroy.js | 1 - .../test-http2-client-onconnect-errors.js | 1 - ...p2-client-stream-destroy-before-connect.js | 1 - ...est-http2-compat-serverresponse-destroy.js | 2 -- .../test-http2-max-concurrent-streams.js | 1 - .../test-http2-misused-pseudoheaders.js | 1 - .../test-http2-multi-content-length.js | 1 - .../test-http2-respond-file-fd-invalid.js | 2 +- .../test-http2-respond-nghttperrors.js | 2 +- .../test-http2-respond-with-fd-errors.js | 2 +- ...st-http2-server-shutdown-before-respond.js | 2 +- .../test-http2-server-socket-destroy.js | 1 - .../test-stream-duplex-error-write.js | 24 +++++++++++++++++++ test/parallel/test-stream-readable-destroy.js | 15 ++++++++++++ 17 files changed, 76 insertions(+), 20 deletions(-) create mode 100644 test/parallel/test-stream-duplex-error-write.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8073e174cc5..31a8a11e4a0 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -99,6 +99,9 @@ function ReadableState(options, stream, isDuplex) { this.endEmitted = false; this.reading = false; + // Flipped if an 'error' is emitted. + this.errorEmitted = false; + // a flag to be able to tell if the event 'readable'/'data' is emitted // immediately, or on a later tick. We set this to true at first, because // any actions that shouldn't happen until "later" should generally also @@ -1069,20 +1072,23 @@ function fromList(n, state) { function endReadable(stream) { var state = stream._readableState; - debug('endReadable', state.endEmitted); - if (!state.endEmitted) { + debug('endReadable', state.endEmitted, state.errorEmitted); + if (!state.endEmitted && !state.errorEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); } } function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); + debug('endReadableNT', state.endEmitted, state.length, state.errorEmitted); // Check that we didn't get one last unshift. if (!state.endEmitted && state.length === 0) { - state.endEmitted = true; stream.readable = false; - stream.emit('end'); + + if (!state.errorEmitted) { + state.endEmitted = true; + stream.emit('end'); + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d21daf0541d..0891f85526f 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -424,12 +424,22 @@ function onwriteError(stream, state, sync, er, cb) { // this can emit finish, and it will always happen // after error process.nextTick(finishMaybe, stream, state); + + // needed for duplex, fixes https://github.com/nodejs/node/issues/6083 + if (stream._readableState) { + stream._readableState.errorEmitted = true; + } stream._writableState.errorEmitted = true; stream.emit('error', er); } else { // the caller expect this to happen before if // it is async cb(er); + + // needed for duplex, fixes https://github.com/nodejs/node/issues/6083 + if (stream._readableState) { + stream._readableState.errorEmitted = true; + } stream._writableState.errorEmitted = true; stream.emit('error', er); // this can emit finish, but finish must diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a0383cc3ce..2ab614e1d59 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -8,10 +8,14 @@ function destroy(err, cb) { this._writableState.destroyed; if (readableDestroyed || writableDestroyed) { + const readableErrored = this._readableState && + this._readableState.errorEmitted; + const writableErrored = this._writableState && + this._writableState.errorEmitted; + if (cb) { cb(err); - } else if (err && - (!this._writableState || !this._writableState.errorEmitted)) { + } else if (err && !readableErrored && !writableErrored) { process.nextTick(emitErrorNT, this, err); } return this; @@ -32,6 +36,11 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { if (!cb && err) { process.nextTick(emitErrorAndCloseNT, this, err); + + if (this._readableState) { + this._readableState.errorEmitted = true; + } + if (this._writableState) { this._writableState.errorEmitted = true; } @@ -65,6 +74,7 @@ function undestroy() { this._readableState.reading = false; this._readableState.ended = false; this._readableState.endEmitted = false; + this._readableState.errorEmitted = false; } if (this._writableState) { diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index eab413e2327..09ca011c736 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -95,7 +95,6 @@ const Countdown = require('../common/countdown'); }); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => server.close())); })); } diff --git a/test/parallel/test-http2-client-onconnect-errors.js b/test/parallel/test-http2-client-onconnect-errors.js index a75dc590c66..f427bfb4907 100644 --- a/test/parallel/test-http2-client-onconnect-errors.js +++ b/test/parallel/test-http2-client-onconnect-errors.js @@ -101,7 +101,6 @@ function runTest(test) { }); } - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => { client.destroy(); diff --git a/test/parallel/test-http2-client-stream-destroy-before-connect.js b/test/parallel/test-http2-client-stream-destroy-before-connect.js index d834de5d11e..9e81015ec58 100644 --- a/test/parallel/test-http2-client-stream-destroy-before-connect.js +++ b/test/parallel/test-http2-client-stream-destroy-before-connect.js @@ -45,5 +45,4 @@ server.listen(0, common.mustCall(() => { req.on('response', common.mustNotCall()); req.resume(); - req.on('end', common.mustCall()); })); diff --git a/test/parallel/test-http2-compat-serverresponse-destroy.js b/test/parallel/test-http2-compat-serverresponse-destroy.js index 8ee52a74ab4..49822082979 100644 --- a/test/parallel/test-http2-compat-serverresponse-destroy.js +++ b/test/parallel/test-http2-compat-serverresponse-destroy.js @@ -63,7 +63,6 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => countdown.dec())); req.resume(); - req.on('end', common.mustCall()); } { @@ -78,6 +77,5 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => countdown.dec())); req.resume(); - req.on('end', common.mustCall()); } })); diff --git a/test/parallel/test-http2-max-concurrent-streams.js b/test/parallel/test-http2-max-concurrent-streams.js index b270d6cc6af..2b576700aa4 100644 --- a/test/parallel/test-http2-max-concurrent-streams.js +++ b/test/parallel/test-http2-max-concurrent-streams.js @@ -45,7 +45,6 @@ server.listen(0, common.mustCall(() => { req.on('aborted', common.mustCall()); req.on('response', common.mustNotCall()); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => countdown.dec())); req.on('error', common.expectsError({ code: 'ERR_HTTP2_STREAM_ERROR', diff --git a/test/parallel/test-http2-misused-pseudoheaders.js b/test/parallel/test-http2-misused-pseudoheaders.js index c1ae37b9a36..0b7becef5f6 100644 --- a/test/parallel/test-http2-misused-pseudoheaders.js +++ b/test/parallel/test-http2-misused-pseudoheaders.js @@ -41,7 +41,6 @@ server.listen(0, common.mustCall(() => { req.on('response', common.mustCall()); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => { server.close(); client.close(); diff --git a/test/parallel/test-http2-multi-content-length.js b/test/parallel/test-http2-multi-content-length.js index 7d8ff4858fe..908f6ecd64f 100644 --- a/test/parallel/test-http2-multi-content-length.js +++ b/test/parallel/test-http2-multi-content-length.js @@ -53,7 +53,6 @@ server.listen(0, common.mustCall(() => { // header to be set for non-payload bearing requests... const req = client.request({ 'content-length': 1 }); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => countdown.dec())); req.on('error', common.expectsError({ code: 'ERR_HTTP2_STREAM_ERROR', diff --git a/test/parallel/test-http2-respond-file-fd-invalid.js b/test/parallel/test-http2-respond-file-fd-invalid.js index 21fcf790b44..28d1c0f057d 100644 --- a/test/parallel/test-http2-respond-file-fd-invalid.js +++ b/test/parallel/test-http2-respond-file-fd-invalid.js @@ -40,7 +40,7 @@ server.listen(0, () => { req.on('response', common.mustCall()); req.on('error', common.mustCall(errorCheck)); req.on('data', common.mustNotCall()); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); client.close(); server.close(); diff --git a/test/parallel/test-http2-respond-nghttperrors.js b/test/parallel/test-http2-respond-nghttperrors.js index ad9eee0d59f..4adf678b681 100644 --- a/test/parallel/test-http2-respond-nghttperrors.js +++ b/test/parallel/test-http2-respond-nghttperrors.js @@ -87,7 +87,7 @@ function runTest(test) { req.resume(); req.end(); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { client.close(); if (!tests.length) { diff --git a/test/parallel/test-http2-respond-with-fd-errors.js b/test/parallel/test-http2-respond-with-fd-errors.js index 3a671a3e364..7e7394d2930 100644 --- a/test/parallel/test-http2-respond-with-fd-errors.js +++ b/test/parallel/test-http2-respond-with-fd-errors.js @@ -95,7 +95,7 @@ function runTest(test) { req.resume(); req.end(); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { client.close(); if (!tests.length) { diff --git a/test/parallel/test-http2-server-shutdown-before-respond.js b/test/parallel/test-http2-server-shutdown-before-respond.js index 33f224fc69a..50b3a5572a5 100644 --- a/test/parallel/test-http2-server-shutdown-before-respond.js +++ b/test/parallel/test-http2-server-shutdown-before-respond.js @@ -32,5 +32,5 @@ server.on('listening', common.mustCall(() => { })); req.resume(); req.on('data', common.mustNotCall()); - req.on('end', common.mustCall(() => server.close())); + req.on('close', common.mustCall(() => server.close())); })); diff --git a/test/parallel/test-http2-server-socket-destroy.js b/test/parallel/test-http2-server-socket-destroy.js index 03afc1957b8..d631ef032b8 100644 --- a/test/parallel/test-http2-server-socket-destroy.js +++ b/test/parallel/test-http2-server-socket-destroy.js @@ -52,5 +52,4 @@ server.on('listening', common.mustCall(() => { req.on('aborted', common.mustCall()); req.resume(); - req.on('end', common.mustCall()); })); diff --git a/test/parallel/test-stream-duplex-error-write.js b/test/parallel/test-stream-duplex-error-write.js new file mode 100644 index 00000000000..5a80ce5c3e4 --- /dev/null +++ b/test/parallel/test-stream-duplex-error-write.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { Duplex } = require('stream'); +const { strictEqual } = require('assert'); + +const duplex = new Duplex({ + write(chunk, enc, cb) { + cb(new Error('kaboom')); + }, + read() { + this.push(null); + } +}); + +duplex.on('error', common.mustCall(function() { + strictEqual(this._readableState.errorEmitted, true); + strictEqual(this._writableState.errorEmitted, true); +})); + +duplex.on('end', common.mustNotCall()); + +duplex.end('hello'); +duplex.resume(); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 026aa8ca160..eecee04294e 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -189,3 +189,18 @@ const { inherits } = require('util'); read.push('hi'); read.on('data', common.mustNotCall()); } + +{ + // double error case + const read = new Readable({ + read() {} + }); + + read.on('close', common.mustCall()); + read.on('error', common.mustCall()); + + read.destroy(new Error('kaboom 1')); + read.destroy(new Error('kaboom 2')); + assert.strictEqual(read._readableState.errorEmitted, true); + assert.strictEqual(read.destroyed, true); +}