mirror of
https://github.com/nodejs/node.git
synced 2024-12-01 16:10:02 +01:00
86433979c6
This handles the fact that stream.Writable inherits from the Stream class, meaning that it has the legacy pipe() method. Override that with a pipe() method that emits an error. Ensure that Duplex streams ARE still pipe()able, however. Since the 'readable' flag on streams is sometimes temporary, it's probably better not to put too much weight on that. But if something is an instanceof Writable, rather than of Readable or Duplex, then it's safe to say that reading from it is the wrong thing to do. Fix #3647
329 lines
9.4 KiB
JavaScript
329 lines
9.4 KiB
JavaScript
// Copyright Joyent, Inc. and other Node contributors.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
// copy of this software and associated documentation files (the
|
|
// "Software"), to deal in the Software without restriction, including
|
|
// without limitation the rights to use, copy, modify, merge, publish,
|
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|
// persons to whom the Software is furnished to do so, subject to the
|
|
// following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included
|
|
// in all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
// A bit simpler than readable streams.
|
|
// Implement an async ._write(chunk, cb), and it'll handle all
|
|
// the drain event emission and buffering.
|
|
|
|
// Public export of this module: the Writable constructor. Writable and
// WritableState are function declarations further down in this file, so
// they are hoisted and can be referenced here before their definitions.
module.exports = Writable;
|
|
// Expose the per-stream state constructor as a property of Writable.
Writable.WritableState = WritableState;
|
|
|
|
var util = require('util');
|
|
var assert = require('assert');
|
|
var Stream = require('stream');
|
|
|
|
// Writable inherits from the base Stream class. That also brings in the
// legacy Stream pipe() method, which is overridden further down.
util.inherits(Writable, Stream);
|
|
|
|
// Bookkeeping record for a single Writable stream.
//
// options: optional settings object; the fields read here are
//          highWaterMark, objectMode and decodeStrings.
// stream:  the stream that owns this state. It is only captured by the
//          onwrite callback created below.
function WritableState(options, stream) {
  var opts = options || {};

  // The buffered amount at which write() starts returning false.
  // Note: 0 is a valid value. It means write() always returns false
  // unless the whole buffer is flushed immediately.
  var requested = opts.highWaterMark;
  var mark = 16 * 1024;
  if (requested || requested === 0)
    mark = requested;
  this.highWaterMark = mark;

  // Whether this stream carries arbitrary objects rather than
  // buffers/strings.
  this.objectMode = !!opts.objectMode;

  // Coerce the high water mark to an integer.
  this.highWaterMark = ~~this.highWaterMark;

  // Set when write() has returned false, so 'drain' must be emitted later.
  this.needDrain = false;

  // Lifecycle flags, in the order they are reached:
  //   ending    - end() has started
  //   ended     - end() has been called and has returned
  //   finished  - 'finish' has been emitted
  //   finishing - 'finish' is being emitted right now
  this.ending = false;
  this.ended = false;
  this.finished = false;
  this.finishing = false;

  // Should strings be decoded into buffers before being handed to _write?
  // Only an explicit `false` turns decoding off; this lets some node-core
  // streams optimize string handling at a lower level.
  this.decodeStrings = opts.decodeStrings !== false;

  // Not an actual buffer, just a measurement of how much is still waiting
  // to be pushed to the underlying socket or file.
  this.length = 0;

  // True while a _write() call is outstanding.
  this.writing = false;

  // Lets onwrite tell whether the _write callback fired immediately or on
  // a later tick. It starts out true because any action that must wait
  // until "later" should generally also not happen before the first write
  // call.
  this.sync = true;

  // True while previously buffered items are being replayed. Those may
  // invoke the _write() callback in the same tick, and this flag prevents
  // an overlapped onwrite situation.
  this.bufferProcessing = false;

  // The callback that gets passed to _write(chunk, cb).
  this.onwrite = function(er) {
    onwrite(stream, er);
  };

  // The callback the user supplied to write(chunk, encoding, cb).
  this.writecb = null;

  // The amount being written by the current _write call.
  this.writelen = 0;

  // Queue of [chunk, cb] pairs waiting for their turn.
  this.buffer = [];
}
|
|
|
|
// Writable stream constructor.
//
// options: forwarded unchanged to WritableState.
//
// May be called without `new`; in that case a fresh Writable is created
// and returned.
function Writable(options) {
  // The Writable ctor is also applied to Duplexes. Those are not
  // instanceof Writable (they're instanceof Readable), so they must be
  // let through rather than replaced by a new Writable.
  var alreadyConstructed = this instanceof Writable ||
                           this instanceof Stream.Duplex;
  if (!alreadyConstructed)
    return new Writable(options);

  this._writableState = new WritableState(options, this);

  // Legacy flag.
  this.writable = true;

  Stream.call(this);
}
|
|
|
|
// Otherwise people can pipe Writable streams, which is just wrong.
|
|
// Replacement for the pipe() method inherited from Stream: a write-only
// stream has nothing to read from, so report the misuse via 'error'.
Writable.prototype.pipe = function() {
  var notReadable = new Error('Cannot pipe. Not readable.');
  this.emit('error', notReadable);
};
|
|
|
|
// Override this method or _write(chunk, cb)
|
|
// Queue or dispatch one chunk of data.
//
// chunk:    a Buffer or string (any value when the stream is in objectMode)
// encoding: optional string encoding for string chunks; may be omitted, in
//           which case the second argument is taken as the callback
// cb:       optional callback, handed on to onwrite via state.writecb (or
//           stored with the chunk if it has to be buffered)
//
// Returns true while the amount buffered is below highWaterMark and false
// once it has been reached (the caller should then wait for 'drain').
// Returns undefined when the write is rejected; in that case the error is
// passed to cb (if given) and emitted as an 'error' event.
Writable.prototype.write = function(chunk, encoding, cb) {
  var state = this._writableState;
  var er;

  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  if (state.ended) {
    er = new Error('write after end');
    if (typeof cb === 'function')
      cb(er);
    this.emit('error', er);
    return;
  }

  // Outside of objectMode only buffers and strings can be written. This
  // includes rejecting null and undefined: they have no length, so letting
  // them through would crash on `chunk.length` below instead of reporting
  // a proper error.
  // In objectMode every chunk is accepted and counts as length=1, so the
  // watermarks determine how many objects to keep in the buffer, rather
  // than how many bytes or characters.
  if (!state.objectMode &&
      !Buffer.isBuffer(chunk) &&
      typeof chunk !== 'string') {
    er = new TypeError('Invalid non-string/buffer chunk');
    if (typeof cb === 'function')
      cb(er);
    this.emit('error', er);
    return;
  }

  var len;
  if (state.objectMode)
    len = 1;
  else {
    len = chunk.length;
    if (false === state.decodeStrings)
      // Pass the raw chunk on together with its encoding, undecoded.
      chunk = [chunk, encoding || 'utf8'];
    else if (typeof chunk === 'string') {
      chunk = new Buffer(chunk, encoding);
      // Byte length may differ from the character count.
      len = chunk.length;
    }
  }

  state.length += len;

  var ret = state.length < state.highWaterMark;
  if (ret === false)
    state.needDrain = true;

  // If we're already writing something, then just put this
  // in the queue, and wait our turn.
  if (state.writing) {
    state.buffer.push([chunk, cb]);
    return ret;
  }

  state.writing = true;
  // sync stays true for the duration of the _write call so that onwrite
  // can tell whether the callback fired in this same tick.
  state.sync = true;
  state.writelen = len;
  state.writecb = cb;
  this._write(chunk, state.onwrite);
  state.sync = false;

  return ret;
};
|
|
|
|
// Completion handler for a single _write() call; reached through the
// state.onwrite closure that is handed to _write as its callback.
//
// stream: the stream whose write just completed
// er:     truthy if the write failed
//
// In order, it: clears the in-flight write bookkeeping, reports failure
// or success to the user's write callback, emits 'finish' or schedules
// 'drain' when everything has been flushed, and finally replays chunks
// that were queued while the write was in flight.
function onwrite(stream, er) {
  var state = stream._writableState;
  var wasSync = state.sync;
  var userCb = state.writecb;
  var doneLen = state.writelen;

  state.writing = false;
  state.writelen = null;
  state.writecb = null;

  if (er) {
    if (userCb) {
      // If _write(chunk,cb) calls cb() in this tick, we still defer
      // the *user's* write callback to the next tick.
      // Never present an external API that is *sometimes* async!
      if (!wasSync) {
        userCb(er);
      } else {
        process.nextTick(function() {
          userCb(er);
        });
      }
    }

    // Backwards compatibility: still emit even if there was a cb.
    stream.emit('error', er);
    return;
  }

  state.length -= doneLen;

  if (userCb) {
    // Don't call the cb until the next tick if we're in sync mode.
    if (!wasSync)
      userCb();
    else
      process.nextTick(userCb);
  }

  if (state.length === 0) {
    var closing = state.ended || state.ending;
    if (closing && !state.finished && !state.finishing) {
      // Everything is flushed and end() was requested: emit 'finish'
      // at the very end.
      state.finishing = true;
      stream.emit('finish');
      state.finished = true;
      return;
    }

    if (state.needDrain) {
      // Must force 'drain' onto the next tick, so that it is not emitted
      // before the write() consumer gets the 'false' return value and
      // has a chance to attach a 'drain' listener.
      process.nextTick(function() {
        if (state.needDrain) {
          state.needDrain = false;
          stream.emit('drain');
        }
      });
    }
  }

  // Replay anything that was queued while the write was in flight.
  // It would be nice if there were TCO in JS, and we could just shift the
  // top off the buffer and _write that, but that approach causes
  // RangeErrors when you have a very large number of very small writes,
  // and is not very efficient otherwise. A nested onwrite (triggered by a
  // same-tick _write callback) sees bufferProcessing set and leaves the
  // queue to this loop.
  if (state.bufferProcessing || !state.buffer.length)
    return;

  state.bufferProcessing = true;

  var idx = 0;
  while (idx < state.buffer.length) {
    var entry = state.buffer[idx];
    var queuedChunk = entry[0];
    var queuedCb = entry[1];

    var queuedLen;
    if (state.objectMode)
      queuedLen = 1;
    else if (state.decodeStrings === false)
      // Undecoded chunks are queued as [chunk, encoding] pairs.
      queuedLen = queuedChunk[0].length;
    else
      queuedLen = queuedChunk.length;

    state.writelen = queuedLen;
    state.writecb = queuedCb;
    state.writechunk = queuedChunk;
    state.writing = true;
    state.sync = true;
    stream._write(queuedChunk, state.onwrite);
    state.sync = false;

    // Either way this entry has been handed to _write, so step past it.
    idx++;

    // If onwrite was not called immediately, the write is still in
    // flight and we need to wait until it completes before going on.
    if (state.writing)
      break;
  }

  state.bufferProcessing = false;
  if (idx < state.buffer.length)
    state.buffer = state.buffer.slice(idx);
  else
    state.buffer.length = 0;
}
|
|
|
|
// Default _write: a placeholder for the real implementation. It fails the
// write with a 'not implemented' error, delivered on the next tick.
//
// chunk: the data to write (ignored here)
// cb:    completion callback; receives the error
Writable.prototype._write = function(chunk, cb) {
  function failNotImplemented() {
    var er = new Error('not implemented');
    cb(er);
  }
  process.nextTick(failNotImplemented);
};
|
|
|
|
// Signal that no more data will be written, optionally writing one last
// chunk first.
//
// chunk:    optional final chunk; may be omitted, in which case the first
//           argument is taken as the callback
// encoding: optional encoding for a string chunk; may be omitted likewise
// cb:       optional callback. With a final chunk it is passed to write()
//           as that write's callback; otherwise it runs once 'finish' has
//           been emitted.
//
// Calls made after the stream has started ending are ignored.
Writable.prototype.end = function(chunk, encoding, cb) {
  var state = this._writableState;

  // Ignore unnecessary end() calls.
  if (state.ending || state.ended || state.finished)
    return;

  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  // In objectMode falsy values such as 0, false and '' are legitimate
  // chunks, so only null/undefined mean "no final chunk" there. For
  // byte/string streams a falsy chunk (e.g. '') carries no data and is
  // skipped.
  var hasChunk;
  if (state.objectMode)
    hasChunk = chunk !== null && chunk !== undefined;
  else
    hasChunk = !!chunk;

  state.ending = true;
  if (hasChunk)
    this.write(chunk, encoding, cb);
  else if (state.length === 0 && !state.finishing && !state.finished) {
    // Nothing pending: finish right away.
    state.finishing = true;
    this.emit('finish');
    state.finished = true;
    if (cb) process.nextTick(cb);
  } else if (cb) {
    // Writes are still pending; 'finish' is emitted when they complete.
    this.once('finish', cb);
  }

  state.ended = true;
};
|