'use strict';

const {
  ArrayPrototypeFilter,
  ArrayPrototypeMap,
  Boolean,
  ObjectEntries,
  PromisePrototypeThen,
  PromiseResolve,
  PromiseWithResolvers,
  SafePromiseAll,
  SafePromisePrototypeFinally,
  SafeSet,
  TypeError,
  TypedArrayPrototypeGetBuffer,
  TypedArrayPrototypeGetByteLength,
  TypedArrayPrototypeGetByteOffset,
  Uint8Array,
} = primordials;

const { TextEncoder } = require('internal/encoding');

const {
  ReadableStream,
  isReadableStream,
} = require('internal/webstreams/readablestream');

const {
  WritableStream,
  isWritableStream,
} = require('internal/webstreams/writablestream');

const {
  CountQueuingStrategy,
  ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
  Writable,
  Readable,
  Duplex,
  destroy,
} = require('stream');

const {
  isDestroyed,
  isReadable,
  isWritable,
  isWritableEnded,
} = require('internal/streams/utils');

const {
  Buffer,
} = require('buffer');

const {
  AbortError,
  ErrnoException,
  codes: {
    ERR_INVALID_ARG_TYPE,
    ERR_INVALID_ARG_VALUE,
    ERR_INVALID_STATE,
    ERR_STREAM_PREMATURE_CLOSE,
  },
} = require('internal/errors');

const {
  kEmptyObject,
  normalizeEncoding,
} = require('internal/util');

const {
  validateBoolean,
  validateFunction,
  validateObject,
} = require('internal/validators');

const {
  WriteWrap,
  ShutdownWrap,
  kReadBytesOrError,
  kLastWriteWasAsync,
  streamBaseState,
} = internalBinding('stream_wrap');

const finished = require('internal/streams/end-of-stream');

const { UV_EOF } = internalBinding('uv');

const encoder = new TextEncoder();

// Collect all negative (error) ZLIB codes and Z_NEED_DICT
const ZLIB_FAILURES = new SafeSet([
  ...ArrayPrototypeFilter(
    ArrayPrototypeMap(
      ObjectEntries(internalBinding('constants').zlib),
      ({ 0: code, 1: value }) => (value < 0 ? code : null),
    ),
    Boolean,
  ),
  'Z_NEED_DICT',
]);

/**
 * @param {Error|null} cause
 * @returns {Error|null}
 */
function handleKnownInternalErrors(cause) {
  switch (true) {
    case cause?.code === 'ERR_STREAM_PREMATURE_CLOSE': {
      return new AbortError(undefined, { cause });
    }
    case ZLIB_FAILURES.has(cause?.code): {
      // eslint-disable-next-line no-restricted-syntax
      const error = new TypeError(undefined, { cause });
      error.code = cause.code;
      return error;
    }
    default:
      return cause;
  }
}
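
// Illustrative sketch (not part of the module's logic): how the mapping above
// behaves. The error objects below are hand-built stand-ins for what Node.js
// streams and zlib would normally produce.
//
//   const premature = new Error('Premature close');
//   premature.code = 'ERR_STREAM_PREMATURE_CLOSE';
//   handleKnownInternalErrors(premature);  // AbortError with `cause` = premature
//
//   const zlibError = new Error('incorrect header check');
//   zlibError.code = 'Z_DATA_ERROR';       // negative zlib code, so in ZLIB_FAILURES
//   handleKnownInternalErrors(zlibError);  // TypeError carrying code 'Z_DATA_ERROR'
//
//   handleKnownInternalErrors(null);       // null; unknown errors pass through as-is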

/**
 * @typedef {import('../../stream').Writable} Writable
 * @typedef {import('../../stream').Readable} Readable
 * @typedef {import('./writablestream').WritableStream} WritableStream
 * @typedef {import('./readablestream').ReadableStream} ReadableStream
 */

/**
 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
 */

/**
 * @param {Writable} streamWritable
 * @returns {WritableStream}
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  // We check if the given stream is a stream.Writable or http.OutgoingMessage
  const checkIfWritableOrOutgoingMessage =
    streamWritable &&
    typeof streamWritable?.write === 'function' &&
    typeof streamWritable?.on === 'function';
  if (!checkIfWritableOrOutgoingMessage) {
    throw new ERR_INVALID_ARG_TYPE(
      'streamWritable',
      'stream.Writable',
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy =
    streamWritable.writableObjectMode ?
      new CountQueuingStrategy({ highWaterMark }) :
      { highWaterMark };

  let controller;
  let backpressurePromise;
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined)
      backpressurePromise.resolve();
  }

  const cleanup = finished(streamWritable, (error) => {
    error = handleKnownInternalErrors(error);

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on('error', () => {});
    if (error != null) {
      if (backpressurePromise !== undefined)
        backpressurePromise.reject(error);
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      controller.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    controller.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on('drain', onDrain);

  return new WritableStream({
    start(c) { controller = c; },

    write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        backpressurePromise = PromiseWithResolvers();
        return SafePromisePrototypeFinally(
          backpressurePromise.promise, () => {
            backpressurePromise = undefined;
          });
      }
    },

    abort(reason) {
      destroy(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = PromiseWithResolvers();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return PromiseResolve();
    },
  }, strategy);
}
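
// Usage sketch (illustrative only; assumes the public `stream.Writable.toWeb()`
// helper, which wraps this adapter but is not defined in this file):
//
//   const { Writable } = require('node:stream');
//   const fs = require('node:fs');
//
//   const nodeWritable = fs.createWriteStream('out.txt');
//   const webWritable = Writable.toWeb(nodeWritable);  // -> WritableStream
//   const writer = webWritable.getWriter();
//   await writer.write('hello');
//   await writer.close();
//
// Writes the Node.js stream cannot accept immediately are held back through
// backpressurePromise above, so the writer's `ready` promise reflects 'drain'
// on the underlying stream.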

/**
 * @param {WritableStream} writableStream
 * @param {{
 *   decodeStrings? : boolean,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Writable}
 */
function newStreamWritableFromWritableStream(writableStream, options = kEmptyObject) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'writableStream',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  validateBoolean(decodeStrings, 'options.decodeStrings');

  const writer = writableStream.getWriter();
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        error = error.filter((e) => e);
        try {
          callback(error.length === 0 ? undefined : error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            done,
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(writable, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        if (error != null) {
          PromisePrototypeThen(
            writer.abort(error),
            done,
            done);
        } else {
          PromisePrototypeThen(
            writer.close(),
            done,
            done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      // If the WritableStream closes before the stream.Writable has been
      // ended, we signal an error on the stream.Writable.
      closed = true;
      if (!isWritableEnded(writable))
        destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      // If the WritableStream errors before the stream.Writable has been
      // destroyed, signal an error on the stream.Writable.
      closed = true;
      destroy(writable, error);
    });

  return writable;
}
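
// Usage sketch (illustrative only; assumes the public `stream.Writable.fromWeb()`
// helper, which builds on this adapter but is not defined in this file):
//
//   const { Writable } = require('node:stream');
//
//   const webWritable = new WritableStream({
//     write(chunk) { console.log('got', chunk); },
//   });
//   const nodeWritable = Writable.fromWeb(webWritable, { objectMode: true });
//   nodeWritable.write({ hello: 'world' });
//   nodeWritable.end();
//
// With objectMode off and decodeStrings enabled (the defaults), string chunks
// are converted to Uint8Array before being handed to the writer.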

/**
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @param {Readable} streamReadable
 * @param {{
 *   strategy : QueuingStrategy
 * }} [options]
 * @returns {ReadableStream}
 */
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
  // Not using the internal/streams/utils isReadableNodeStream utility
  // here because it will return false if streamReadable is a Duplex
  // whose readable option is false. For a Duplex that is not readable,
  // we want it to pass this check but return a closed ReadableStream.
  if (typeof streamReadable?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE(
      'streamReadable',
      'stream.Readable',
      streamReadable);
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const readable = new ReadableStream();
    readable.cancel();
    return readable;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  const evaluateStrategyOrFallback = (strategy) => {
    // If there is a strategy available, use it
    if (strategy)
      return strategy;

    if (objectMode) {
      // When running in objectMode explicitly but no strategy, we just fall
      // back to CountQueuingStrategy
      return new CountQueuingStrategy({ highWaterMark });
    }

    return new ByteLengthQueuingStrategy({ highWaterMark });
  };

  const strategy = evaluateStrategyOrFallback(options?.strategy);

  let controller;
  let wasCanceled = false;

  function onData(chunk) {
    // Copy the Buffer to detach it from the pool.
    if (Buffer.isBuffer(chunk) && !objectMode)
      chunk = new Uint8Array(chunk);
    controller.enqueue(chunk);
    if (controller.desiredSize <= 0)
      streamReadable.pause();
  }

  streamReadable.pause();

  const cleanup = finished(streamReadable, (error) => {
    error = handleKnownInternalErrors(error);

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamReadable.on('error', () => {});
    if (error)
      return controller.error(error);
    // Was already canceled
    if (wasCanceled) {
      return;
    }
    controller.close();
  });

  streamReadable.on('data', onData);

  return new ReadableStream({
    start(c) { controller = c; },

    pull() { streamReadable.resume(); },

    cancel(reason) {
      wasCanceled = true;
      destroy(streamReadable, reason);
    },
  }, strategy);
}
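
// Usage sketch (illustrative only; assumes the public `stream.Readable.toWeb()`
// helper, which wraps this adapter but is not defined in this file):
//
//   const { Readable } = require('node:stream');
//   const fs = require('node:fs');
//
//   const nodeReadable = fs.createReadStream('in.txt');
//   const webReadable = Readable.toWeb(nodeReadable);  // -> ReadableStream
//   for await (const chunk of webReadable) {
//     console.log(chunk.byteLength);
//   }
//
// The source stream starts out paused; pull() resumes it, and onData() pauses
// it again once the controller's desiredSize drops to zero or below.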

/**
 * @param {ReadableStream} readableStream
 * @param {{
 *   highWaterMark? : number,
 *   encoding? : string,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Readable}
 */
function newStreamReadableFromReadableStream(readableStream, options = kEmptyObject) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'readableStream',
      'ReadableStream',
      readableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  if (encoding !== undefined && !Buffer.isEncoding(encoding))
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
  validateBoolean(objectMode, 'options.objectMode');

  const reader = readableStream.getReader();
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      PromisePrototypeThen(
        reader.read(),
        (chunk) => {
          if (chunk.done) {
            // Value should always be undefined here.
            readable.push(null);
          } else {
            readable.push(chunk.value);
          }
        },
        (error) => destroy(readable, error));
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          reader.cancel(error),
          done,
          done);
        return;
      }
      done();
    },
  });

  PromisePrototypeThen(
    reader.closed,
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy(readable, error);
    });

  return readable;
}
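
// Usage sketch (illustrative only; assumes the public `stream.Readable.fromWeb()`
// helper, which builds on this adapter but is not defined in this file):
//
//   const { Readable } = require('node:stream');
//
//   const webReadable = new ReadableStream({
//     start(controller) {
//       controller.enqueue('hello');
//       controller.close();
//     },
//   });
//   const nodeReadable = Readable.fromWeb(webReadable, { objectMode: true });
//   nodeReadable.on('data', (chunk) => console.log(chunk));
//
// Each _read() call pulls exactly one chunk from the reader; reaching
// { done: true } pushes null to end the Node.js stream.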

/**
 * @typedef {import('./readablestream').ReadableWritablePair
 * } ReadableWritablePair
 * @typedef {import('../../stream').Duplex} Duplex
 */

/**
 * @param {Duplex} duplex
 * @returns {ReadableWritablePair}
 */
function newReadableWritablePairFromDuplex(duplex) {
  // Not using the internal/streams/utils isWritableNodeStream and
  // isReadableNodeStream utilities here because they will return false
  // if the duplex was created with writable or readable options set to
  // false. Instead, we'll check the readable and writable state after
  // and return closed WritableStream or closed ReadableStream as
  // necessary.
  if (typeof duplex?._writableState !== 'object' ||
      typeof duplex?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  const writable =
    isWritable(duplex) ?
      newWritableStreamFromStreamWritable(duplex) :
      new WritableStream();

  if (!isWritable(duplex))
    writable.close();

  const readable =
    isReadable(duplex) ?
      newReadableStreamFromStreamReadable(duplex) :
      new ReadableStream();

  if (!isReadable(duplex))
    readable.cancel();

  return { writable, readable };
}
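
// Usage sketch (illustrative only; assumes the public `stream.Duplex.toWeb()`
// helper, which wraps this adapter but is not defined in this file):
//
//   const { Duplex } = require('node:stream');
//   const net = require('node:net');
//
//   const socket = net.connect(80, 'example.com');
//   const { readable, writable } = Duplex.toWeb(socket);
//   // `readable` is a ReadableStream of the socket's incoming data and
//   // `writable` is a WritableStream feeding the socket; whichever side of
//   // the duplex is unusable is replaced by an already-closed stream.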

/**
 * @param {ReadableWritablePair} pair
 * @param {{
 *   allowHalfOpen? : boolean,
 *   decodeStrings? : boolean,
 *   encoding? : string,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Duplex}
 */
function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = kEmptyObject) {
  validateObject(pair, 'pair');
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.readable',
      'ReadableStream',
      readableStream);
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.writable',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  if (encoding !== undefined && !Buffer.isEncoding(encoding))
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        error = error.filter((e) => e);
        try {
          callback(error.length === 0 ? undefined : error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            done,
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(duplex, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      if (!writableClosed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },

    read() {
      PromisePrototypeThen(
        reader.read(),
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy(duplex, error));
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      async function closeWriter() {
        if (!writableClosed)
          await writer.abort(error);
      }

      async function closeReader() {
        if (!readableClosed)
          await reader.cancel(error);
      }

      if (!writableClosed || !readableClosed) {
        PromisePrototypeThen(
          SafePromiseAll([
            closeWriter(),
            closeReader(),
          ]),
          done,
          done);
        return;
      }

      done();
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      writableClosed = true;
      if (!isWritableEnded(duplex))
        destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  PromisePrototypeThen(
    reader.closed,
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  return duplex;
}
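
// Usage sketch (illustrative only; assumes the public `stream.Duplex.fromWeb()`
// helper, which builds on this adapter but is not defined in this file):
//
//   const { Duplex } = require('node:stream');
//
//   const { readable, writable } = new TransformStream();
//   const duplex = Duplex.fromWeb({ readable, writable }, { objectMode: true });
//   duplex.write('ping');           // goes into the TransformStream's writable side
//   duplex.on('data', (chunk) => {  // identity transform echoes it back out
//     console.log(chunk);
//   });
//
// allowHalfOpen defaults to false here, so finishing the readable side also
// ends the writable side of the resulting Duplex.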

/**
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @typedef {{}} StreamBase
 * @param {StreamBase} streamBase
 * @param {QueuingStrategy} strategy
 * @returns {WritableStream}
 */
function newWritableStreamFromStreamBase(streamBase, strategy) {
  validateObject(streamBase, 'streamBase');

  let current;

  function createWriteWrap(controller, promise) {
    const req = new WriteWrap();
    req.handle = streamBase;
    req.oncomplete = onWriteComplete;
    req.async = false;
    req.bytes = 0;
    req.buffer = null;
    req.controller = controller;
    req.promise = promise;
    return req;
  }

  function onWriteComplete(status) {
    if (status < 0) {
      const error = new ErrnoException(status, 'write', this.error);
      this.promise.reject(error);
      this.controller.error(error);
      return;
    }
    this.promise.resolve();
  }

  function doWrite(chunk, controller) {
    const promise = PromiseWithResolvers();
    let ret;
    let req;
    try {
      req = createWriteWrap(controller, promise);
      ret = streamBase.writeBuffer(req, chunk);
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = chunk;
      req.async = !!streamBaseState[kLastWriteWasAsync];
    } catch (error) {
      promise.reject(error);
    }

    if (ret !== 0)
      promise.reject(new ErrnoException(ret, 'write', req));
    else if (!req.async)
      promise.resolve();

    return promise.promise;
  }

  return new WritableStream({
    write(chunk, controller) {
      current = current !== undefined ?
        PromisePrototypeThen(
          current,
          () => doWrite(chunk, controller),
          (error) => controller.error(error)) :
        doWrite(chunk, controller);
      return current;
    },

    close() {
      const promise = PromiseWithResolvers();
      const req = new ShutdownWrap();
      req.oncomplete = () => promise.resolve();
      const err = streamBase.shutdown(req);
      if (err === 1)
        promise.resolve();
      return promise.promise;
    },
  }, strategy);
}
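
// Illustrative sketch only: StreamBase is an internal C++ handle type (for
// example, the kind of handle that backs a net.Socket), so this adapter is
// consumed internally rather than through a public API. A hypothetical
// internal caller might do something like:
//
//   const writable = newWritableStreamFromStreamBase(
//     handle,                                      // some StreamBase handle
//     new ByteLengthQueuingStrategy({ highWaterMark: 16384 }),
//   );
//   const writer = writable.getWriter();
//   await writer.write(new Uint8Array([0x68, 0x69]));
//   await writer.close();                          // issues a ShutdownWrap
//
// Writes are chained through `current`, so at most one WriteWrap is in flight
// against the handle at a time.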

/**
 * @param {StreamBase} streamBase
 * @param {QueuingStrategy} strategy
 * @param {{ ondone? : () => void }} [options]
 * @returns {ReadableStream}
 */
function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) {
  validateObject(streamBase, 'streamBase');
  validateObject(options, 'options');

  const {
    ondone = () => {},
  } = options;

  if (typeof streamBase.onread === 'function')
    throw new ERR_INVALID_STATE('StreamBase already has a consumer');

  validateFunction(ondone, 'options.ondone');

  let controller;

  streamBase.onread = (arrayBuffer) => {
    const nread = streamBaseState[kReadBytesOrError];

    if (nread === 0)
      return;

    try {
      if (nread === UV_EOF) {
        controller.close();
        streamBase.readStop();
        try {
          ondone();
        } catch (error) {
          controller.error(error);
        }
        return;
      }

      controller.enqueue(arrayBuffer);

      if (controller.desiredSize <= 0)
        streamBase.readStop();
    } catch (error) {
      controller.error(error);
      streamBase.readStop();
    }
  };

  return new ReadableStream({
    start(c) { controller = c; },

    pull() {
      streamBase.readStart();
    },

    cancel() {
      const promise = PromiseWithResolvers();
      try {
        ondone();
      } catch (error) {
        promise.reject(error);
        return promise.promise;
      }
      const req = new ShutdownWrap();
      req.oncomplete = () => promise.resolve();
      const err = streamBase.shutdown(req);
      if (err === 1)
        promise.resolve();
      return promise.promise;
    },
  }, strategy);
}
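
// Illustrative sketch only: as with the writable variant above, this is meant
// for internal callers that own a StreamBase handle. A hypothetical use, with
// `ondone` releasing the handle once the stream finishes or is canceled:
//
//   const readable = newReadableStreamFromStreamBase(
//     handle,                                      // some StreamBase handle
//     new ByteLengthQueuingStrategy({ highWaterMark: 16384 }),
//     { ondone: () => handle.close() },
//   );
//   for await (const chunk of readable) {
//     process(chunk);                              // chunk is an ArrayBuffer
//   }
//
// Reading is driven by readStart()/readStop() in response to pull() and the
// controller's desiredSize, and EOF (UV_EOF) closes the controller.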

module.exports = {
  newWritableStreamFromStreamWritable,
  newReadableStreamFromStreamReadable,
  newStreamWritableFromWritableStream,
  newStreamReadableFromReadableStream,
  newReadableWritablePairFromDuplex,
  newStreamDuplexFromReadableWritablePair,
  newWritableStreamFromStreamBase,
  newReadableStreamFromStreamBase,
};