mirror of
https://github.com/nodejs/node.git
synced 2024-12-01 16:10:02 +01:00
9180140231
Queued write requests should be invoked on handle close, otherwise the "consumer" might be already destroyed when the write callbacks of the "consumed" handle will be invoked. Same applies to the shutdown requests. Make sure to "move" away socket from server to not break the `connections` counter in `net.js`. Otherwise it might not call `close` callback, or call it too early. Fix: https://github.com/iojs/io.js/issues/1696 PR-URL: https://github.com/nodejs/io.js/pull/1910 Reviewed-By: Trevor Norris <trev.norris@gmail.com>
217 lines
4.5 KiB
JavaScript
217 lines
4.5 KiB
JavaScript
'use strict';
|
|
|
|
const assert = require('assert');
|
|
const util = require('util');
|
|
const Socket = require('net').Socket;
|
|
const JSStream = process.binding('js_stream').JSStream;
|
|
const uv = process.binding('uv');
|
|
const debug = util.debuglog('stream_wrap');
|
|
|
|
function StreamWrap(stream) {
|
|
const handle = new JSStream();
|
|
|
|
this.stream = stream;
|
|
|
|
this._list = null;
|
|
|
|
const self = this;
|
|
handle.close = function(cb) {
|
|
debug('close');
|
|
self.doClose(cb);
|
|
};
|
|
handle.isAlive = function() {
|
|
return self.isAlive();
|
|
};
|
|
handle.isClosing = function() {
|
|
return self.isClosing();
|
|
};
|
|
handle.onreadstart = function() {
|
|
return self.readStart();
|
|
};
|
|
handle.onreadstop = function() {
|
|
return self.readStop();
|
|
};
|
|
handle.onshutdown = function(req) {
|
|
return self.doShutdown(req);
|
|
};
|
|
handle.onwrite = function(req, bufs) {
|
|
return self.doWrite(req, bufs);
|
|
};
|
|
|
|
this.stream.pause();
|
|
this.stream.on('error', function(err) {
|
|
self.emit('error', err);
|
|
});
|
|
this.stream.on('data', function(chunk) {
|
|
debug('data', chunk.length);
|
|
if (self._handle)
|
|
self._handle.readBuffer(chunk);
|
|
});
|
|
this.stream.once('end', function() {
|
|
debug('end');
|
|
if (self._handle)
|
|
self._handle.emitEOF();
|
|
});
|
|
|
|
Socket.call(this, {
|
|
handle: handle
|
|
});
|
|
}
|
|
util.inherits(StreamWrap, Socket);
|
|
module.exports = StreamWrap;
|
|
|
|
// require('_stream_wrap').StreamWrap
|
|
StreamWrap.StreamWrap = StreamWrap;
|
|
|
|
StreamWrap.prototype.isAlive = function isAlive() {
|
|
return true;
|
|
};
|
|
|
|
StreamWrap.prototype.isClosing = function isClosing() {
|
|
return !this.readable || !this.writable;
|
|
};
|
|
|
|
StreamWrap.prototype.readStart = function readStart() {
|
|
this.stream.resume();
|
|
return 0;
|
|
};
|
|
|
|
StreamWrap.prototype.readStop = function readStop() {
|
|
this.stream.pause();
|
|
return 0;
|
|
};
|
|
|
|
StreamWrap.prototype.doShutdown = function doShutdown(req) {
|
|
const self = this;
|
|
const handle = this._handle;
|
|
const item = this._enqueue('shutdown', req);
|
|
|
|
this.stream.end(function() {
|
|
// Ensure that write was dispatched
|
|
setImmediate(function() {
|
|
if (!self._dequeue(item))
|
|
return;
|
|
|
|
handle.finishShutdown(req, 0);
|
|
});
|
|
});
|
|
return 0;
|
|
};
|
|
|
|
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
|
|
const self = this;
|
|
const handle = self._handle;
|
|
|
|
var pending = bufs.length;
|
|
|
|
// Queue the request to be able to cancel it
|
|
const item = self._enqueue('write', req);
|
|
|
|
self.stream.cork();
|
|
bufs.forEach(function(buf) {
|
|
self.stream.write(buf, done);
|
|
});
|
|
self.stream.uncork();
|
|
|
|
function done(err) {
|
|
if (!err && --pending !== 0)
|
|
return;
|
|
|
|
// Ensure that this is called once in case of error
|
|
pending = 0;
|
|
|
|
// Ensure that write was dispatched
|
|
setImmediate(function() {
|
|
// Do not invoke callback twice
|
|
if (!self._dequeue(item))
|
|
return;
|
|
|
|
var errCode = 0;
|
|
if (err) {
|
|
if (err.code && uv['UV_' + err.code])
|
|
errCode = uv['UV_' + err.code];
|
|
else
|
|
errCode = uv.UV_EPIPE;
|
|
}
|
|
|
|
handle.doAfterWrite(req);
|
|
handle.finishWrite(req, errCode);
|
|
});
|
|
}
|
|
|
|
return 0;
|
|
};
|
|
|
|
function QueueItem(type, req) {
|
|
this.type = type;
|
|
this.req = req;
|
|
this.prev = this;
|
|
this.next = this;
|
|
}
|
|
|
|
StreamWrap.prototype._enqueue = function enqueue(type, req) {
|
|
const item = new QueueItem(type, req);
|
|
if (this._list === null) {
|
|
this._list = item;
|
|
return item;
|
|
}
|
|
|
|
item.next = this._list.next;
|
|
item.prev = this._list;
|
|
item.next.prev = item;
|
|
item.prev.next = item;
|
|
|
|
return item;
|
|
};
|
|
|
|
StreamWrap.prototype._dequeue = function dequeue(item) {
|
|
assert(item instanceof QueueItem);
|
|
|
|
var next = item.next;
|
|
var prev = item.prev;
|
|
|
|
if (next === null && prev === null)
|
|
return false;
|
|
|
|
item.next = null;
|
|
item.prev = null;
|
|
|
|
if (next === item) {
|
|
prev = null;
|
|
next = null;
|
|
} else {
|
|
prev.next = next;
|
|
next.prev = prev;
|
|
}
|
|
|
|
if (this._list === item)
|
|
this._list = next;
|
|
|
|
return true;
|
|
};
|
|
|
|
StreamWrap.prototype.doClose = function doClose(cb) {
|
|
const self = this;
|
|
const handle = self._handle;
|
|
|
|
setImmediate(function() {
|
|
while (self._list !== null) {
|
|
const item = self._list;
|
|
const req = item.req;
|
|
self._dequeue(item);
|
|
|
|
const errCode = uv.UV_ECANCELED;
|
|
if (item.type === 'write') {
|
|
handle.doAfterWrite(req);
|
|
handle.finishWrite(req, errCode);
|
|
} else if (item.type === 'shutdown') {
|
|
handle.finishShutdown(req, errCode);
|
|
}
|
|
}
|
|
|
|
// Should be already set by net.js
|
|
assert(self._handle === null);
|
|
cb();
|
|
});
|
|
};
|