0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 09:06:21 +01:00

SERVER-19242 wrap state machine lambdas in pre-condition checks

This commit is contained in:
samantharitter 2015-07-09 17:13:49 -04:00
parent b1f41b4869
commit 41c97d945b
3 changed files with 25 additions and 71 deletions

View File

@ -202,6 +202,25 @@ private:
void _asyncRunCommand(AsyncOp* op);
/**
* Wraps a completion handler in pre-condition checks.
* When we resume after an asynchronous call, we may find the following:
* - the AsyncOp has been canceled in the interim (via cancelCommand())
* - the asynchronous call has returned a non-OK error code
* Should both conditions be present, we handle cancelation over errors. States use
* _validateAndRun() to perform these checks before advancing the state machine.
*/
template <typename Handler>
void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) {
if (op->canceled())
return _completeOperation(op,
Status(ErrorCodes::CallbackCanceled, "Callback canceled"));
if (ec)
return _networkErrorCallback(op, ec);
handler();
}
std::unique_ptr<Message> _messageFromRequest(const RemoteCommandRequest& request,
rpc::Protocol protocol);

View File

@ -45,10 +45,6 @@
namespace mongo {
namespace executor {
namespace {
const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canceled");
} // namespace
void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op) {
LOG(3) << "running command " << op->request().cmdObj << " against database "
<< op->request().dbname << " across network to " << op->request().target.toString();
@ -83,24 +79,11 @@ void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::cons
asio::async_write(op->connection()->sock(),
asio::buffer(buf),
[this, op](std::error_code ec, std::size_t bytes) {
if (op->canceled()) {
return _completeOperation(op, kCanceledStatus);
}
if (ec) {
return _networkErrorCallback(op, ec);
}
_receiveResponse(op);
_validateAndRun(op, ec, [this, op]() { _receiveResponse(op); });
});
}
void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
if (op->canceled()) {
return _completeOperation(op, kCanceledStatus);
}
auto negotiatedProtocol =
rpc::negotiate(op->connection()->serverProtocols(), op->connection()->clientProtocols());
@ -181,16 +164,7 @@ void NetworkInterfaceASIO::_recvMessageHeader(AsyncOp* op) {
asio::async_read(op->connection()->sock(),
asio::buffer(reinterpret_cast<char*>(op->header()), sizeof(MSGHEADER::Value)),
[this, op](asio::error_code ec, size_t bytes) {
if (op->canceled()) {
return _completeOperation(op, kCanceledStatus);
}
if (ec) {
LOG(3) << "error receiving header";
return _networkErrorCallback(op, ec);
}
_recvMessageBody(op);
_validateAndRun(op, ec, [this, op]() { _recvMessageBody(op); });
});
}
@ -239,17 +213,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) {
asio::async_read(op->connection()->sock(),
asio::buffer(mdView.data(), bodyLength),
[this, op, mdView](asio::error_code ec, size_t bytes) {
if (op->canceled()) {
return _completeOperation(op, kCanceledStatus);
}
if (ec) {
LOG(3) << "error receiving message body";
return _networkErrorCallback(op, ec);
}
return _completedWriteCallback(op);
_validateAndRun(op, ec, [this, op]() { _completedWriteCallback(op); });
});
}

View File

@ -42,10 +42,6 @@ namespace executor {
using asio::ip::tcp;
namespace {
const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canceled");
} // namespace
NetworkInterfaceASIO::AsyncConnection::AsyncConnection(asio::ip::tcp::socket&& sock,
rpc::ProtocolSet protocols)
: AsyncConnection(std::move(sock), protocols, boost::none) {}
@ -92,16 +88,7 @@ void NetworkInterfaceASIO::_connectASIO(AsyncOp* op) {
_resolver.async_resolve(
query,
[this, op](std::error_code ec, asio::ip::basic_resolver_iterator<tcp> endpoints) {
if (ec) {
LOG(3) << "could not resolve address " << op->request().target.host() << ":"
<< std::to_string(op->request().target.port()) << ", " << ec.message();
return _networkErrorCallback(op, ec);
}
if (op->canceled())
return _completeOperation(op, kCanceledStatus);
_setupSocket(op, endpoints);
_validateAndRun(op, ec, [this, op, endpoints]() { _setupSocket(op, endpoints); });
});
}
@ -124,14 +111,7 @@ void NetworkInterfaceASIO::_connectWithDBClientConnection(AsyncOp* op) {
auto status = exceptionToStatus();
asio::post(_io_service,
[this, op, status]() {
if (op->canceled()) {
return _completeOperation(op, kCanceledStatus);
}
return _completeOperation(op, status);
});
[this, op, status]() { return _completeOperation(op, status); });
return;
}
@ -152,16 +132,7 @@ void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, const tcp::resolver::iterat
asio::async_connect(op->connection()->sock(),
std::move(endpoints),
[this, op](std::error_code ec, tcp::resolver::iterator iter) {
if (ec) {
LOG(3) << "could not connect to host at " << iter->host_name()
<< ":" << iter->endpoint().port() << ", " << ec.message();
return _networkErrorCallback(op, ec);
}
if (op->canceled())
return _completeOperation(op, kCanceledStatus);
_sslHandshake(op);
_validateAndRun(op, ec, [this, op]() { _sslHandshake(op); });
});
}