mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 21:19:50 +01:00
b4a962d0e6
Fixes: https://github.com/nodejs/node/issues/46595
PR-URL: https://github.com/nodejs/node/pull/46600
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
423 lines
7.8 KiB
JavaScript
423 lines
7.8 KiB
JavaScript
'use strict';
|
|
|
|
const common = require('../common');
|
|
const assert = require('assert');
|
|
const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
|
|
const { pipeline: pipelinePromise } = require('stream/promises');
|
|
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
|
|
const http = require('http');
|
|
|
|
{
  // ReadableStream -> WritableStream: each chunk enqueued on the source is
  // expected to arrive at the sink unchanged, and pipeline() is expected to
  // complete without an error.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      // Keep the controller so chunks can be enqueued after pipeline() runs.
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });

  const onDone = common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['hello', 'world']);
  });
  pipeline(source, sink, onDone);

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // Erroring the source ReadableStream: the pipeline callback is expected to
  // be called with that same 'kaboom' error.
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write() { },
  });

  const onDone = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  });
  pipeline(source, sink, onDone);

  sourceController.error(new Error('kaboom'));
}
|
|
|
|
{
  // ReadableStream -> TransformStream -> WritableStream: the transform
  // upper-cases every chunk before it reaches the sink.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const upperCaser = new TransformStream({
    transform(chunk, ctrl) {
      const upper = chunk?.toString().toUpperCase();
      ctrl.enqueue(upper);
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk?.toString());
    },
  });

  pipeline(source, upperCaser, sink, common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['HELLO', 'WORLD']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // An error raised on the source is expected to reach the pipeline callback
  // even when four TransformStreams sit between the source and the sink.
  const createStringifier = () => new TransformStream({
    transform(chunk, ctrl) {
      ctrl.enqueue(chunk?.toString());
    },
  });

  let sourceController;
  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write() { },
  });

  // Four independent transform stages, created in pipeline order.
  const stages = Array.from({ length: 4 }, () => createStringifier());

  pipeline(source, ...stages, sink, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  }));

  sourceController.error(new Error('kaboom'));
}
|
|
|
|
{
  // Node.js Readable -> WritableStream.
  const received = [];

  const readable = new Readable({
    // Data is supplied through push() below, so read() has nothing to do.
    read() { },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk?.toString());
    },
  });

  const onDone = common.mustSucceed(() => {
    // The test expects the two pushes to arrive as one combined chunk.
    assert.deepStrictEqual(received, ['helloworld']);
  });
  pipeline(readable, sink, onDone);

  readable.push('hello');
  readable.push('world');
  readable.push(null);
}
|
|
|
|
{
  // ReadableStream -> Node.js Writable: the chunks are expected to arrive
  // individually, in order.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const writable = new Writable({
    write(chunk, encoding, done) {
      received.push(chunk?.toString());
      done();
    },
  });

  pipeline(source, writable, common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['hello', 'world']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // ReadableStream -> Node.js Transform -> WritableStream.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk?.toString());
    },
  });
  const upperCaser = new Transform({
    transform(chunk, encoding, done) {
      const upper = chunk?.toString().toUpperCase();
      done(null, upper);
    },
  });

  pipeline(source, upperCaser, sink, common.mustSucceed(() => {
    // The test expects the sink to see a single combined, upper-cased chunk.
    assert.deepStrictEqual(received, ['HELLOWORLD']);
  }));

  sourceController.enqueue('hello');
  sourceController.enqueue('world');
  sourceController.close();
}
|
|
|
|
{
  // HTTP server whose response body comes from a ReadableStream piped into
  // `res`; the client collects the body chunks and compares them.
  const server = http.createServer((req, res) => {
    const body = new ReadableStream({
      start(ctrl) {
        ctrl.enqueue('hello');
        ctrl.enqueue('world');
        ctrl.close();
      },
    });
    pipeline(body, res, common.mustSucceed(() => {}));
  });

  server.listen(0, common.mustCall(() => {
    const { port } = server.address();
    const clientRequest = http.request({ port });
    clientRequest.end();

    const received = [];
    clientRequest.on('response', (res) => {
      res.on('data', (chunk) => {
        received.push(chunk?.toString());
      });
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(received, ['hello', 'world']);
        // Close the server once the response has been fully read.
        server.close();
      }));
    });
  }));
}
|
|
|
|
{
  // HTTP echo server: the request body is piped through an upper-casing
  // TransformStream into the response. The client feeds the request from a
  // ReadableStream and checks the upper-cased response body.
  const values = [];
  const server = http.createServer((req, res) => {
    const ts = new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(chunk?.toString().toUpperCase());
      }
    });
    pipeline(req, ts, res, common.mustSucceed());
  });

  // The listen callback and the 'response' handler are wrapped in
  // common.mustCall, matching the other HTTP test in this file, so a callback
  // that never runs is reported explicitly.
  server.listen(0, common.mustCall(() => {
    const req = http.request({
      port: server.address().port,
      method: 'POST',
    });

    const rs = new ReadableStream({
      start(controller) {
        controller.enqueue('hello');
        controller.close();
      }
    });

    pipeline(rs, req, common.mustSucceed());

    req.on('response', common.mustCall((res) => {
      res.on('data', (chunk) => {
        values.push(chunk?.toString());
      });
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(values, ['HELLO']);
        // Close the server once the response has been fully read.
        server.close();
      }));
    }));
  }));
}
|
|
|
|
{
  // Promise-based pipeline (stream/promises): ReadableStream -> WritableStream.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk?.toString());
    },
  });

  const finished = pipelinePromise(source, sink);
  finished.then(common.mustCall(() => {
    assert.deepStrictEqual(received, ['hello', 'world']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // Promise-based pipeline: erroring the source is expected to reject the
  // returned promise with the 'kaboom' error rather than fulfil it.
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write() { },
  });

  pipelinePromise(source, sink)
    .then(common.mustNotCall())
    .catch(common.mustCall((err) => {
      assert.strictEqual(err?.message, 'kaboom');
    }));

  sourceController.error(new Error('kaboom'));
}
|
|
|
|
{
  // ReadableStream consumed by an async function used as the last pipeline
  // stage; the function iterates the source and records every chunk.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });

  const collect = async function(iterable) {
    for await (const chunk of iterable) {
      received.push(chunk?.toString());
    }
  };

  pipeline(source, collect, common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['hello', 'world']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // An async function used as the final pipeline stage throws: the error is
  // expected to be delivered to the pipeline callback.
  const rs = new ReadableStream({
    start() {}
  });

  // The callback is wrapped in common.mustCall so the test fails if
  // pipeline() never invokes it; a bare callback would let the assertion be
  // skipped silently.
  pipeline(rs, async function(source) {
    throw new Error('kaboom');
  }, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  }));
}
|
|
|
|
{
  // ReadableStream -> TransformStream -> async function: the consumer is
  // expected to see the upper-cased chunks produced by the transform.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const upperCaser = new TransformStream({
    transform(chunk, ctrl) {
      const upper = chunk?.toString().toUpperCase();
      ctrl.enqueue(upper);
    },
  });

  const collect = async function(iterable) {
    for await (const chunk of iterable) {
      received.push(chunk?.toString());
    }
  };

  pipeline(source, upperCaser, collect, common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['HELLO', 'WORLD']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // ReadableStream -> async generator -> WritableStream: the generator acts
  // as the transform stage and yields each chunk upper-cased.
  const received = [];
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk?.toString());
    },
  });

  const upperCase = async function* (iterable) {
    for await (const chunk of iterable) {
      yield chunk?.toString().toUpperCase();
    }
  };

  pipeline(source, upperCase, sink, common.mustSucceed(() => {
    assert.deepStrictEqual(received, ['HELLO', 'WORLD']);
  }));

  for (const word of ['hello', 'world']) {
    sourceController.enqueue(word);
  }
  sourceController.close();
}
|
|
|
|
{
  // Sink created with a queuing strategy of `highWaterMark: 0`. The test
  // requires that the pipeline callback is never invoked, even though ten
  // chunks are enqueued and the source is closed.
  // NOTE(review): presumably the zero high-water mark keeps the sink under
  // permanent backpressure so nothing is ever written - confirm.
  let sourceController;

  const source = new ReadableStream({
    start(ctrl) {
      sourceController = ctrl;
    },
  });

  const sinkStrategy = { highWaterMark: 0 };
  const sink = new WritableStream({
    write(chunk) { },
  }, sinkStrategy);

  pipeline(source, sink, common.mustNotCall());

  for (let i = 0; i < 10; i++) {
    sourceController.enqueue(`${i}`);
  }
  sourceController.close();
}
|
|
|
|
{
  // A ReadableStream that is closed straight away, piped into a Node.js
  // PassThrough: pipeline() is expected to complete without an error.
  const emptySource = new ReadableStream({
    start(ctrl) {
      ctrl.close();
    },
  });
  const passThrough = new PassThrough();

  pipeline(emptySource, passThrough, common.mustSucceed());
}
|