0
0
mirror of https://github.com/nodejs/node.git synced 2024-11-30 15:30:56 +01:00

streams: refactor ReadableStream asyncIterator creation and a few fixes

Closes: https://github.com/nodejs/node/issues/23041

- Rewrite `ReadableAsyncIterator` class into
`ReadableStreamAsyncIteratorPrototype` which contains no constructor and
inherits from `%AsyncIteratorPrototype%`.

- Rewrite `AsyncIteratorRecord` into dumb function.

PR-URL: https://github.com/nodejs/node/pull/23042
Fixes: https://github.com/nodejs/node/issues/23041
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
Gus Caplan 2018-09-23 15:10:12 -05:00
parent e688fe6b7e
commit 8bce9e8f19
No known key found for this signature in database
GPG Key ID: F00BD11880E82F0E
3 changed files with 78 additions and 48 deletions

View File

@ -42,7 +42,7 @@ const { emitExperimentalWarning } = require('internal/util');
// Lazy loaded to improve the startup performance.
let StringDecoder;
let ReadableAsyncIterator;
let createReadableStreamAsyncIterator;
util.inherits(Readable, Stream);
@ -990,9 +990,11 @@ Readable.prototype.wrap = function(stream) {
Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
if (ReadableAsyncIterator === undefined)
ReadableAsyncIterator = require('internal/streams/async_iterator');
return new ReadableAsyncIterator(this);
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}
return createReadableStreamAsyncIterator(this);
};
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {

View File

@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');
const AsyncIteratorRecord = class AsyncIteratorRecord {
constructor(value, done) {
this.done = done;
this.value = value;
}
};
function createIterResult(value, done) {
return { value, done };
}
function readAndResolve(iter) {
const resolve = iter[kLastResolve];
@ -26,7 +23,7 @@ function readAndResolve(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
resolve(createIterResult(data, false));
}
}
}
@ -43,7 +40,7 @@ function onEnd(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
}
iter[kEnded] = true;
}
@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) {
};
}
const ReadableAsyncIterator = class ReadableAsyncIterator {
constructor(stream) {
this[kStream] = stream;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;
stream.on('readable', onReadable.bind(null, this));
stream.on('end', onEnd.bind(null, this));
stream.on('error', onError.bind(null, this));
// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kStream].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
get stream() {
return this[kStream];
}
},
next() {
// if we have detected an error in the meanwhile
@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
}
if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
return Promise.resolve(createIterResult(null, true));
}
// if we have multiple next() calls
@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
// without triggering the next() queue
const data = this[kStream].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
return Promise.resolve(createIterResult(data, false));
}
promise = new Promise(this[kHandlePromise]);
@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
this[kLastPromise] = promise;
return promise;
}
},
return() {
// destroy(err, cb) is a private API
@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
});
});
}
},
}, AsyncIteratorPrototype);
const createReadableStreamAsyncIterator = (stream) => {
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: { value: false, writable: true },
[kLastPromise]: { value: null, writable: true },
// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});
stream.on('readable', onReadable.bind(null, iterator));
stream.on('end', onEnd.bind(null, iterator));
stream.on('error', onError.bind(null, iterator));
return iterator;
};
module.exports = ReadableAsyncIterator;
module.exports = createReadableStreamAsyncIterator;

View File

@ -5,6 +5,28 @@ const { Readable } = require('stream');
const assert = require('assert');
async function tests() {
{
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);
const rs = new Readable({});
assert.strictEqual(
Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
AsyncIteratorPrototype);
}
await (async function() {
const readable = new Readable({ objectMode: true, read() {} });
readable.push(0);
readable.push(1);
readable.push(null);
const iter = readable[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);
for await (const d of iter) {
assert.strictEqual(d, 1);
}
})();
await (async function() {
console.log('read without for..await');
const max = 5;