diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index add90aa2c0a..28955bda89e 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -202,6 +202,25 @@ private: void _asyncRunCommand(AsyncOp* op); + /** + * Wraps a completion handler in pre-condition checks. + * When we resume after an asynchronous call, we may find the following: + * - the AsyncOp has been canceled in the interim (via cancelCommand()) + * - the asynchronous call has returned a non-OK error code + * Should both conditions be present, we handle cancelation over errors. States use + * _validateAndRun() to perform these checks before advancing the state machine. + */ + template + void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) { + if (op->canceled()) + return _completeOperation(op, + Status(ErrorCodes::CallbackCanceled, "Callback canceled")); + if (ec) + return _networkErrorCallback(op, ec); + + handler(); + } + std::unique_ptr _messageFromRequest(const RemoteCommandRequest& request, rpc::Protocol protocol); diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 94d1fdbd5ca..21e39ba5b11 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -45,10 +45,6 @@ namespace mongo { namespace executor { -namespace { -const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canceled"); -} // namespace - void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op) { LOG(3) << "running command " << op->request().cmdObj << " against database " << op->request().dbname << " across network to " << op->request().target.toString(); @@ -83,24 +79,11 @@ void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::cons asio::async_write(op->connection()->sock(), asio::buffer(buf), [this, op](std::error_code ec, std::size_t bytes) { - - if (op->canceled()) { - return _completeOperation(op, kCanceledStatus); - } - - if (ec) { - return _networkErrorCallback(op, ec); - } - - _receiveResponse(op); + _validateAndRun(op, ec, [this, op]() { _receiveResponse(op); }); }); } void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { - if (op->canceled()) { - return _completeOperation(op, kCanceledStatus); - } - auto negotiatedProtocol = rpc::negotiate(op->connection()->serverProtocols(), op->connection()->clientProtocols()); @@ -181,16 +164,7 @@ void NetworkInterfaceASIO::_recvMessageHeader(AsyncOp* op) { asio::async_read(op->connection()->sock(), asio::buffer(reinterpret_cast(op->header()), sizeof(MSGHEADER::Value)), [this, op](asio::error_code ec, size_t bytes) { - - if (op->canceled()) { - return _completeOperation(op, kCanceledStatus); - } - - if (ec) { - LOG(3) << "error receiving header"; - return _networkErrorCallback(op, ec); - } - _recvMessageBody(op); + _validateAndRun(op, ec, [this, op]() { _recvMessageBody(op); }); }); } @@ -239,17 +213,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) { asio::async_read(op->connection()->sock(), asio::buffer(mdView.data(), bodyLength), [this, op, mdView](asio::error_code ec, size_t bytes) { - - if (op->canceled()) { - return _completeOperation(op, kCanceledStatus); - } - - if (ec) { - LOG(3) << "error receiving message body"; - return _networkErrorCallback(op, ec); - } - - return _completedWriteCallback(op); + _validateAndRun(op, ec, [this, op]() { _completedWriteCallback(op); }); }); } diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp index 1eded832ed6..9819497e24f 100644 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ b/src/mongo/executor/network_interface_asio_connect.cpp @@ -42,10 +42,6 @@ namespace executor { using asio::ip::tcp; -namespace { -const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canceled"); -} // namespace - NetworkInterfaceASIO::AsyncConnection::AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet protocols) : AsyncConnection(std::move(sock), protocols, boost::none) {} @@ -92,16 +88,7 @@ void NetworkInterfaceASIO::_connectASIO(AsyncOp* op) { _resolver.async_resolve( query, [this, op](std::error_code ec, asio::ip::basic_resolver_iterator endpoints) { - if (ec) { - LOG(3) << "could not resolve address " << op->request().target.host() << ":" - << std::to_string(op->request().target.port()) << ", " << ec.message(); - return _networkErrorCallback(op, ec); - } - - if (op->canceled()) - return _completeOperation(op, kCanceledStatus); - - _setupSocket(op, endpoints); + _validateAndRun(op, ec, [this, op, endpoints]() { _setupSocket(op, endpoints); }); }); } @@ -124,14 +111,7 @@ void NetworkInterfaceASIO::_connectWithDBClientConnection(AsyncOp* op) { auto status = exceptionToStatus(); asio::post(_io_service, - [this, op, status]() { - - if (op->canceled()) { - return _completeOperation(op, kCanceledStatus); - } - - return _completeOperation(op, status); - }); + [this, op, status]() { return _completeOperation(op, status); }); return; } @@ -152,16 +132,7 @@ void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, const tcp::resolver::iterat asio::async_connect(op->connection()->sock(), std::move(endpoints), [this, op](std::error_code ec, tcp::resolver::iterator iter) { - if (ec) { - LOG(3) << "could not connect to host at " << iter->host_name() - << ":" << iter->endpoint().port() << ", " << ec.message(); - return _networkErrorCallback(op, ec); - } - - if (op->canceled()) - return _completeOperation(op, kCanceledStatus); - - _sslHandshake(op); + _validateAndRun(op, ec, [this, op]() { _sslHandshake(op); }); }); }