mirror of
https://github.com/nodejs/node.git
synced 2024-12-01 16:10:02 +01:00
http: Refactor for streams2
Because of some of the peculiarities of http, this has a bit of special magic to handle cases where the IncomingMessage would wait forever in a paused state. In the server, if you do not begin consuming the request body by the time the response emits 'finish', then it will be flushed out. In the client, if you do not add a 'response' handler onto the request, then the response stream will be flushed out.
This commit is contained in:
parent
81e356279d
commit
1d369317ea
142
lib/http.js
142
lib/http.js
@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
|
||||
return skipBody;
|
||||
}
|
||||
|
||||
// XXX This is a mess.
|
||||
// TODO: http.Parser should be a Writable emits request/response events.
|
||||
function parserOnBody(b, start, len) {
|
||||
var parser = this;
|
||||
var slice = b.slice(start, start + len);
|
||||
if (parser.incoming._paused || parser.incoming._pendings.length) {
|
||||
parser.incoming._pendings.push(slice);
|
||||
} else {
|
||||
parser.incoming._emitData(slice);
|
||||
var stream = parser.incoming;
|
||||
var rs = stream._readableState;
|
||||
var socket = stream.socket;
|
||||
|
||||
// pretend this was the result of a stream._read call.
|
||||
if (len > 0) {
|
||||
var slice = b.slice(start, start + len);
|
||||
rs.onread(null, slice);
|
||||
}
|
||||
|
||||
if (rs.length >= rs.highWaterMark)
|
||||
socket.pause();
|
||||
}
|
||||
|
||||
function parserOnMessageComplete() {
|
||||
var parser = this;
|
||||
parser.incoming.complete = true;
|
||||
var stream = parser.incoming;
|
||||
var socket = stream.socket;
|
||||
|
||||
stream.complete = true;
|
||||
|
||||
// Emit any trailing headers.
|
||||
var headers = parser._headers;
|
||||
@ -140,19 +151,13 @@ function parserOnMessageComplete() {
|
||||
parser._url = '';
|
||||
}
|
||||
|
||||
if (!parser.incoming.upgrade) {
|
||||
if (!stream.upgrade)
|
||||
// For upgraded connections, also emit this after parser.execute
|
||||
if (parser.incoming._paused || parser.incoming._pendings.length) {
|
||||
parser.incoming._pendings.push(END_OF_FILE);
|
||||
} else {
|
||||
parser.incoming.readable = false;
|
||||
parser.incoming._emitEnd();
|
||||
}
|
||||
}
|
||||
stream._readableState.onread(null, null);
|
||||
|
||||
if (parser.socket.readable) {
|
||||
// force to read the next incoming message
|
||||
parser.socket.resume();
|
||||
socket.resume();
|
||||
}
|
||||
}
|
||||
|
||||
@ -263,9 +268,13 @@ function utcDate() {
|
||||
|
||||
/* Abstract base class for ServerRequest and ClientResponse. */
|
||||
function IncomingMessage(socket) {
|
||||
Stream.call(this);
|
||||
Stream.Readable.call(this);
|
||||
|
||||
// XXX This implementation is kind of all over the place
|
||||
// When the parser emits body chunks, they go in this list.
|
||||
// _read() pulls them out, and when it finds EOF, it ends.
|
||||
this._pendings = [];
|
||||
|
||||
// TODO Remove one of these eventually.
|
||||
this.socket = socket;
|
||||
this.connection = socket;
|
||||
|
||||
@ -276,78 +285,50 @@ function IncomingMessage(socket) {
|
||||
|
||||
this.readable = true;
|
||||
|
||||
this._paused = false;
|
||||
this._pendings = [];
|
||||
|
||||
this._endEmitted = false;
|
||||
this._pendingIndex = 0;
|
||||
|
||||
// request (server) only
|
||||
this.url = '';
|
||||
|
||||
this.method = null;
|
||||
|
||||
// response (client) only
|
||||
this.statusCode = null;
|
||||
this.client = this.socket;
|
||||
|
||||
// flag for backwards compatibility grossness.
|
||||
this._consuming = false;
|
||||
}
|
||||
util.inherits(IncomingMessage, Stream);
|
||||
util.inherits(IncomingMessage, Stream.Readable);
|
||||
|
||||
|
||||
exports.IncomingMessage = IncomingMessage;
|
||||
|
||||
|
||||
IncomingMessage.prototype.read = function(n) {
|
||||
this._consuming = true;
|
||||
return Stream.Readable.prototype.read.call(this, n);
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype._read = function(n, callback) {
|
||||
// We actually do almost nothing here, because the parserOnBody
|
||||
// function fills up our internal buffer directly. However, we
|
||||
// do need to unpause the underlying socket so that it flows.
|
||||
if (!this.socket.readable)
|
||||
return callback(null, null);
|
||||
else
|
||||
this.socket.resume();
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype.destroy = function(error) {
|
||||
this.socket.destroy(error);
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype.setEncoding = function(encoding) {
|
||||
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
|
||||
this._decoder = new StringDecoder(encoding);
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype.pause = function() {
|
||||
this._paused = true;
|
||||
this.socket.pause();
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype.resume = function() {
|
||||
this._paused = false;
|
||||
if (this.socket) {
|
||||
this.socket.resume();
|
||||
}
|
||||
|
||||
this._emitPending();
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype._emitPending = function(callback) {
|
||||
if (this._pendings.length) {
|
||||
var self = this;
|
||||
process.nextTick(function() {
|
||||
while (!self._paused && self._pendings.length) {
|
||||
var chunk = self._pendings.shift();
|
||||
if (chunk !== END_OF_FILE) {
|
||||
assert(Buffer.isBuffer(chunk));
|
||||
self._emitData(chunk);
|
||||
} else {
|
||||
assert(self._pendings.length === 0);
|
||||
self.readable = false;
|
||||
self._emitEnd();
|
||||
}
|
||||
}
|
||||
|
||||
if (callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
} else if (callback) {
|
||||
callback();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
IncomingMessage.prototype._emitData = function(d) {
|
||||
if (this._decoder) {
|
||||
@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
|
||||
|
||||
// don't keep alive connections where the client expects 100 Continue
|
||||
// but we sent a final status; they may put extra bytes on the wire.
|
||||
if (this._expect_continue && ! this._sent100) {
|
||||
if (this._expect_continue && !this._sent100) {
|
||||
this.shouldKeepAlive = false;
|
||||
}
|
||||
|
||||
@ -1321,11 +1302,10 @@ function socketCloseListener() {
|
||||
// Socket closed before we emitted 'end' below.
|
||||
req.res.emit('aborted');
|
||||
var res = req.res;
|
||||
req.res._emitPending(function() {
|
||||
res._emitEnd();
|
||||
res.on('end', function() {
|
||||
res.emit('close');
|
||||
res = null;
|
||||
});
|
||||
res._readableState.onread(null, null);
|
||||
} else if (!req.res && !req._hadError) {
|
||||
// This socket error fired before we started to
|
||||
// receive a response. The error needs to
|
||||
@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
|
||||
}
|
||||
|
||||
|
||||
// client
|
||||
function parserOnIncomingClient(res, shouldKeepAlive) {
|
||||
var parser = this;
|
||||
var socket = this.socket;
|
||||
var req = socket._httpMessage;
|
||||
|
||||
|
||||
// propogate "domain" setting...
|
||||
if (req.domain && !res.domain) {
|
||||
debug('setting "res.domain"');
|
||||
@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
|
||||
|
||||
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
|
||||
COUNTER_HTTP_CLIENT_RESPONSE();
|
||||
req.emit('response', res);
|
||||
req.res = res;
|
||||
res.req = req;
|
||||
|
||||
var handled = req.emit('response', res);
|
||||
res.on('end', responseOnEnd);
|
||||
|
||||
// If the user did not listen for the 'response' event, then they
|
||||
// can't possibly read the data, so we .resume() it into the void
|
||||
// so that the socket doesn't hang there in a paused state.
|
||||
if (!handled)
|
||||
res.resume();
|
||||
|
||||
return isHeadResponse;
|
||||
}
|
||||
|
||||
// client
|
||||
function responseOnEnd() {
|
||||
var res = this;
|
||||
var req = res.req;
|
||||
@ -1784,7 +1772,7 @@ function connectionListener(socket) {
|
||||
incoming.push(req);
|
||||
|
||||
var res = new ServerResponse(req);
|
||||
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
|
||||
|
||||
res.shouldKeepAlive = shouldKeepAlive;
|
||||
DTRACE_HTTP_SERVER_REQUEST(req, socket);
|
||||
COUNTER_HTTP_SERVER_REQUEST();
|
||||
@ -1806,6 +1794,12 @@ function connectionListener(socket) {
|
||||
|
||||
incoming.shift();
|
||||
|
||||
// if the user never called req.read(), and didn't pipe() or
|
||||
// .resume() or .on('data'), then we call req.resume() so that the
|
||||
// bytes will be pulled off the wire.
|
||||
if (!req._consuming)
|
||||
req.resume();
|
||||
|
||||
res.detachSocket(socket);
|
||||
|
||||
if (res._last) {
|
||||
|
Loading…
Reference in New Issue
Block a user