0
0
mirror of https://github.com/nodejs/node.git synced 2024-12-01 16:10:02 +01:00
nodejs/lib/internal/fs/streams.js
Robert Nagy 95d6ad67bf fs: remove unnecessary argument check
Writable already assures that only Buffer's are passed to _write. Also
this is not the "correct" way to handle errors inside _write.

PR-URL: https://github.com/nodejs/node/pull/29043
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
2019-09-22 16:23:02 -07:00

389 lines
9.6 KiB
JavaScript

'use strict';
const { Math, Object } = primordials;
const {
ERR_OUT_OF_RANGE
} = require('internal/errors').codes;
const { validateNumber } = require('internal/validators');
const fs = require('fs');
const { Buffer } = require('buffer');
const {
copyObject,
getOptions,
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { toPathIfFileURL } = require('internal/url');
// Minimum number of free bytes the current pool must still have for _read()
// to keep using it; with less, _read() asks for a new pool. It is also the
// minimum size a left-over pool fragment needs to be kept for re-use.
const kMinPoolSpace = 128;
// Module-level buffer that ReadStream.prototype._read() reads file data into.
// Its `used` property marks how many leading bytes are already reserved.
let pool;
// It can happen that we expect to read a large chunk of data, and reserve
// a large chunk of the pool accordingly, but the read() call only filled
// a portion of it. If a concurrently executing read() then uses the same pool,
// the "reserved" portion cannot be used, so we allow it to be re-used as a
// new pool later.
const poolFragments = [];
/**
 * Replaces the module-level `pool` with a fresh one and resets its `used`
 * mark to 0. A previously saved pool fragment is preferred when one is
 * available; only otherwise is a new buffer allocated.
 *
 * @param {number} poolSize Size in bytes of the buffer to allocate when no
 *   fragment can be re-used (a re-used fragment keeps its own size).
 */
function allocNewPool(poolSize) {
  const hasFragment = poolFragments.length > 0;
  pool = hasFragment ? poolFragments.pop() : Buffer.allocUnsafe(poolSize);
  pool.used = 0;
}
/**
 * Validates a stream position (`start` or `end`): it must be a safe,
 * non-negative integer.
 *
 * @param {*} pos Value to validate.
 * @param {string} name Option name used in the thrown error.
 * @throws {ERR_OUT_OF_RANGE} When `pos` is a number that is not an integer,
 *   is negative, or is not a safe integer. Non-number values are rejected by
 *   validateNumber() first.
 */
function checkPosition(pos, name) {
  const validRange = '>= 0 and <= 2 ** 53 - 1';

  if (Number.isSafeInteger(pos)) {
    if (pos >= 0)
      return;
    throw new ERR_OUT_OF_RANGE(name, validRange, pos);
  }

  // Not a safe integer: reject wrong types first, then tell fractional/NaN
  // values apart from integers that are merely too large.
  validateNumber(pos, name);
  const expected = Number.isInteger(pos) ? validRange : 'an integer';
  throw new ERR_OUT_OF_RANGE(name, expected, pos);
}
/**
 * Rounds `n` up to the nearest multiple of 8 (32-bit integer arithmetic).
 *
 * @param {number} n Offset to align.
 * @returns {number} The smallest multiple of 8 that is >= `n`.
 */
function roundUpToMultipleOf8(n) {
  // Bump past the next boundary, then drop the three low bits.
  const bumped = n + 7;
  return (bumped >> 3) << 3;
}
/**
 * Readable stream over a file: either one it opens itself from `path`, or an
 * already open descriptor given as `options.fd`. Works with or without `new`.
 *
 * @param {*} path Passed through toPathIfFileURL() and stored as `this.path`.
 * @param {*} [options] Normalized with getOptions() and copied. Fields read
 *   here: `highWaterMark` (default 64 KiB), `emitClose` (default false),
 *   `fd` (default null), `flags` (default 'r'), `mode` (default 0o666),
 *   `start`, `end` (default Infinity) and `autoClose` (default true).
 * @throws {ERR_OUT_OF_RANGE} When `start` is greater than a finite `end`.
 *   checkPosition() validates `start`/`end` and may throw as well.
 */
function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  const withDefault = (value, fallback) =>
    (value === undefined ? fallback : value);

  options = copyObject(getOptions(options, {}));
  // Use a somewhat bigger buffer and water mark by default.
  if (options.highWaterMark === undefined)
    options.highWaterMark = 64 * 1024;
  // For backwards compat do not emit close on destroy.
  if (options.emitClose === undefined)
    options.emitClose = false;

  Readable.call(this, options);

  // Path will be ignored when fd is specified, so it can be falsy.
  this.path = toPathIfFileURL(path);
  this.fd = withDefault(options.fd, null);
  this.flags = withDefault(options.flags, 'r');
  this.mode = withDefault(options.mode, 0o666);
  this.start = options.start;
  this.end = withDefault(options.end, Infinity);
  this.autoClose = withDefault(options.autoClose, true);
  this.pos = undefined;
  this.bytesRead = 0;
  this.closed = false;

  const hasStart = this.start !== undefined;
  if (hasStart) {
    checkPosition(this.start, 'start');
    this.pos = this.start;
  }

  // An unbounded end (the default) needs no validation.
  if (this.end !== Infinity) {
    checkPosition(this.end, 'end');
    if (hasStart && this.start > this.end) {
      throw new ERR_OUT_OF_RANGE(
        'start',
        `<= "end" (here: ${this.end})`,
        this.start
      );
    }
  }

  // Without a usable descriptor the file has to be opened first.
  if (typeof this.fd !== 'number')
    this.open();

  this.on('end', () => {
    if (this.autoClose)
      this.destroy();
  });
}
// Make ReadStream inherit from Readable: instance members through the
// prototype chain, static members through the constructor chain.
Object.setPrototypeOf(ReadStream.prototype, Readable.prototype);
Object.setPrototypeOf(ReadStream, Readable);
/**
 * Opens `this.path` with `this.flags` / `this.mode`.
 *
 * On success the descriptor is stored in `this.fd`, 'open' (with the fd) and
 * 'ready' are emitted, and reading is started. On failure the stream is
 * destroyed when `autoClose` is set, and 'error' is emitted.
 */
ReadStream.prototype.open = function() {
  const onOpened = (err, fd) => {
    if (err) {
      if (this.autoClose)
        this.destroy();
      this.emit('error', err);
      return;
    }

    this.fd = fd;
    this.emit('open', fd);
    this.emit('ready');
    // Kick off the flow of data now that a descriptor is available.
    this.read();
  };

  fs.open(this.path, this.flags, this.mode, onOpened);
};
/**
 * Readable#_read implementation: reads up to `n` bytes from the file into the
 * module-level pool and pushes the filled part of it to the stream.
 *
 * @param {number} n Upper bound on the number of bytes to read. The actual
 *   request is further limited by the free space in the pool and by
 *   `this.end`.
 * @returns {*} The result of `this.once()` when the read is deferred until
 *   'open', or of `this.push(null)` when there is nothing left to read;
 *   otherwise undefined.
 */
ReadStream.prototype._read = function(n) {
// No descriptor yet: retry this exact request once 'open' has been emitted.
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._read(n);
});
}
// Set up a new pool when there is none or the current one has less than
// kMinPoolSpace bytes left.
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
}
// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
const thisPool = pool;
let toRead = Math.min(pool.length - pool.used, n);
// Offset inside the pool where this read's data will start.
const start = pool.used;
// Never read past `this.end` (inclusive). With an explicit position the
// remaining byte count is measured from `this.pos`, otherwise from the
// number of bytes read so far.
if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
else
toRead = Math.min(this.end - this.bytesRead + 1, toRead);
// Already read everything we were supposed to read!
// treat as EOF.
if (toRead <= 0)
return this.push(null);
// the actual read.
// `this.pos` is passed as the file position; it is undefined when no `start`
// option was given.
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
if (er) {
// Destroy the stream only when autoClose is set, then report the error.
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
// Stays null (pushed as end-of-stream) when no bytes were read.
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
// Re-winding is only done when `thisPool` is still the current pool and its
// `used` mark sits exactly at the end of this read's reservation.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
// Fragments smaller than kMinPoolSpace are not kept.
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}
if (bytesRead > 0) {
this.bytesRead += bytesRead;
// Hand out only the part of the pool this read actually filled.
b = thisPool.slice(start, start + bytesRead);
}
this.push(b);
}
});
// Move the pool positions, and internal position for reading.
// These statements run right after fs.read() has been issued, reserving the
// full `toRead` bytes of the pool for this read.
if (this.pos !== undefined)
this.pos += toRead;
pool.used = roundUpToMultipleOf8(pool.used + toRead);
};
/**
 * Destroy hook: closes the file descriptor via closeFsStream().
 *
 * When `this.fd` is not a number yet, closing is postponed until 'open' is
 * emitted; otherwise the descriptor is closed right away and `this.fd` is
 * reset to null.
 *
 * @param {*} err Error that caused the destroy, if any; forwarded to
 *   closeFsStream().
 * @param {Function} cb Completion callback, forwarded to closeFsStream().
 */
ReadStream.prototype._destroy = function(err, cb) {
  const hasDescriptor = typeof this.fd === 'number';

  if (hasDescriptor) {
    closeFsStream(this, cb, err);
    this.fd = null;
    return;
  }

  // Nothing to close yet: wait for the pending open to deliver a descriptor.
  this.once('open', closeFsStream.bind(null, this, cb, err));
};
/**
 * Closes `stream.fd` with fs.close() and reports the outcome.
 *
 * `stream.closed` is set to true before `cb` is invoked, so a callback that
 * runs synchronously inside `cb` already observes the stream as closed, and
 * the flag is set even if `cb` throws.
 *
 * @param {object} stream Stream whose `fd` is closed. It gets `closed = true`
 *   and, when no error is reported, a 'close' event.
 * @param {Function} cb Called with the error from fs.close() if there is one,
 *   otherwise with `err`.
 * @param {*} err Error that triggered the destroy, if any; reported through
 *   `cb` when closing itself succeeds.
 */
function closeFsStream(stream, cb, err) {
  fs.close(stream.fd, (er) => {
    // An error from close takes precedence over the error being forwarded.
    er = er || err;
    stream.closed = true;
    cb(er);
    if (!er)
      stream.emit('close');
  });
}
/**
 * Closes the stream by destroying it without an error.
 *
 * @param {Function} [cb] Passed to destroy() as its callback argument.
 */
ReadStream.prototype.close = function(cb) {
this.destroy(null, cb);
};
// `pending` is true while `fd` is null: either no file descriptor has been
// assigned to the stream yet, or _destroy() has reset it to null.
Object.defineProperty(ReadStream.prototype, 'pending', {
get() { return this.fd === null; },
configurable: true
});
/**
 * Writable stream over a file: either one it opens itself from `path`, or an
 * already open descriptor given as `options.fd`. Works with or without `new`.
 *
 * @param {*} path Passed through toPathIfFileURL() and stored as `this.path`.
 * @param {*} [options] Normalized with getOptions() and copied. Fields read
 *   here: `emitClose` (default false), `fd` (default null), `flags`
 *   (default 'w'), `mode` (default 0o666), `start`, `autoClose` (default
 *   true, coerced to boolean) and `encoding`. `decodeStrings` is always
 *   forced to true.
 */
function WriteStream(path, options) {
  if (!(this instanceof WriteStream))
    return new WriteStream(path, options);

  const withDefault = (value, fallback) =>
    (value === undefined ? fallback : value);

  options = copyObject(getOptions(options, {}));
  // Only buffers are supported.
  options.decodeStrings = true;
  // For backwards compat do not emit close on destroy.
  if (options.emitClose === undefined)
    options.emitClose = false;

  Writable.call(this, options);

  // Path will be ignored when fd is specified, so it can be falsy.
  this.path = toPathIfFileURL(path);
  this.fd = withDefault(options.fd, null);
  this.flags = withDefault(options.flags, 'w');
  this.mode = withDefault(options.mode, 0o666);
  this.start = options.start;
  this.autoClose = Boolean(withDefault(options.autoClose, true));
  this.pos = undefined;
  this.bytesWritten = 0;
  this.closed = false;

  const hasStart = this.start !== undefined;
  if (hasStart) {
    checkPosition(this.start, 'start');
    this.pos = this.start;
  }

  if (options.encoding)
    this.setDefaultEncoding(options.encoding);

  // Without a usable descriptor the file has to be opened first.
  if (typeof this.fd !== 'number')
    this.open();
}
// Make WriteStream inherit from Writable: instance members through the
// prototype chain, static members through the constructor chain.
Object.setPrototypeOf(WriteStream.prototype, Writable.prototype);
Object.setPrototypeOf(WriteStream, Writable);
/**
 * Writable#_final implementation: destroys the stream when `autoClose` is
 * set, then signals completion.
 *
 * @param {Function} callback Invoked with no arguments once done.
 */
WriteStream.prototype._final = function(callback) {
if (this.autoClose) {
this.destroy();
}
callback();
};
/**
 * Opens `this.path` with `this.flags` / `this.mode`.
 *
 * On success the descriptor is stored in `this.fd` and 'open' (with the fd)
 * and 'ready' are emitted. On failure the stream is destroyed when
 * `autoClose` is set, and 'error' is emitted.
 */
WriteStream.prototype.open = function() {
  const onOpened = (err, fd) => {
    if (err) {
      if (this.autoClose)
        this.destroy();
      this.emit('error', err);
      return;
    }

    this.fd = fd;
    this.emit('open', fd);
    this.emit('ready');
  };

  fs.open(this.path, this.flags, this.mode, onOpened);
};
/**
 * Writable#_write implementation: writes one chunk to the file.
 *
 * @param {*} data Chunk to write; its `length` bytes are written starting at
 *   offset 0.
 * @param {*} encoding Only forwarded when the write is replayed after 'open'.
 * @param {Function} cb Called with the write error, or with no arguments on
 *   success.
 * @returns {*} The result of `this.once()` when the write is deferred until
 *   'open'; otherwise undefined.
 */
WriteStream.prototype._write = function(data, encoding, cb) {
  if (typeof this.fd !== 'number') {
    // No descriptor yet: replay this exact write once 'open' is emitted.
    return this.once('open', () => {
      this._write(data, encoding, cb);
    });
  }

  const onWritten = (err, written) => {
    if (err) {
      if (this.autoClose)
        this.destroy();
      return cb(err);
    }
    this.bytesWritten += written;
    cb();
  };

  fs.write(this.fd, data, 0, data.length, this.pos, onWritten);

  // Advance the explicit file position (if any) right after issuing the
  // write, so that the next write is issued with the following offset.
  if (this.pos !== undefined)
    this.pos += data.length;
};
/**
 * Writable#_writev implementation: writes a batch of buffered chunks with a
 * single fs.writev() call.
 *
 * @param {Array} data Buffered write requests; each entry's `chunk` property
 *   is the data to write.
 * @param {Function} cb Called with the write error, or with no arguments on
 *   success.
 * @returns {*} The result of `this.once()` when the write is deferred until
 *   'open'; otherwise undefined.
 */
WriteStream.prototype._writev = function(data, cb) {
  if (typeof this.fd !== 'number') {
    // No descriptor yet: replay this exact batch once 'open' is emitted.
    return this.once('open', function() {
      this._writev(data, cb);
    });
  }

  // Collect the raw chunks and their combined byte length.
  const len = data.length;
  const chunks = new Array(len);
  let size = 0;
  for (let i = 0; i < len; i++) {
    const chunk = data[i].chunk;
    chunks[i] = chunk;
    size += chunk.length;
  }

  fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
    if (er) {
      // Honor `autoClose: false` the same way _write() does: the stream is
      // only destroyed on error when autoClose is enabled.
      if (this.autoClose) {
        this.destroy();
      }
      return cb(er);
    }
    this.bytesWritten += bytes;
    cb();
  });

  // Advance the explicit file position (if any) right after issuing the
  // write, so that the next write is issued with the following offset.
  if (this.pos !== undefined)
    this.pos += size;
};
// WriteStream tears down its file descriptor with the very same function
// ReadStream uses.
WriteStream.prototype._destroy = ReadStream.prototype._destroy;
/**
 * Closes the stream by ending it.
 *
 * @param {Function} [cb] If the stream is already closed, scheduled with
 *   process.nextTick() and nothing else happens; otherwise registered as a
 *   'close' listener.
 */
WriteStream.prototype.close = function(cb) {
  if (cb) {
    if (this.closed) {
      process.nextTick(cb);
      return;
    }
    this.on('close', cb);
  }

  // If we are not autoClosing, we should call
  // destroy on 'finish'.
  if (!this.autoClose) {
    const destroyOnFinish = this.destroy.bind(this);
    this.on('finish', destroyOnFinish);
  }

  // We use end() instead of destroy() because of
  // https://github.com/nodejs/node/issues/2006
  this.end();
};
// There is no shutdown() for files, so destroySoon() is simply an alias
// for end().
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
// `pending` is true while `fd` is null: either no file descriptor has been
// assigned to the stream yet, or _destroy() has reset it to null.
Object.defineProperty(WriteStream.prototype, 'pending', {
get() { return this.fd === null; },
configurable: true
});
// Public surface of this module: the two file stream constructors.
module.exports = {
ReadStream,
WriteStream
};