mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-28 07:59:02 +01:00
SERVER-17023 separate command and query paths
This commit is contained in:
parent
93254f6576
commit
a33b1a8688
@ -47,6 +47,7 @@ namespace mongo {
|
||||
class BSONObj;
|
||||
class BSONObjBuilder;
|
||||
class Client;
|
||||
class CurOp;
|
||||
class Database;
|
||||
class OperationContext;
|
||||
class Timer;
|
||||
@ -347,10 +348,18 @@ namespace mutablebson {
|
||||
|
||||
bool _runCommands(OperationContext* txn,
|
||||
const char* ns,
|
||||
BSONObj& jsobj,
|
||||
BSONObj& _cmdobj,
|
||||
BufBuilder& b,
|
||||
BSONObjBuilder& anObjBuilder,
|
||||
bool fromRepl,
|
||||
int queryOptions);
|
||||
bool fromRepl, int queryOptions);
|
||||
|
||||
bool runCommands(OperationContext* txn,
|
||||
const char* ns,
|
||||
BSONObj& jsobj,
|
||||
CurOp& curop,
|
||||
BufBuilder& b,
|
||||
BSONObjBuilder& anObjBuilder,
|
||||
bool fromRepl,
|
||||
int queryOptions);
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -40,6 +40,7 @@
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/commands/fsync.h"
|
||||
#include "mongo/db/dbmessage.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/matcher/matcher.h"
|
||||
#include "mongo/util/log.h"
|
||||
@ -48,7 +49,10 @@ namespace mongo {
|
||||
|
||||
using std::stringstream;
|
||||
|
||||
void inProgCmd(OperationContext* txn, Message &message, DbResponse &dbresponse) {
|
||||
void inProgCmd(OperationContext* txn,
|
||||
const NamespaceString& nss,
|
||||
Message &message,
|
||||
DbResponse &dbresponse) {
|
||||
DbMessage d(message);
|
||||
QueryMessage q(d);
|
||||
|
||||
@ -86,7 +90,6 @@ namespace mongo {
|
||||
filter = b.obj();
|
||||
}
|
||||
|
||||
const NamespaceString nss(d.getns());
|
||||
const WhereCallbackReal whereCallback(txn, nss.db());
|
||||
const Matcher matcher(filter, whereCallback);
|
||||
|
||||
|
@ -35,13 +35,17 @@ namespace mongo {
|
||||
class BSONObjBuilder;
|
||||
struct DbResponse;
|
||||
class Message;
|
||||
class NamespaceString;
|
||||
class OperationContext;
|
||||
|
||||
/**
|
||||
* Executes the db.currentOp() command. Currently not an actual "command" object, but should
|
||||
* be converted to one at some point.
|
||||
*/
|
||||
void inProgCmd(OperationContext* txn, Message &m, DbResponse &dbresponse);
|
||||
void inProgCmd(OperationContext* txn,
|
||||
const NamespaceString& nss,
|
||||
Message &m,
|
||||
DbResponse &dbresponse);
|
||||
|
||||
/**
|
||||
* Constructs a human-readable BSON from the specified LockerInfo structure.
|
||||
|
@ -1555,7 +1555,6 @@ namespace mongo {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/* TODO make these all command objects -- legacy stuff here
|
||||
|
||||
usage:
|
||||
@ -1632,4 +1631,29 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool runCommands(OperationContext* txn,
|
||||
const char* ns,
|
||||
BSONObj& jsobj,
|
||||
CurOp& curop,
|
||||
BufBuilder& b,
|
||||
BSONObjBuilder& anObjBuilder,
|
||||
bool fromRepl,
|
||||
int queryOptions) {
|
||||
try {
|
||||
return _runCommands(txn, ns, jsobj, b, anObjBuilder, fromRepl, queryOptions);
|
||||
}
|
||||
catch (const SendStaleConfigException&){
|
||||
throw;
|
||||
}
|
||||
catch (const AssertionException& e) {
|
||||
verify( e.getCode() != SendStaleConfigCode && e.getCode() != RecvStaleConfigCode );
|
||||
|
||||
Command::appendCommandStatus(anObjBuilder, e.toStatus());
|
||||
curop.debug().exceptionInfo = e.getInfo();
|
||||
}
|
||||
BSONObj x = anObjBuilder.done();
|
||||
b.appendBuf(x.objdata(), x.objsize());
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/audit.h"
|
||||
@ -121,9 +122,22 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void receivedKillCursors(OperationContext* txn, Message& m);
|
||||
void receivedUpdate(OperationContext* txn, Message& m, CurOp& op);
|
||||
void receivedDelete(OperationContext* txn, Message& m, CurOp& op);
|
||||
void receivedInsert(OperationContext* txn, Message& m, CurOp& op);
|
||||
|
||||
void receivedUpdate(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op);
|
||||
|
||||
void receivedDelete(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op);
|
||||
|
||||
void receivedInsert(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op);
|
||||
|
||||
bool receivedGetMore(OperationContext* txn,
|
||||
DbResponse& dbresponse,
|
||||
Message& m,
|
||||
@ -192,11 +206,131 @@ namespace mongo {
|
||||
replyToQuery(0, m, dbresponse, obj);
|
||||
}
|
||||
|
||||
static bool receivedQuery(OperationContext* txn,
|
||||
namespace {
|
||||
|
||||
void generateErrorResponse(const AssertionException* exception,
|
||||
const QueryMessage& queryMessage,
|
||||
CurOp* curop,
|
||||
Message* response) {
|
||||
curop->debug().exceptionInfo = exception->getInfo();
|
||||
|
||||
log(LogComponent::kQuery) << "assertion " << exception->toString()
|
||||
<< " ns:" << queryMessage.ns << " query:"
|
||||
<< (queryMessage.query.valid() ? queryMessage.query.toString()
|
||||
: "query object is corrupt");
|
||||
if (queryMessage.ntoskip || queryMessage.ntoreturn) {
|
||||
log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip
|
||||
<< " ntoreturn:" << queryMessage.ntoreturn;
|
||||
}
|
||||
|
||||
const SendStaleConfigException* scex = (exception->getCode() == SendStaleConfigCode)
|
||||
? static_cast<const SendStaleConfigException*>(exception)
|
||||
: NULL;
|
||||
|
||||
BSONObjBuilder err;
|
||||
exception->getInfo().append(err);
|
||||
if (scex) {
|
||||
err.append("ns", scex->getns());
|
||||
scex->getVersionReceived().addToBSON(err, "vReceived");
|
||||
scex->getVersionWanted().addToBSON(err, "vWanted");
|
||||
}
|
||||
BSONObj errObj = err.done();
|
||||
|
||||
if (scex) {
|
||||
log(LogComponent::kQuery) << "stale version detected during query over "
|
||||
<< queryMessage.ns << " : " << errObj;
|
||||
}
|
||||
|
||||
BufBuilder bb;
|
||||
bb.skip(sizeof(QueryResult::Value));
|
||||
bb.appendBuf((void*) errObj.objdata(), errObj.objsize());
|
||||
|
||||
// TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
|
||||
QueryResult::View msgdata = bb.buf();
|
||||
bb.decouple();
|
||||
QueryResult::View qr = msgdata;
|
||||
qr.setResultFlags(ResultFlag_ErrSet);
|
||||
if (scex) qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
|
||||
qr.msgdata().setLen(bb.len());
|
||||
qr.msgdata().setOperation(opReply);
|
||||
qr.setCursorId(0);
|
||||
qr.setStartingFrom(0);
|
||||
qr.setNReturned(1);
|
||||
response->setData(msgdata.view2ptr(), true);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
static void receivedCommand(OperationContext* txn,
|
||||
const NamespaceString& nss,
|
||||
Client& client,
|
||||
DbResponse& dbResponse,
|
||||
Message& message) {
|
||||
invariant(nss.isCommand());
|
||||
uassert(28617, str::stream() << "Invalid ns [" << nss.ns() << "]", nss.isValid());
|
||||
|
||||
MSGID responseTo = message.header().getId();
|
||||
DbMessage dbMessage(message);
|
||||
QueryMessage queryMessage(dbMessage);
|
||||
|
||||
CurOp* op = client.curop();
|
||||
|
||||
std::unique_ptr<Message> response(new Message());
|
||||
|
||||
try {
|
||||
// Auth checking for Commands happens later.
|
||||
int nToReturn = queryMessage.ntoreturn;
|
||||
beginQueryOp(nss, queryMessage.query, nToReturn, queryMessage.ntoskip, op);
|
||||
op->markCommand();
|
||||
|
||||
uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
|
||||
<< ") for $cmd type ns - can only be 1 or -1",
|
||||
nToReturn == 1 || nToReturn == -1);
|
||||
|
||||
BufBuilder bb;
|
||||
bb.skip(sizeof(QueryResult::Value));
|
||||
|
||||
BSONObjBuilder cmdResBuf;
|
||||
if (!runCommands(txn, queryMessage.ns, queryMessage.query, *op, bb, cmdResBuf, false,
|
||||
queryMessage.queryOptions)) {
|
||||
uasserted(13530, "bad or malformed command request?");
|
||||
}
|
||||
|
||||
op->debug().iscommand = true;
|
||||
// TODO: Does this get overwritten/do we really need to set this twice?
|
||||
op->debug().query = queryMessage.query;
|
||||
|
||||
QueryResult::View qr = bb.buf();
|
||||
bb.decouple();
|
||||
qr.setResultFlagsToOk();
|
||||
qr.msgdata().setLen(bb.len());
|
||||
op->debug().responseLength = bb.len();
|
||||
qr.msgdata().setOperation(opReply);
|
||||
qr.setCursorId(0);
|
||||
qr.setStartingFrom(0);
|
||||
qr.setNReturned(1);
|
||||
response->setData(qr.view2ptr(), true);
|
||||
|
||||
invariant(!response->empty());
|
||||
}
|
||||
catch (const AssertionException& exception) {
|
||||
response.reset(new Message());
|
||||
generateErrorResponse(&exception, queryMessage, op, response.get());
|
||||
}
|
||||
|
||||
op->debug().responseLength = response->header().dataLen();
|
||||
|
||||
dbResponse.response = response.release();
|
||||
dbResponse.responseTo = responseTo;
|
||||
}
|
||||
|
||||
static void receivedQuery(OperationContext* txn,
|
||||
const NamespaceString& nss,
|
||||
Client& c,
|
||||
DbResponse& dbresponse,
|
||||
DbResponse& dbResponse,
|
||||
Message& m) {
|
||||
bool ok = true;
|
||||
invariant(!nss.isCommand());
|
||||
|
||||
MSGID responseTo = m.header().getId();
|
||||
|
||||
DbMessage d(m);
|
||||
@ -205,82 +339,24 @@ namespace mongo {
|
||||
|
||||
CurOp& op = *(c.curop());
|
||||
|
||||
scoped_ptr<AssertionException> ex;
|
||||
|
||||
try {
|
||||
NamespaceString ns(d.getns());
|
||||
if (!ns.isCommand()) {
|
||||
// Auth checking for Commands happens later.
|
||||
Client* client = txn->getClient();
|
||||
Status status = client->getAuthorizationSession()->checkAuthForQuery(ns, q.query);
|
||||
audit::logQueryAuthzCheck(client, ns, q.query, status.code());
|
||||
uassertStatusOK(status);
|
||||
}
|
||||
dbresponse.exhaustNS = runQuery(txn, m, q, ns, op, *resp);
|
||||
Client* client = txn->getClient();
|
||||
Status status = client->getAuthorizationSession()->checkAuthForQuery(nss, q.query);
|
||||
audit::logQueryAuthzCheck(client, nss, q.query, status.code());
|
||||
uassertStatusOK(status);
|
||||
|
||||
dbResponse.exhaustNS = runQuery(txn, q, nss, op, *resp);
|
||||
verify( !resp->empty() );
|
||||
}
|
||||
catch ( SendStaleConfigException& e ){
|
||||
ex.reset( new SendStaleConfigException( e.getns(), e.getInfo().msg, e.getVersionReceived(), e.getVersionWanted() ) );
|
||||
ok = false;
|
||||
}
|
||||
catch ( AssertionException& e ) {
|
||||
ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) );
|
||||
ok = false;
|
||||
}
|
||||
|
||||
if( ex ){
|
||||
|
||||
op.debug().exceptionInfo = ex->getInfo();
|
||||
log(LogComponent::kQuery) <<
|
||||
"assertion " << ex->toString() << " ns:" << q.ns << " query:" <<
|
||||
(q.query.valid() ? q.query.toString() : "query object is corrupt") << endl;
|
||||
if( q.ntoskip || q.ntoreturn )
|
||||
log(LogComponent::kQuery) <<
|
||||
" ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << endl;
|
||||
|
||||
SendStaleConfigException* scex = NULL;
|
||||
if ( ex->getCode() == SendStaleConfigCode ) scex = static_cast<SendStaleConfigException*>( ex.get() );
|
||||
|
||||
BSONObjBuilder err;
|
||||
ex->getInfo().append( err );
|
||||
if( scex ){
|
||||
err.append( "ns", scex->getns() );
|
||||
scex->getVersionReceived().addToBSON( err, "vReceived" );
|
||||
scex->getVersionWanted().addToBSON( err, "vWanted" );
|
||||
}
|
||||
BSONObj errObj = err.done();
|
||||
|
||||
if( scex ){
|
||||
log(LogComponent::kQuery) << "stale version detected during query over "
|
||||
<< q.ns << " : " << errObj << endl;
|
||||
}
|
||||
|
||||
BufBuilder b;
|
||||
b.skip(sizeof(QueryResult::Value));
|
||||
b.appendBuf((void*) errObj.objdata(), errObj.objsize());
|
||||
|
||||
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
|
||||
QueryResult::View msgdata = b.buf();
|
||||
b.decouple();
|
||||
QueryResult::View qr = msgdata;
|
||||
qr.setResultFlags(ResultFlag_ErrSet);
|
||||
if( scex ) qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
|
||||
qr.msgdata().setLen(b.len());
|
||||
qr.msgdata().setOperation(opReply);
|
||||
qr.setCursorId(0);
|
||||
qr.setStartingFrom(0);
|
||||
qr.setNReturned(1);
|
||||
resp.reset( new Message() );
|
||||
resp->setData( msgdata.view2ptr(), true );
|
||||
|
||||
catch (const AssertionException& exception) {
|
||||
resp.reset(new Message());
|
||||
generateErrorResponse(&exception, q, &op, resp.get());
|
||||
}
|
||||
|
||||
op.debug().responseLength = resp->header().dataLen();
|
||||
|
||||
dbresponse.response = resp.release();
|
||||
dbresponse.responseTo = responseTo;
|
||||
|
||||
return ok;
|
||||
dbResponse.response = resp.release();
|
||||
dbResponse.responseTo = responseTo;
|
||||
}
|
||||
|
||||
// Mongod on win32 defines a value for this function. In all other executables it is NULL.
|
||||
@ -312,25 +388,28 @@ namespace mongo {
|
||||
invariant(!txn->lockState()->isLocked());
|
||||
}
|
||||
|
||||
if ( op == dbQuery ) {
|
||||
const char *ns = dbmsg.getns();
|
||||
const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
|
||||
const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
|
||||
|
||||
if (strstr(ns, ".$cmd")) {
|
||||
if ( op == dbQuery ) {
|
||||
if (nsString.isCommand()) {
|
||||
isCommand = true;
|
||||
opwrite(m);
|
||||
if( strstr(ns, ".$cmd.sys.") ) {
|
||||
if( strstr(ns, "$cmd.sys.inprog") ) {
|
||||
inProgCmd(txn, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
if( strstr(ns, "$cmd.sys.killop") ) {
|
||||
killOp(txn, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
if( strstr(ns, "$cmd.sys.unlock") ) {
|
||||
unlockFsync(txn, ns, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (nsString.isSpecialCommand()) {
|
||||
opwrite(m);
|
||||
|
||||
if (nsString.coll() == "$cmd.sys.inprog") {
|
||||
inProgCmd(txn, nsString, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
if (nsString.coll() == "$cmd.sys.killop") {
|
||||
killOp(txn, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
if (nsString.coll() == "$cmd.sys.unlock") {
|
||||
unlockFsync(txn, ns, m, dbresponse);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else {
|
||||
@ -398,7 +477,12 @@ namespace mongo {
|
||||
logger::LogSeverity::Debug(1));
|
||||
|
||||
if ( op == dbQuery ) {
|
||||
receivedQuery(txn, c , dbresponse, m);
|
||||
if (isCommand) {
|
||||
receivedCommand(txn, nsString, c, dbresponse, m);
|
||||
}
|
||||
else {
|
||||
receivedQuery(txn, nsString, c, dbresponse, m);
|
||||
}
|
||||
}
|
||||
else if ( op == dbGetMore ) {
|
||||
if ( ! receivedGetMore(txn, dbresponse, m, currentOp) )
|
||||
@ -438,14 +522,11 @@ namespace mongo {
|
||||
shouldLog = true;
|
||||
}
|
||||
else {
|
||||
const char* ns = dbmsg.getns();
|
||||
const NamespaceString nsString(ns);
|
||||
|
||||
if (remote != DBDirectClient::dummyHost) {
|
||||
const ShardedConnectionInfo* connInfo = ShardedConnectionInfo::get(false);
|
||||
uassert(18663,
|
||||
str::stream() << "legacy writeOps not longer supported for "
|
||||
<< "versioned connections, ns: " << string(ns)
|
||||
<< "versioned connections, ns: " << nsString.ns()
|
||||
<< ", op: " << opToString(op)
|
||||
<< ", remote: " << remote.toString(),
|
||||
connInfo == NULL);
|
||||
@ -455,13 +536,13 @@ namespace mongo {
|
||||
uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
|
||||
}
|
||||
else if (op == dbInsert) {
|
||||
receivedInsert(txn, m, currentOp);
|
||||
receivedInsert(txn, nsString, m, currentOp);
|
||||
}
|
||||
else if (op == dbUpdate) {
|
||||
receivedUpdate(txn, m, currentOp);
|
||||
receivedUpdate(txn, nsString, m, currentOp);
|
||||
}
|
||||
else if (op == dbDelete) {
|
||||
receivedDelete(txn, m, currentOp);
|
||||
receivedDelete(txn, nsString, m, currentOp);
|
||||
}
|
||||
else {
|
||||
invariant(false);
|
||||
@ -539,11 +620,13 @@ namespace mongo {
|
||||
|
||||
}
|
||||
|
||||
void receivedUpdate(OperationContext* txn, Message& m, CurOp& op) {
|
||||
void receivedUpdate(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op) {
|
||||
DbMessage d(m);
|
||||
NamespaceString ns(d.getns());
|
||||
uassertStatusOK( userAllowedWriteNS( ns ) );
|
||||
op.debug().ns = ns.ns();
|
||||
uassertStatusOK(userAllowedWriteNS(nsString));
|
||||
op.debug().ns = nsString.ns();
|
||||
int flags = d.pullInt();
|
||||
BSONObj query = d.nextJsObj();
|
||||
|
||||
@ -557,24 +640,24 @@ namespace mongo {
|
||||
bool multi = flags & UpdateOption_Multi;
|
||||
bool broadcast = flags & UpdateOption_Broadcast;
|
||||
|
||||
Status status = txn->getClient()->getAuthorizationSession()->checkAuthForUpdate(ns,
|
||||
Status status = txn->getClient()->getAuthorizationSession()->checkAuthForUpdate(nsString,
|
||||
query,
|
||||
toupdate,
|
||||
upsert);
|
||||
audit::logUpdateAuthzCheck(txn->getClient(), ns, query, toupdate, upsert, multi, status.code());
|
||||
audit::logUpdateAuthzCheck(txn->getClient(), nsString, query, toupdate, upsert, multi,
|
||||
status.code());
|
||||
uassertStatusOK(status);
|
||||
|
||||
op.debug().query = query;
|
||||
op.setQuery(query);
|
||||
|
||||
UpdateRequest request(ns);
|
||||
|
||||
UpdateRequest request(nsString);
|
||||
request.setUpsert(upsert);
|
||||
request.setMulti(multi);
|
||||
request.setQuery(query);
|
||||
request.setUpdates(toupdate);
|
||||
request.setUpdateOpLog(); // TODO: This is wasteful if repl is not active.
|
||||
UpdateLifecycleImpl updateLifecycle(broadcast, ns);
|
||||
UpdateLifecycleImpl updateLifecycle(broadcast, nsString);
|
||||
request.setLifecycle(&updateLifecycle);
|
||||
|
||||
request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
|
||||
@ -587,21 +670,21 @@ namespace mongo {
|
||||
|
||||
// Tentatively take an intent lock, fix up if we need to create the collection
|
||||
ScopedTransaction transaction(txn, MODE_IX);
|
||||
Lock::DBLock dbLock(txn->lockState(), ns.db(), MODE_IX);
|
||||
if (dbHolder().get(txn, ns.db()) == NULL) {
|
||||
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
|
||||
if (dbHolder().get(txn, nsString.db()) == NULL) {
|
||||
// If DB doesn't exist, don't implicitly create it in Client::Context
|
||||
break;
|
||||
}
|
||||
Lock::CollectionLock collLock(txn->lockState(),
|
||||
ns.ns(),
|
||||
nsString.ns(),
|
||||
parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
|
||||
Client::Context ctx(txn, ns);
|
||||
Client::Context ctx(txn, nsString);
|
||||
|
||||
// The common case: no implicit collection creation
|
||||
if (!upsert || ctx.db()->getCollection(ns) != NULL) {
|
||||
if (!upsert || ctx.db()->getCollection(nsString) != NULL) {
|
||||
PlanExecutor* rawExec;
|
||||
uassertStatusOK(getExecutorUpdate(txn,
|
||||
ctx.db()->getCollection(ns),
|
||||
ctx.db()->getCollection(nsString),
|
||||
&parsedUpdate,
|
||||
&op.debug(),
|
||||
&rawExec));
|
||||
@ -623,7 +706,7 @@ namespace mongo {
|
||||
log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
|
||||
throw;
|
||||
}
|
||||
WriteConflictException::logAndBackoff( attempt++, "update", ns.toString() );
|
||||
WriteConflictException::logAndBackoff( attempt++, "update", nsString.toString() );
|
||||
}
|
||||
}
|
||||
|
||||
@ -634,29 +717,28 @@ namespace mongo {
|
||||
uassertStatusOK(parsedUpdate.parseRequest());
|
||||
|
||||
ScopedTransaction transaction(txn, MODE_IX);
|
||||
Lock::DBLock dbLock(txn->lockState(), ns.db(), MODE_X);
|
||||
Client::Context ctx(txn, ns);
|
||||
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
|
||||
Client::Context ctx(txn, nsString);
|
||||
uassert(ErrorCodes::NotMaster,
|
||||
str::stream() << "Not primary while performing update on " << ns.ns(),
|
||||
repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db()));
|
||||
str::stream() << "Not primary while performing update on " << nsString.ns(),
|
||||
repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
|
||||
nsString.db()));
|
||||
|
||||
Database* db = ctx.db();
|
||||
if ( db->getCollection( ns ) ) {
|
||||
if (db->getCollection(nsString)) {
|
||||
// someone else beat us to it, that's ok
|
||||
// we might race while we unlock if someone drops
|
||||
// but that's ok, we'll just do nothing and error out
|
||||
}
|
||||
else {
|
||||
WriteUnitOfWork wuow(txn);
|
||||
uassertStatusOK( userCreateNS( txn, db,
|
||||
ns.ns(), BSONObj(),
|
||||
true ) );
|
||||
uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj(), true));
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
PlanExecutor* rawExec;
|
||||
uassertStatusOK(getExecutorUpdate(txn,
|
||||
ctx.db()->getCollection(ns),
|
||||
ctx.db()->getCollection(nsString),
|
||||
&parsedUpdate,
|
||||
&op.debug(),
|
||||
&rawExec));
|
||||
@ -667,28 +749,31 @@ namespace mongo {
|
||||
UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());
|
||||
|
||||
lastError.getSafe()->recordUpdate( res.existing , res.numMatched , res.upserted );
|
||||
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", ns.ns());
|
||||
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
|
||||
}
|
||||
|
||||
void receivedDelete(OperationContext* txn, Message& m, CurOp& op) {
|
||||
void receivedDelete(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op) {
|
||||
DbMessage d(m);
|
||||
NamespaceString ns(d.getns());
|
||||
uassertStatusOK( userAllowedWriteNS( ns ) );
|
||||
uassertStatusOK(userAllowedWriteNS(nsString));
|
||||
|
||||
op.debug().ns = ns.ns();
|
||||
op.debug().ns = nsString.ns();
|
||||
int flags = d.pullInt();
|
||||
bool justOne = flags & RemoveOption_JustOne;
|
||||
verify( d.moreJSObjs() );
|
||||
BSONObj pattern = d.nextJsObj();
|
||||
|
||||
Status status = txn->getClient()->getAuthorizationSession()->checkAuthForDelete(ns, pattern);
|
||||
audit::logDeleteAuthzCheck(txn->getClient(), ns, pattern, status.code());
|
||||
Status status = txn->getClient()->getAuthorizationSession()->checkAuthForDelete(nsString,
|
||||
pattern);
|
||||
audit::logDeleteAuthzCheck(txn->getClient(), nsString, pattern, status.code());
|
||||
uassertStatusOK(status);
|
||||
|
||||
op.debug().query = pattern;
|
||||
op.setQuery(pattern);
|
||||
|
||||
DeleteRequest request(ns);
|
||||
DeleteRequest request(nsString);
|
||||
request.setQuery(pattern);
|
||||
request.setMulti(!justOne);
|
||||
request.setUpdateOpLog(true);
|
||||
@ -702,19 +787,19 @@ namespace mongo {
|
||||
uassertStatusOK(parsedDelete.parseRequest());
|
||||
|
||||
ScopedTransaction scopedXact(txn, MODE_IX);
|
||||
AutoGetDb autoDb(txn, ns.db(), MODE_IX);
|
||||
AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
|
||||
if (!autoDb.getDb()) {
|
||||
break;
|
||||
}
|
||||
|
||||
Lock::CollectionLock collLock(txn->lockState(),
|
||||
ns.ns(),
|
||||
nsString.ns(),
|
||||
parsedDelete.isIsolated() ? MODE_X : MODE_IX);
|
||||
Client::Context ctx(txn, ns);
|
||||
Client::Context ctx(txn, nsString);
|
||||
|
||||
PlanExecutor* rawExec;
|
||||
uassertStatusOK(getExecutorDelete(txn,
|
||||
ctx.db()->getCollection(ns),
|
||||
ctx.db()->getCollection(nsString),
|
||||
&parsedDelete,
|
||||
&rawExec));
|
||||
boost::scoped_ptr<PlanExecutor> exec(rawExec);
|
||||
@ -729,7 +814,7 @@ namespace mongo {
|
||||
}
|
||||
catch ( const WriteConflictException& dle ) {
|
||||
op.debug().writeConflicts++;
|
||||
WriteConflictException::logAndBackoff( attempt++, "delete", ns.toString() );
|
||||
WriteConflictException::logAndBackoff( attempt++, "delete", nsString.toString() );
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1003,16 +1088,18 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
void receivedInsert(OperationContext* txn, Message& m, CurOp& op) {
|
||||
void receivedInsert(OperationContext* txn,
|
||||
const NamespaceString& nsString,
|
||||
Message& m,
|
||||
CurOp& op) {
|
||||
DbMessage d(m);
|
||||
const char *ns = d.getns();
|
||||
const char* ns = d.getns();
|
||||
op.debug().ns = ns;
|
||||
uassertStatusOK( userAllowedWriteNS( ns ) );
|
||||
if (nsToCollectionSubstring(ns) == "system.indexes") {
|
||||
uassertStatusOK(userAllowedWriteNS(nsString.ns()));
|
||||
if (nsString.isSystemDotIndexes()) {
|
||||
insertSystemIndexes(txn, d, op);
|
||||
return;
|
||||
}
|
||||
const NamespaceString nsString(ns);
|
||||
|
||||
if( !d.moreJSObjs() ) {
|
||||
// strange. should we complain?
|
||||
|
@ -204,32 +204,6 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move this and the other command stuff in runQuery outta here and up a level.
|
||||
static bool runCommands(OperationContext* txn,
|
||||
const char *ns,
|
||||
BSONObj& jsobj,
|
||||
CurOp& curop,
|
||||
BufBuilder &b,
|
||||
BSONObjBuilder& anObjBuilder,
|
||||
bool fromRepl,
|
||||
int queryOptions) {
|
||||
try {
|
||||
return _runCommands(txn, ns, jsobj, b, anObjBuilder, fromRepl, queryOptions);
|
||||
}
|
||||
catch( SendStaleConfigException& ){
|
||||
throw;
|
||||
}
|
||||
catch ( AssertionException& e ) {
|
||||
verify( e.getCode() != SendStaleConfigCode && e.getCode() != RecvStaleConfigCode );
|
||||
|
||||
Command::appendCommandStatus(anObjBuilder, e.toStatus());
|
||||
curop.debug().exceptionInfo = e.getInfo();
|
||||
}
|
||||
BSONObj x = anObjBuilder.done();
|
||||
b.appendBuf((void*) x.objdata(), x.objsize());
|
||||
return true;
|
||||
}
|
||||
|
||||
struct ScopedRecoveryUnitSwapper {
|
||||
explicit ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn)
|
||||
: _cc(cc), _txn(txn) {
|
||||
@ -650,51 +624,17 @@ namespace mongo {
|
||||
}
|
||||
|
||||
std::string runQuery(OperationContext* txn,
|
||||
Message& m,
|
||||
QueryMessage& q,
|
||||
const NamespaceString& nss,
|
||||
CurOp& curop,
|
||||
Message &result) {
|
||||
// Validate the namespace.
|
||||
uassert(16256, str::stream() << "Invalid ns [" << nss.ns() << "]", nss.isValid());
|
||||
invariant(!nss.isCommand());
|
||||
|
||||
// Set curop information.
|
||||
beginQueryOp(nss, q.query, q.ntoreturn, q.ntoskip, &curop);
|
||||
|
||||
// If the query is really a command, run it.
|
||||
if (nss.isCommand()) {
|
||||
int nToReturn = q.ntoreturn;
|
||||
uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
|
||||
<< ") for $cmd type ns - can only be 1 or -1",
|
||||
nToReturn == 1 || nToReturn == -1);
|
||||
|
||||
curop.markCommand();
|
||||
|
||||
BufBuilder bb;
|
||||
bb.skip(sizeof(QueryResult::Value));
|
||||
|
||||
BSONObjBuilder cmdResBuf;
|
||||
if (!runCommands(txn, q.ns, q.query, curop, bb, cmdResBuf, false, q.queryOptions)) {
|
||||
uasserted(13530, "bad or malformed command request?");
|
||||
}
|
||||
|
||||
curop.debug().iscommand = true;
|
||||
// TODO: Does this get overwritten/do we really need to set this twice?
|
||||
curop.debug().query = q.query;
|
||||
|
||||
QueryResult::View qr = bb.buf();
|
||||
bb.decouple();
|
||||
qr.setResultFlagsToOk();
|
||||
qr.msgdata().setLen(bb.len());
|
||||
curop.debug().responseLength = bb.len();
|
||||
qr.msgdata().setOperation(opReply);
|
||||
qr.setCursorId(0);
|
||||
qr.setStartingFrom(0);
|
||||
qr.setNReturned(1);
|
||||
result.setData(qr.view2ptr(), true);
|
||||
return "";
|
||||
}
|
||||
|
||||
// Parse the qm into a CanonicalQuery.
|
||||
std::auto_ptr<CanonicalQuery> cq;
|
||||
{
|
||||
|
@ -113,7 +113,6 @@ namespace mongo {
|
||||
* Run the query 'q' and place the result in 'result'.
|
||||
*/
|
||||
std::string runQuery(OperationContext* txn,
|
||||
Message& m,
|
||||
QueryMessage& q,
|
||||
const NamespaceString& ns,
|
||||
CurOp& curop,
|
||||
|
@ -1507,8 +1507,8 @@ namespace QueryTests {
|
||||
DbMessage dbMessage( message );
|
||||
QueryMessage queryMessage( dbMessage );
|
||||
Message result;
|
||||
string exhaust = runQuery( &_txn, message, queryMessage, NamespaceString(ns()),
|
||||
*cc().curop(), result );
|
||||
string exhaust = runQuery(&_txn, queryMessage, NamespaceString(ns()), *cc().curop(),
|
||||
result);
|
||||
ASSERT( exhaust.size() );
|
||||
ASSERT_EQUALS( string( ns() ), exhaust );
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user