From dcc547d798c11ed81f4617724a0120dad484d0ce Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 5 Nov 2010 13:03:32 -0700 Subject: [PATCH] Dumper net.js integration --- lib/http.js | 4 +- lib/net.js | 268 +++++++++++----------------------------------------- 2 files changed, 59 insertions(+), 213 deletions(-) diff --git a/lib/http.js b/lib/http.js index b0e86f28333..1a00efce83b 100644 --- a/lib/http.js +++ b/lib/http.js @@ -827,8 +827,8 @@ function connectionListener (socket) { // No more messages to be pushed out. // HACK: need way to do this with socket interface - if (socket._writeQueue.length) { - socket.__destroyOnDrain = true; //socket.end(); + if (socket._writeWatcher.firstBucket) { + socket._eof = true; } else { socket.destroy(); } diff --git a/lib/net.js b/lib/net.js index 6373b40f520..b79dbbd545c 100644 --- a/lib/net.js +++ b/lib/net.js @@ -54,6 +54,24 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { return new IOWatcher(); }); + +IOWatcher.prototype.ondrain = function () { + assert(this.socket); + if (this.writable || this.readable) { + require('timers').active(this.socket); + this.socket.emit('drain'); + } + + if (this.socket._eof) this.socket._shutdown(); +}; + + +IOWatcher.prototype.onerror = function (errno) { + assert(this.socket); + this.socket.destroy(errnoException(errno, 'write')); +}; + + exports.isIP = binding.isIP; exports.isIPv4 = function (input) { @@ -92,16 +110,6 @@ function setImplmentationMethods (self) { }; if (self.type == 'unix') { - self._writeImpl = function (buf, off, len, fd, flags) { - // Detect and disallow zero-byte writes wth an attached file - // descriptor. This is an implementation limitation of sendmsg(2). - if (fd && noData(buf, off, len)) { - throw new Error('File descriptors can only be written with data'); - } - - return sendMsg(self.fd, buf, off, len, fd, flags); - }; - self._readImpl = function (buf, off, len) { var bytesRead = recvMsg(self.fd, buf, off, len); @@ -123,25 +131,10 @@ function setImplmentationMethods (self) { return bytesRead; }; } else { - self._writeImpl = function (buf, off, len, fd, flags) { - // XXX: TLS support requires that 0-byte writes get processed - // by the kernel for some reason. Otherwise, we'd just - // fast-path return here. - - // Drop 'fd' and 'flags' as these are not supported by the write(2) - // system call - return write(self.fd, buf, off, len); - }; - self._readImpl = function (buf, off, len) { return read(self.fd, buf, off, len); }; } - - self._shutdownImpl = function () { - shutdown(self.fd, 'write'); - }; - }; @@ -168,11 +161,6 @@ function initStream (self) { self._readWatcher.callback = onReadable; self.readable = false; - // Queue of buffers and string that need to be written to socket. - self._writeQueue = []; - self._writeQueueEncoding = []; - self._writeQueueFD = []; - self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; self._writeWatcher.callback = onWritable; @@ -255,182 +243,44 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -// Returns true if all the data was flushed to socket. Returns false if -// something was queued. If data was queued, then the "drain" event will -// signal when it has been finally flushed to socket. -Stream.prototype.write = function (data, encoding, fd) { - if (this._connecting || (this._writeQueue && this._writeQueue.length)) { - if (!this._writeQueue) { - this._writeQueue = []; - this._writeQueueEncoding = []; - this._writeQueueFD = []; - } +Stream.prototype._appendBucket = function (data, encoding, fd) { + var newBucket = { data: data }; + if (encoding) newBucket.encoding = encoding; + if (fd) newBucket.fd = fd; - // Slow. There is already a write queue, so let's append to it. - if (this._writeQueueLast() === END_OF_FILE) { - throw new Error('Stream.end() called already; cannot write.'); - } + var queueSize = data.length; - if (typeof data == 'string' && - this._writeQueue.length && - typeof this._writeQueue[this._writeQueue.length-1] === 'string' && - this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) { - // optimization - concat onto last - this._writeQueue[this._writeQueue.length-1] += data; - } else { - this._writeQueue.push(data); - this._writeQueueEncoding.push(encoding); - } + // TODO properly calculate queueSize - if (fd != undefined) { - this._writeQueueFD.push(fd); - } - - return false; + if (this._writeWatcher.lastBucket) { + this._writeWatcher.lastBucket.next = newBucket; } else { - // Fast. - // The most common case. There is no write queue. Just push the data - // directly to the socket. - return this._writeOut(data, encoding, fd); + this._writeWatcher.firstBucket = newBucket; } + + this._writeWatcher.lastBucket = newBucket; + + return queueSize; }; -// Directly writes the data to socket. -// -// Steps: -// 1. If it's a string, write it to the `pool`. (If not space remains -// on the pool make a new one.) -// 2. Write data to socket. Return true if flushed. -// 3. Slice out remaining -// 4. Unshift remaining onto _writeQueue. Return false. -Stream.prototype._writeOut = function (data, encoding, fd) { + +Stream.prototype.write = function (data, encoding, fd) { + if (this._eof) { + throw new Error('Stream.end() called already; cannot write.'); + } + if (!this.writable) { throw new Error('Stream is not writable'); } - var buffer, off, len; - var bytesWritten, charsWritten; - var queuedData = false; + var queueSize = this._appendBucket(data, encoding, fd); - if (typeof data != 'string') { - // 'data' is a buffer, ignore 'encoding' - buffer = data; - off = 0; - len = data.length; - - } else { - assert(typeof data == 'string'); - - if (!pool || pool.length - pool.used < kMinPoolSpace) { - pool = null; - allocNewPool(); - } - - if (!encoding || encoding == 'utf8' || encoding == 'utf-8') { - // default to utf8 - bytesWritten = pool.write(data, 'utf8', pool.used); - charsWritten = Buffer._charsWritten; - } else { - bytesWritten = pool.write(data, encoding, pool.used); - charsWritten = bytesWritten; - } - - if (encoding && data.length > 0) { - assert(bytesWritten > 0); - } - - buffer = pool; - len = bytesWritten; - off = pool.used; - - pool.used += bytesWritten; - - debug('wrote ' + bytesWritten + ' bytes to pool'); - - if (charsWritten != data.length) { - //debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n"); - // Unshift whatever didn't fit onto the buffer - this._writeQueue.unshift(data.slice(charsWritten)); - this._writeQueueEncoding.unshift(encoding); - this._writeWatcher.start(); - queuedData = true; - } - } - - try { - bytesWritten = this._writeImpl(buffer, off, len, fd, 0); - } catch (e) { - this.destroy(e); - return false; - } - - debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n"); + if (this._connecting) return false; + this._onWritable(); // Insert writeWatcher into the dumpQueue require('timers').active(this); - if (bytesWritten == len) { - // awesome. sent to buffer. - if (buffer === pool) { - // If we're just writing from the pool then we can make a little - // optimization and save the space. - buffer.used -= len; - } - - if (queuedData) { - return false; - } else { - return true; - } - } - - // Didn't write the entire thing to buffer. - // Need to wait for the socket to become available before trying again. - this._writeWatcher.start(); - - // Slice out the data left. - var leftOver = buffer.slice(off + bytesWritten, off + len); - leftOver.used = leftOver.length; // used the whole thing... - - // util.error('data.used = ' + data.used); - //if (!this._writeQueue) initWriteStream(this); - - // data should be the next thing to write. - this._writeQueue.unshift(leftOver); - this._writeQueueEncoding.unshift(null); - - // If didn't successfully write any bytes, enqueue our fd and try again - if (!bytesWritten) { - this._writeQueueFD.unshift(fd); - } - - return false; -}; - - -// Flushes the write buffer out. -// Returns true if the entire buffer was flushed. -Stream.prototype.flush = function () { - while (this._writeQueue && this._writeQueue.length) { - var data = this._writeQueue.shift(); - var encoding = this._writeQueueEncoding.shift(); - var fd = this._writeQueueFD.shift(); - - if (data === END_OF_FILE) { - this._shutdown(); - return true; - } - - var flushed = this._writeOut(data,encoding,fd); - if (!flushed) return false; - } - if (this._writeWatcher) this._writeWatcher.stop(); - return true; -}; - - -Stream.prototype._writeQueueLast = function () { - return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] - : null; + return queueSize < (64*1024); }; @@ -481,7 +331,7 @@ Stream.prototype._onConnect = function () { } - if (this._writeQueue && this._writeQueue.length) { + if (this._writeWatcher.firstBucket) { // Flush this in case any writes are queued up while connecting. this._onWritable(); } @@ -493,11 +343,10 @@ Stream.prototype._onConnect = function () { Stream.prototype._onWritable = function () { - // Stream becomes writable on connect() but don't flush if there's - // nothing actually to write - if (this.flush()) { - if (this._events && this._events['drain']) this.emit("drain"); - if (this.ondrain) this.ondrain(); // Optimization + // Stick it into the dumpQueue + if (!this._writeWatcher.next) { + this._writeWatcher.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = this._writeWatcher; } }; @@ -648,15 +497,13 @@ Stream.prototype.destroy = function (exception) { // pool is shared between sockets, so don't need to free it here. var self = this; - // TODO would like to set _writeQueue to null to avoid extra object alloc, - // but lots of code assumes this._writeQueue is always an array. - this._writeQueue = []; - this.readable = this.writable = false; if (this._writeWatcher) { this._writeWatcher.stop(); this._writeWatcher.socket = null; + this._writeWatcher.firstBucket = null; + this._writeWatcher.lastBucket = null; ioWatchers.free(this._writeWatcher); this._writeWatcher = null; } @@ -695,7 +542,7 @@ Stream.prototype._shutdown = function () { this.writable = false; try { - this._shutdownImpl(); + shutdown(this.fd, 'write'); } catch (e) { this.destroy(e); } @@ -708,15 +555,14 @@ Stream.prototype._shutdown = function () { Stream.prototype.end = function (data, encoding) { - if (this.writable) { - if (this._writeQueueLast() !== END_OF_FILE) { - if (data) this.write(data, encoding); - this._writeQueue.push(END_OF_FILE); - if (!this._connecting) { - this.flush(); - } - } - } + if (!this.writable) return; // TODO this should throw error + if (this._eof) return; // TODO this should also throw error + + if (data) this._appendBucket(data, encoding); + this._eof = true; + + // If this isn't in the dumpQueue then we shutdown now. + if (!this._writeWatcher.next) this._shutdown(); };