0
0
mirror of https://github.com/nodejs/node.git synced 2024-12-01 16:10:02 +01:00

Refactor HTTP

Allow throttling from outgoing messages.
This commit is contained in:
Ryan Dahl 2010-05-26 17:59:55 -07:00
parent 0e713e7482
commit 74b7fa29a1
7 changed files with 264 additions and 149 deletions

View File

@ -1,13 +1,13 @@
var debugLevel = 0;
if ("NODE_DEBUG" in process.env) debugLevel = 1;
var sys = require('sys');
function debug (x) {
if (debugLevel > 0) {
process.binding('stdio').writeError(x + "\n");
}
var debug;
var debugLevel = parseInt(process.env.NODE_DEBUG, 16);
if (debugLevel & 0x4) {
debug = function (x) { sys.error('HTTP: ' + x); };
} else {
debug = function () { };
}
var sys = require('sys');
var net = require('net');
var Utf8Decoder = require('utf8decoder').Utf8Decoder;
var events = require('events');
@ -259,13 +259,13 @@ function OutgoingMessage (socket) {
this.output = [];
this.outputEncodings = [];
this.closeOnFinish = false;
this._last = false;
this.chunkedEncoding = false;
this.shouldKeepAlive = true;
this.useChunkedEncodingByDefault = true;
this.flushing = false;
this.headWritten = false;
this._headerFlushed = false;
this._header = null; // to be filled by _storeHeader
this._hasBody = true;
@ -274,31 +274,45 @@ function OutgoingMessage (socket) {
sys.inherits(OutgoingMessage, events.EventEmitter);
exports.OutgoingMessage = OutgoingMessage;
// This abstract either writing directly to the socket or buffering it.
// Rename to _writeRaw() ?
OutgoingMessage.prototype._send = function (data, encoding) {
var length = this.output.length;
if (this.connection._outgoing[0] === this &&
this.connection.writable &&
this.output.length === 0)
{
// Directly write to socket.
return this.connection.write(data, encoding);
} else {
// Buffer
var length = this.output.length;
if (length === 0 || typeof data != 'string') {
this.output.push(data);
encoding = encoding || "ascii";
this.outputEncodings.push(encoding);
return false;
}
var lastEncoding = this.outputEncodings[length-1];
var lastData = this.output[length-1];
if ((lastEncoding === encoding) ||
(!encoding && data.constructor === lastData.constructor)) {
this.output[length-1] = lastData + data;
return false;
}
if (length === 0 || typeof data != 'string') {
this.output.push(data);
encoding = encoding || "ascii";
this.outputEncodings.push(encoding);
return;
return false;
}
var lastEncoding = this.outputEncodings[length-1];
var lastData = this.output[length-1];
if ((lastEncoding === encoding) ||
(!encoding && data.constructor === lastData.constructor)) {
this.output[length-1] = lastData + data;
return;
}
this.output.push(data);
encoding = encoding || "ascii";
this.outputEncodings.push(encoding);
};
OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) {
OutgoingMessage.prototype._storeHeader = function (firstLine, headers) {
var sentConnectionHeader = false;
var sentContentLengthHeader = false;
var sentTransferEncodingHeader = false;
@ -325,7 +339,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) {
if (connectionExpression.test(field)) {
sentConnectionHeader = true;
if (closeExpression.test(value)) this.closeOnFinish = true;
if (closeExpression.test(value)) this._last = true;
} else if (transferEncodingExpression.test(field)) {
sentTransferEncodingHeader = true;
@ -344,7 +358,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) {
(sentContentLengthHeader || this.useChunkedEncodingByDefault)) {
messageHeader += "Connection: keep-alive\r\n";
} else {
this.closeOnFinish = true;
this._last = true;
messageHeader += "Connection: close\r\n";
}
}
@ -355,7 +369,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) {
messageHeader += "Transfer-Encoding: chunked\r\n";
this.chunkedEncoding = true;
} else {
this.closeOnFinish = true;
this._last = true;
}
} else {
// Make sure we don't end the 0\r\n\r\n at the end of the message.
@ -365,7 +379,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) {
messageHeader += CRLF;
this._send(messageHeader);
this._header = messageHeader;
// wait until the first body chunk, or close(), is sent to flush.
};
@ -376,7 +390,7 @@ OutgoingMessage.prototype.sendBody = function () {
OutgoingMessage.prototype.write = function (chunk, encoding) {
if ( (this instanceof ServerResponse) && !this.headWritten) {
if (!this._header) {
throw new Error("writeHead() must be called before write()")
}
@ -390,28 +404,35 @@ OutgoingMessage.prototype.write = function (chunk, encoding) {
throw new TypeError("first argument must be a string, Array, or Buffer");
}
encoding = encoding || "ascii";
// write the header
if (!this._headerFlushed) {
this._send(this._header);
this._headerFlushed = true;
}
if (chunk.length === 0) return false;
var len, ret;
if (this.chunkedEncoding) {
var chunkLength = (typeof chunk == 'string' ? process._byteLength(chunk, encoding) : chunk.length);
if (chunkLength > 0) {
this._send(chunkLength.toString(16));
this._send(CRLF);
this._send(chunk, encoding);
this._send(CRLF);
if (typeof(chunk) === 'string') {
len = Buffer.byteLength(chunk, encoding);
var chunk = len.toString(16) + CRLF + chunk + CRLF;
debug('string chunk = ' + sys.inspect(chunk));
ret = this._send(chunk, encoding);
} else {
// buffer
len = chunk.length;
this._send(len.toString(16) + CRLF);
this._send(chunk);
ret = this._send(CRLF);
}
} else {
this._send(chunk, encoding);
ret = this._send(chunk, encoding);
}
if (this.flushing) {
this.flush();
} else {
this.flushing = true;
}
};
OutgoingMessage.prototype.flush = function () {
this._onFlush();
debug('write ret = ' + ret);
return ret;
};
OutgoingMessage.prototype.finish = function () {
@ -429,10 +450,31 @@ OutgoingMessage.prototype.close = function (data, encoding) {
};
OutgoingMessage.prototype.end = function (data, encoding) {
if (data) this.write(data, encoding);
if (this.chunkedEncoding) this._send("0\r\n\r\n"); // last chunk
var ret;
// maybe the header hasn't been sent. if not send it.
if (!this._headerFlushed) {
ret = this._send(this._header);
this._headerFlushed = true;
}
if (data) {
ret = this.write(data, encoding);
}
this.finished = true;
this.flush();
if (this.chunkedEncoding) {
ret = this._send("0\r\n\r\n"); // last chunk
}
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
if (this.output.length === 0 && this.connection._outgoing[0] === this) {
debug('outgoing message end. shifting because was flushed');
this.connection._onOutgoingSent();
}
return ret;
};
@ -479,8 +521,7 @@ ServerResponse.prototype.writeHead = function (statusCode) {
}
this.sendHeaderLines(statusLine, headers);
this.headWritten = true;
this._storeHeader(statusLine, headers);
};
// TODO Eventually remove
@ -510,9 +551,9 @@ function ClientRequest (socket, method, url, headers) {
} else {
this.useChunkedEncodingByDefault = true;
}
this.closeOnFinish = true;
this._last = true;
this.sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers);
this._storeHeader(method + " " + url + " HTTP/1.1\r\n", headers);
}
sys.inherits(ClientRequest, OutgoingMessage);
exports.ClientRequest = ClientRequest;
@ -531,49 +572,82 @@ ClientRequest.prototype.close = function () {
clientRequestCloseWarning = "Warning: ClientRequest.prototype.close has been renamed to end()";
sys.error(clientRequestCloseWarning);
}
if (arguments.length > 0) {
throw new Error( "ClientRequest.prototype.end does not take any arguments. "
+ "Add a response listener manually to the request object."
if (typeof arguments[0] == "function") {
throw new Error( "ClientRequest.prototype.end does not take a callback. "
+ "Add a 'response' listener manually to the request object."
);
}
return this.end();
};
ClientRequest.prototype.end = function () {
if (arguments.length > 0) {
throw new Error( "ClientRequest.prototype.end does not take any arguments. "
+ "Add a response listener manually to the request object."
if (typeof arguments[0] == "function") {
throw new Error( "ClientRequest.prototype.end does not take a callback. "
+ "Add a 'response' listener manually to get the response."
);
}
OutgoingMessage.prototype.end.call(this);
OutgoingMessage.prototype.end.apply(this, arguments);
};
/* Returns true if the message queue is finished and the socket
* should be closed. */
function flushMessageQueue (socket, queue) {
while (queue[0]) {
var message = queue[0];
function outgoingFlush (socket) {
// This logic is probably a bit confusing. Let me explain a bit:
//
// In both HTTP servers and clients it is possible to queue up several
// outgoing messages. This is easiest to imagine in the case of a client.
// Take the following situation:
//
// req1 = client.request('GET', '/');
// req2 = client.request('POST', '/');
//
// The question is what happens when the user does
//
// req2.write("hello world\n");
//
// It's possible that the first request has not been completely flushed to
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
//
// This function, outgoingFlush(), is called by both the Server
// implementation and the Client implementation to attempt to flush any
// pending messages out to the socket.
var message = socket._outgoing[0];
while (message.output.length > 0) {
if (!socket.writable) return true;
if (!message) return;
var data = message.output.shift();
var encoding = message.outputEncodings.shift();
var ret;
socket.write(data, encoding);
}
while (message.output.length) {
if (!socket.writable) return; // XXX Necessary?
if (!message.finished) break;
var data = message.output.shift();
var encoding = message.outputEncodings.shift();
queue.shift();
if (message.closeOnFinish) return true;
ret = socket.write(data, encoding);
}
if (message.finished) {
socket._onOutgoingSent();
} else if (ret) {
message.emit('drain');
}
return false;
}
function httpSocketSetup (socket) {
// An array of outgoing messages for the socket. In pipelined connections
// we need to keep track of the order they were sent.
socket._outgoing = [];
// NOTE: be sure not to use ondrain elsewhere in this file!
socket.ondrain = function () {
var message = socket._outgoing[0];
if (message) message.emit('drain');
};
}
function Server (requestListener) {
net.Server.call(this);
@ -598,9 +672,10 @@ exports.createServer = function (requestListener) {
function connectionListener (socket) {
var self = this;
// An array of responses for each socket. In pipelined connections
// we need to keep track of the order they were sent.
var responses = [];
debug("new http connection");
httpSocketSetup(socket);
socket.setTimeout(2*60*1000); // 2 minute timeout
socket.addListener('timeout', function () {
@ -646,13 +721,29 @@ function connectionListener (socket) {
socket.onend = function () {
parser.finish();
if (socket._outgoing.length) {
socket._outgoing[socket._outgoing.length-1]._last = true;
outgoingFlush(socket);
} else {
socket.end();
}
};
socket.addListener('close', function () {
// unref the parser for easy gc
parsers.free(parser);
});
if (responses.length == 0) {
// At the end of each response message, after it has been flushed to the
// socket. Here we insert logic about what to do next.
socket._onOutgoingSent = function (message) {
var message = socket._outgoing.shift();
if (message._last) {
// No more messages to be pushed out.
socket.end();
} else {
responses[responses.length-1].closeOnFinish = true;
} else if (socket._outgoing.length) {
// Push out the next message.
outgoingFlush(socket);
}
};
@ -661,14 +752,9 @@ function connectionListener (socket) {
// to the user.
parser.onIncoming = function (req, shouldKeepAlive) {
var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
res.shouldKeepAlive = shouldKeepAlive;
res._onFlush = function () {
if (flushMessageQueue(socket, responses)) {
socket.end();
}
};
responses.push(res);
socket._outgoing.push(res);
self.emit('request', req, res);
return false; // Not a HEAD response. (Not even a response!)
@ -678,56 +764,40 @@ function connectionListener (socket) {
function Client ( ) {
net.Stream.call(this);
var self = this;
var requests = [];
var currentRequest;
httpSocketSetup(self);
var parser;
self._initParser = function () {
function initParser () {
if (!parser) parser = parsers.alloc();
parser.reinitialize('response');
parser.socket = self;
parser.reqs = []; // list of request methods
parser.onIncoming = function (res) {
debug("incoming response!");
var isHeadResponse = currentRequest.method == "HEAD";
var req = self._outgoing[0];
// Responses to HEAD requests are AWFUL. Ask Ryan.
// A major oversight in HTTP. Hence this nastiness.
var isHeadResponse = req.method == "HEAD";
debug('isHeadResponse ' + isHeadResponse);
res.addListener('end', function ( ) {
debug("request complete disconnecting. readyState = " + self.readyState);
// For the moment we reconnect for every request. FIXME!
// All that should be required for keep-alive is to not reconnect,
// but outgoingFlush instead.
self.end();
});
currentRequest.emit("response", res);
req.emit("response", res);
return isHeadResponse;
};
};
self._reconnect = function () {
if (self.readyState != "opening") {
debug("HTTP CLIENT: reconnecting readyState = " + self.readyState);
self.connect(self.port, self.host);
}
};
self._pushRequest = function (req) {
req._onFlush = function () {
if (self.readyState == "closed") {
debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState);
self._reconnect();
return;
}
debug("self flush readyState = " + self.readyState);
if (req == currentRequest) flushMessageQueue(self, [req]);
};
requests.push(req);
};
self.ondata = function (d, start, end) {
if (!parser) {
throw new Error("parser not initialized prior to Client.ondata call");
@ -739,29 +809,27 @@ function Client ( ) {
var bytesParsed = ret;
var upgradeHead = d.slice(start + bytesParsed, end - start);
parser.incoming.upgradeHead = upgradeHead;
currentRequest.emit("response", parser.incoming);
parser.incoming.emit('end');
var req = self._outgoing[0];
self.ondata = null;
self.onend = null
}
};
self.addListener("connect", function () {
debug('client connected');
if (this.https) {
this.setSecure(this.credentials);
} else {
self._initParser();
debug('requests: ' + sys.inspect(requests));
currentRequest = requests.shift()
currentRequest.flush();
initParser();
debug('requests: ' + sys.inspect(self._outgoing));
outgoingFlush(self);
}
});
self.addListener("secure", function () {
self._initParser();
debug('requests: ' + sys.inspect(requests));
currentRequest = requests.shift()
currentRequest.flush();
initParser();
debug('requests: ' + sys.inspect(self._outgoing));
outgoingFlush(self);
});
self.onend = function () {
@ -775,28 +843,64 @@ function Client ( ) {
debug("HTTP CLIENT onClose. readyState = " + self.readyState);
// finally done with the request
self._outgoing.shift();
// If there are more requests to handle, reconnect.
if (requests.length > 0) {
if (self._outgoing.length) {
self._reconnect();
} else if (parser) {
parsers.free(parser);
parser = null;
}
});
};
sys.inherits(Client, net.Stream);
exports.Client = Client;
exports.createClient = function (port, host, https, credentials) {
var c = new Client;
var c = new Client();
c.port = port;
c.host = host;
c.https = https;
c.credentials = credentials;
return c;
}
};
// This is called each time a request has been pushed completely to the
// socket. The message that was sent is still sitting at client._outgoing[0]
// it is our responsibility to shift it off.
//
// We have to be careful when it we shift it because once we do any writes
// to other requests will be flushed directly to the socket.
//
// At the moment we're implement a client which connects and disconnects on
// each request/response cycle so we cannot shift off the request from
// client._outgoing until we're completely disconnected after the response
// comes back.
Client.prototype._onOutgoingSent = function (message) {
// We've just finished a message. We don't end/shutdown the connection here
// because HTTP servers typically cannot handle half-closed connections
// (Node servers can).
//
// Instead, we just check if the connection is closed, and if so
// reconnect if we have pending messages.
if (this._outgoing.length && this.readyState == "closed") {
debug("HTTP client request flush. reconnect. readyState = " + this.readyState);
this._reconnect();
}
};
Client.prototype._reconnect = function () {
if (this.readyState === "closed") {
debug("HTTP CLIENT: reconnecting readyState = " + this.readyState);
this.connect(this.port, this.host);
}
};
Client.prototype.get = function () {
throw new Error("client.get(...) is now client.request('GET', ...)");
@ -819,13 +923,15 @@ Client.prototype.put = function () {
};
Client.prototype.request = function (method, url, headers) {
if (typeof(url) != "string") { // assume method was omitted, shift arguments
if (typeof(url) != "string") {
// assume method was omitted, shift arguments
headers = url;
url = method;
method = null;
method = "GET";
}
var req = new ClientRequest(this, method || "GET", url, headers);
this._pushRequest(req);
var req = new ClientRequest(this, method, url, headers);
this._outgoing.push(req);
if (this.readyState === 'closed') this._reconnect();
return req;
};

View File

@ -63,9 +63,9 @@ var events = eventsModule.exports;
// Modules
var debugLevel = parseInt(process.env["NODE_DEBUG"]);
var debugLevel = parseInt(process.env["NODE_DEBUG"], 16);
function debug (x) {
if (debugLevel > 0) {
if (debugLevel & 1) {
process.binding('stdio').writeError(x + "\n");
}
}

View File

@ -7,9 +7,9 @@ var dns = require('dns');
var kMinPoolSpace = 128;
var kPoolSize = 40*1024;
var debugLevel = process.env['NODE_DEBUG'] ? 1 : 0;
var debugLevel = parseInt(process.env.NODE_DEBUG, 16);
function debug () {
if (debugLevel > 0) sys.error.apply(this, arguments);
if (debugLevel & 0x2) sys.error.apply(this, arguments);
}
var binding = process.binding('net');

View File

@ -13,10 +13,11 @@ sys.puts('Server running at http://127.0.0.1:'+PORT+'/')
s.addListener('listening', function () {
childProcess.exec('curl http://127.0.0.1:'+PORT+'/', function (err, stdout, stderr) {
childProcess.exec('curl -i http://127.0.0.1:'+PORT+'/', function (err, stdout, stderr) {
if (err) throw err;
s.close();
sys.puts('curled response correctly');
error('curled response correctly');
error(sys.inspect(stdout));
});
});

View File

@ -30,8 +30,10 @@ var req = client.request('POST', '/');
req.write('1\n');
req.write('2\n');
req.write('3\n');
req.end();
error("client finished sending request");
puts("client finished sending request");
req.addListener('response', function(res) {
res.setEncoding("utf8");
res.addListener('data', function(chunk) {
@ -42,7 +44,6 @@ req.addListener('response', function(res) {
server.close();
});
});
req.end();
process.addListener("exit", function () {
assert.equal("1\n2\n3\n", sent_body);

View File

@ -21,18 +21,21 @@ http.createServer(function (req, res) {
}
if (req.id == 1) {
error("req 1");
assert.equal("POST", req.method);
assert.equal("/quit", url.parse(req.url).pathname);
}
if (req.id == 2) {
error("req 2");
assert.equal("foo", req.headers['x-x']);
}
if (req.id == 3) {
error("req 3");
assert.equal("bar", req.headers['x-x']);
this.close();
//puts("server closed");
error("server closed");
}
setTimeout(function () {

View File

@ -2,6 +2,10 @@ require("../common");
http = require("http");
url = require("url");
function p (x) {
error(inspect(x));
}
var responses_sent = 0;
var responses_recvd = 0;
var body0 = "";
@ -38,14 +42,14 @@ http.createServer(function (req, res) {
var client = http.createClient(PORT);
var req = client.request("/hello", {"Accept": "*/*", "Foo": "bar"});
req.end();
req.addListener('response', function (res) {
assert.equal(200, res.statusCode);
responses_recvd += 1;
res.setBodyEncoding("ascii");
res.setEncoding("utf8");
res.addListener('data', function (chunk) { body0 += chunk; });
debug("Got /hello response");
});
req.end();
setTimeout(function () {
req = client.request("POST", "/world");