0
0
mirror of https://github.com/nodejs/node.git synced 2024-12-01 16:10:02 +01:00
nodejs/test/parallel/test-readable-from-iterator-closing.js
Vadzim Zieńka 8a3fa32a1f stream: close iterator in Readable.from
Call iterator.return() if not all of its values are consumed.

Fixes: https://github.com/nodejs/node/issues/32842

PR-URL: https://github.com/nodejs/node/pull/32844
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Zeyu Yang <himself65@outlook.com>
2020-04-18 21:28:17 +02:00

199 lines
4.0 KiB
JavaScript

'use strict';
const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');
async function asyncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();
async function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}
const stream = Readable.from(infiniteGenerate());
for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}
async function syncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();
function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}
const stream = Readable.from(infiniteGenerate());
for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}
async function syncPromiseSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustCall = mustCall();
function* infiniteGenerate() {
try {
while (true) yield Promise.resolve('a');
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}
const stream = Readable.from(infiniteGenerate());
for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}
async function syncRejectedSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const secondNextMustNotCall = mustNotCall();
function* generate() {
try {
yield Promise.reject('a');
secondNextMustNotCall();
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}
const stream = Readable.from(generate());
try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}
async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const nextMustCall = mustCall();
const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() {
nextMustCall();
throw new Error('a');
},
async return() {
returnMustNotCall();
return { done: true };
},
});
try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}
async function closeStreamWhileNextIsPending() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall();
let resolveDestroy;
const destroyed =
new Promise((resolve) => { resolveDestroy = mustCall(resolve); });
let resolveYielded;
const yielded =
new Promise((resolve) => { resolveYielded = mustCall(resolve); });
async function* infiniteGenerate() {
try {
while (true) {
yield 'a';
resolveYielded();
await destroyed;
}
} finally {
finallyMustCall();
}
}
const stream = Readable.from(infiniteGenerate());
stream.on('data', (data) => {
dataMustCall();
strictEqual(data, 'a');
});
yielded.then(() => {
stream.destroy();
resolveDestroy();
});
}
async function closeAfterNullYielded() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall(3);
function* infiniteGenerate() {
try {
yield 'a';
yield 'a';
yield 'a';
while (true) yield null;
} finally {
finallyMustCall();
}
}
const stream = Readable.from(infiniteGenerate());
stream.on('data', (chunk) => {
dataMustCall();
strictEqual(chunk, 'a');
});
}
Promise.all([
asyncSupport(),
syncSupport(),
syncPromiseSupport(),
syncRejectedSupport(),
noReturnAfterThrow(),
closeStreamWhileNextIsPending(),
closeAfterNullYielded(),
]).then(mustCall());