2012-10-03 00:44:50 +02:00
|
|
|
// Copyright Joyent, Inc. and other Node contributors.
|
|
|
|
//
|
|
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
|
|
// copy of this software and associated documentation files (the
|
|
|
|
// "Software"), to deal in the Software without restriction, including
|
|
|
|
// without limitation the rights to use, copy, modify, merge, publish,
|
|
|
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|
|
|
// persons to whom the Software is furnished to do so, subject to the
|
|
|
|
// following conditions:
|
|
|
|
//
|
|
|
|
// The above copyright notice and this permission notice shall be included
|
|
|
|
// in all copies or substantial portions of the Software.
|
|
|
|
//
|
|
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
|
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
|
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|
|
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|
|
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|
|
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
|
|
|
|
// A bit simpler than readable streams.
|
|
|
|
// Implement an async ._write(chunk, cb), and it'll handle all
|
|
|
|
// the drain event emission and buffering.
|
|
|
|
|
2012-11-13 08:30:10 +01:00
|
|
|
module.exports = Writable;
|
|
|
|
Writable.WritableState = WritableState;
|
2012-10-03 00:44:50 +02:00
|
|
|
|
|
|
|
var util = require('util');
|
2012-11-13 08:33:06 +01:00
|
|
|
var assert = require('assert');
|
2012-10-03 00:44:50 +02:00
|
|
|
var Stream = require('stream');
|
|
|
|
|
|
|
|
util.inherits(Writable, Stream);
|
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
function WritableState(options, stream) {
|
2012-10-03 00:44:50 +02:00
|
|
|
options = options || {};
|
2012-11-13 08:33:06 +01:00
|
|
|
|
|
|
|
// the point at which write() starts returning false
|
2012-10-05 16:43:34 +02:00
|
|
|
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
|
|
|
|
options.highWaterMark : 16 * 1024;
|
2012-11-13 08:33:06 +01:00
|
|
|
|
|
|
|
// the point that it has to get to before we call _write(chunk,cb)
|
|
|
|
// default to pushing everything out as fast as possible.
|
2012-10-05 16:43:34 +02:00
|
|
|
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
|
2012-11-13 08:33:06 +01:00
|
|
|
options.lowWaterMark : 0;
|
|
|
|
|
|
|
|
// cast to ints.
|
|
|
|
assert(typeof this.lowWaterMark === 'number');
|
|
|
|
assert(typeof this.highWaterMark === 'number');
|
|
|
|
this.lowWaterMark = ~~this.lowWaterMark;
|
|
|
|
this.highWaterMark = ~~this.highWaterMark;
|
|
|
|
assert(this.lowWaterMark >= 0);
|
|
|
|
assert(this.highWaterMark >= this.lowWaterMark,
|
|
|
|
this.highWaterMark + '>=' + this.lowWaterMark);
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
this.needDrain = false;
|
2012-11-13 08:33:06 +01:00
|
|
|
// at the start of calling end()
|
2012-10-03 00:44:50 +02:00
|
|
|
this.ending = false;
|
2012-11-13 08:33:06 +01:00
|
|
|
// when end() has been called, and returned
|
|
|
|
this.ended = false;
|
|
|
|
// when 'finish' has emitted
|
|
|
|
this.finished = false;
|
|
|
|
// when 'finish' is being emitted
|
|
|
|
this.finishing = false;
|
2012-10-03 00:44:50 +02:00
|
|
|
|
2012-10-09 20:01:53 +02:00
|
|
|
// should we decode strings into buffers before passing to _write?
|
|
|
|
// this is here so that some node-core streams can optimize string
|
|
|
|
// handling at a lower level.
|
|
|
|
this.decodeStrings = options.hasOwnProperty('decodeStrings') ?
|
|
|
|
options.decodeStrings : true;
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
// not an actual buffer we keep track of, but a measurement
|
|
|
|
// of how much we're waiting to get pushed to some underlying
|
|
|
|
// socket or file.
|
|
|
|
this.length = 0;
|
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
// a flag to see when we're in the middle of a write.
|
2012-10-03 00:44:50 +02:00
|
|
|
this.writing = false;
|
2012-11-13 08:33:06 +01:00
|
|
|
|
|
|
|
// a flag to be able to tell if the onwrite cb is called immediately,
|
|
|
|
// or on a later tick.
|
|
|
|
this.sync = false;
|
|
|
|
|
|
|
|
// the callback that's passed to _write(chunk,cb)
|
2012-11-17 05:24:14 +01:00
|
|
|
this.onwrite = function(er) {
|
|
|
|
onwrite(stream, er);
|
|
|
|
};
|
2012-11-13 08:33:06 +01:00
|
|
|
|
|
|
|
// the callback that the user supplies to write(chunk,encoding,cb)
|
|
|
|
this.writecb = null;
|
|
|
|
|
|
|
|
// the amount that is being written when _write is called.
|
|
|
|
this.writelen = 0;
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
this.buffer = [];
|
|
|
|
}
|
|
|
|
|
|
|
|
function Writable(options) {
|
2012-10-08 23:43:17 +02:00
|
|
|
// Writable ctor is applied to Duplexes, though they're not
|
|
|
|
// instanceof Writable, they're instanceof Readable.
|
2012-10-10 02:42:47 +02:00
|
|
|
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
|
2012-10-08 23:43:17 +02:00
|
|
|
return new Writable(options);
|
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
this._writableState = new WritableState(options, this);
|
2012-10-03 00:44:50 +02:00
|
|
|
|
|
|
|
// legacy.
|
|
|
|
this.writable = true;
|
|
|
|
|
|
|
|
Stream.call(this);
|
|
|
|
}
|
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
// Override this method or _write(chunk, cb)
|
2012-10-10 02:31:29 +02:00
|
|
|
Writable.prototype.write = function(chunk, encoding, cb) {
|
2012-10-03 00:44:50 +02:00
|
|
|
var state = this._writableState;
|
|
|
|
|
2012-10-10 02:31:29 +02:00
|
|
|
if (typeof encoding === 'function') {
|
|
|
|
cb = encoding;
|
|
|
|
encoding = null;
|
|
|
|
}
|
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
if (state.ended) {
|
|
|
|
var er = new Error('write after end');
|
|
|
|
if (typeof cb === 'function')
|
|
|
|
cb(er);
|
|
|
|
this.emit('error', er);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
var l = chunk.length;
|
2012-10-09 20:01:53 +02:00
|
|
|
if (false === state.decodeStrings)
|
2012-11-13 08:33:06 +01:00
|
|
|
chunk = [chunk, encoding || 'utf8'];
|
2012-10-09 20:01:53 +02:00
|
|
|
else if (typeof chunk === 'string' || encoding) {
|
|
|
|
chunk = new Buffer(chunk + '', encoding);
|
|
|
|
l = chunk.length;
|
|
|
|
}
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
state.length += l;
|
|
|
|
|
2012-10-05 16:45:03 +02:00
|
|
|
var ret = state.length < state.highWaterMark;
|
|
|
|
if (ret === false)
|
|
|
|
state.needDrain = true;
|
|
|
|
|
2012-10-10 02:31:29 +02:00
|
|
|
// if we're already writing something, then just put this
|
|
|
|
// in the queue, and wait our turn.
|
2012-10-03 00:44:50 +02:00
|
|
|
if (state.writing) {
|
2012-10-10 02:31:29 +02:00
|
|
|
state.buffer.push([chunk, cb]);
|
2012-10-03 00:44:50 +02:00
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
|
|
|
state.writing = true;
|
2012-11-13 08:33:06 +01:00
|
|
|
state.sync = true;
|
|
|
|
state.writelen = l;
|
|
|
|
state.writecb = cb;
|
|
|
|
this._write(chunk, state.onwrite);
|
|
|
|
state.sync = false;
|
2012-10-09 20:01:53 +02:00
|
|
|
|
|
|
|
return ret;
|
2012-11-13 08:33:06 +01:00
|
|
|
};
|
2012-10-09 20:01:53 +02:00
|
|
|
|
2012-11-17 05:24:14 +01:00
|
|
|
function onwrite(stream, er) {
|
|
|
|
var state = stream._writableState;
|
2012-11-13 08:33:06 +01:00
|
|
|
var sync = state.sync;
|
|
|
|
var cb = state.writecb;
|
|
|
|
var l = state.writelen;
|
|
|
|
|
|
|
|
state.writing = false;
|
|
|
|
state.writelen = null;
|
|
|
|
state.writecb = null;
|
2012-10-03 00:44:50 +02:00
|
|
|
|
2012-11-13 08:33:06 +01:00
|
|
|
if (er) {
|
2012-10-10 02:31:29 +02:00
|
|
|
if (cb) {
|
2012-11-13 08:33:06 +01:00
|
|
|
if (sync)
|
2012-11-17 05:24:14 +01:00
|
|
|
process.nextTick(function() {
|
|
|
|
cb(er);
|
|
|
|
});
|
2012-10-10 02:31:29 +02:00
|
|
|
else
|
2012-11-13 08:33:06 +01:00
|
|
|
cb(er);
|
2012-12-14 19:49:16 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// backwards compatibility. still emit if there was a cb.
|
|
|
|
stream.emit('error', er);
|
2012-11-13 08:33:06 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
state.length -= l;
|
|
|
|
|
|
|
|
if (cb) {
|
|
|
|
// don't call the cb until the next tick if we're in sync mode.
|
|
|
|
// also, defer if we're about to write some more right now.
|
|
|
|
if (sync || state.buffer.length)
|
|
|
|
process.nextTick(cb);
|
|
|
|
else
|
|
|
|
cb();
|
2012-10-09 20:01:53 +02:00
|
|
|
}
|
2012-10-03 00:44:50 +02:00
|
|
|
|
2012-12-12 09:59:54 +01:00
|
|
|
if (state.length === 0 && (state.ended || state.ending) &&
|
|
|
|
!state.finished && !state.finishing) {
|
2012-11-13 08:33:06 +01:00
|
|
|
// emit 'finish' at the very end.
|
|
|
|
state.finishing = true;
|
2012-11-17 05:24:14 +01:00
|
|
|
stream.emit('finish');
|
2012-11-13 08:33:06 +01:00
|
|
|
state.finished = true;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// if there's something in the buffer waiting, then do that, too.
|
|
|
|
if (state.buffer.length) {
|
|
|
|
var chunkCb = state.buffer.shift();
|
|
|
|
var chunk = chunkCb[0];
|
|
|
|
cb = chunkCb[1];
|
|
|
|
|
|
|
|
if (false === state.decodeStrings)
|
|
|
|
l = chunk[0].length;
|
|
|
|
else
|
|
|
|
l = chunk.length;
|
|
|
|
|
|
|
|
state.writelen = l;
|
|
|
|
state.writecb = cb;
|
|
|
|
state.writechunk = chunk;
|
|
|
|
state.writing = true;
|
2012-11-17 05:24:14 +01:00
|
|
|
stream._write(chunk, state.onwrite);
|
2012-11-13 08:33:06 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
if (state.length <= state.lowWaterMark && state.needDrain) {
|
|
|
|
// Must force callback to be called on nextTick, so that we don't
|
|
|
|
// emit 'drain' before the write() consumer gets the 'false' return
|
|
|
|
// value, and has a chance to attach a 'drain' listener.
|
|
|
|
process.nextTick(function() {
|
|
|
|
if (!state.needDrain)
|
|
|
|
return;
|
|
|
|
state.needDrain = false;
|
2012-11-17 05:24:14 +01:00
|
|
|
stream.emit('drain');
|
|
|
|
});
|
2012-11-13 08:33:06 +01:00
|
|
|
}
|
|
|
|
}
|
2012-10-03 00:44:50 +02:00
|
|
|
|
|
|
|
Writable.prototype._write = function(chunk, cb) {
|
2012-11-17 05:24:14 +01:00
|
|
|
process.nextTick(function() {
|
|
|
|
cb(new Error('not implemented'));
|
|
|
|
});
|
2012-10-03 00:44:50 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
Writable.prototype.end = function(chunk, encoding) {
|
|
|
|
var state = this._writableState;
|
2012-11-13 08:33:06 +01:00
|
|
|
|
|
|
|
// ignore unnecessary end() calls.
|
|
|
|
if (state.ending || state.ended || state.finished)
|
|
|
|
return;
|
|
|
|
|
2012-10-03 00:44:50 +02:00
|
|
|
state.ending = true;
|
|
|
|
if (chunk)
|
|
|
|
this.write(chunk, encoding);
|
2012-12-12 09:59:54 +01:00
|
|
|
else if (state.length === 0 && !state.finishing && !state.finished) {
|
2012-11-13 08:33:06 +01:00
|
|
|
state.finishing = true;
|
2012-10-03 00:44:50 +02:00
|
|
|
this.emit('finish');
|
2012-11-13 08:33:06 +01:00
|
|
|
state.finished = true;
|
|
|
|
}
|
2012-10-03 00:44:50 +02:00
|
|
|
state.ended = true;
|
|
|
|
};
|