diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 7eaef695086..cc8e218498f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -39,6 +39,11 @@ function onReadable(iter) { function wrapForNext(lastPromise, iter) { return (resolve, reject) => { lastPromise.then(() => { + if (iter[kEnded]) { + resolve(createIterResult(undefined, true)); + return; + } + iter[kHandlePromise](resolve, reject); }, reject); }; @@ -61,7 +66,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ } if (this[kEnded]) { - return Promise.resolve(createIterResult(null, true)); + return Promise.resolve(createIterResult(undefined, true)); } if (this[kStream].destroyed) { @@ -74,7 +79,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ if (this[kError]) { reject(this[kError]); } else { - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); } }); }); @@ -115,7 +120,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ reject(err); return; } - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); }); }); }, @@ -131,7 +136,6 @@ const createReadableStreamAsyncIterator = (stream) => { value: stream._readableState.endEmitted, writable: true }, - [kLastPromise]: { value: null, writable: true }, // the function passed to new Promise // is cached so we avoid allocating a new // closure at every run @@ -151,6 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => { writable: true, }, }); + iterator[kLastPromise] = null; finished(stream, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { @@ -172,7 +177,7 @@ const createReadableStreamAsyncIterator = (stream) => { iterator[kLastPromise] = null; iterator[kLastResolve] = null; iterator[kLastReject] = null; - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); } iterator[kEnded] = true; }); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 83540de9def..aca1e5bcc9d 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -393,6 +393,71 @@ async function tests() { r.destroy(null); } })(); + + await (async () => { + console.log('all next promises must be resolved on end'); + const r = new Readable({ + objectMode: true, + read() { + } + }); + + const b = r[Symbol.asyncIterator](); + const c = b.next(); + const d = b.next(); + r.push(null); + assert.deepStrictEqual(await c, { done: true, value: undefined }); + assert.deepStrictEqual(await d, { done: true, value: undefined }); + })(); + + await (async () => { + console.log('all next promises must be resolved on destroy'); + const r = new Readable({ + objectMode: true, + read() { + } + }); + + const b = r[Symbol.asyncIterator](); + const c = b.next(); + const d = b.next(); + r.destroy(); + assert.deepStrictEqual(await c, { done: true, value: undefined }); + assert.deepStrictEqual(await d, { done: true, value: undefined }); + })(); + + await (async () => { + console.log('all next promises must be resolved on destroy with error'); + const r = new Readable({ + objectMode: true, + read() { + } + }); + + const b = r[Symbol.asyncIterator](); + const c = b.next(); + const d = b.next(); + const err = new Error('kaboom'); + r.destroy(err); + + await Promise.all([(async () => { + let e; + try { + await c; + } catch (_e) { + e = _e; + } + assert.strictEqual(e, err); + })(), (async () => { + let e; + try { + await d; + } catch (_e) { + e = _e; + } + assert.strictEqual(e, err); + })()]); + })(); } // to avoid missing some tests if a promise does not resolve