mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-13437 Cannot wait for write concern for createIndexes command on mongos
This commit is contained in:
parent
b4a2cc96b1
commit
f35da2d63f
43
jstests/gle/create_index_gle.js
Normal file
43
jstests/gle/create_index_gle.js
Normal file
@ -0,0 +1,43 @@
|
||||
var st = new ShardingTest({ shards: { rs0: { nodes: 2, oplogSize: 10, verbose: 1 }}});
|
||||
var replTest = st.rs0;
|
||||
|
||||
var config = replTest.getReplSetConfig();
|
||||
config.members[1].priority = 0;
|
||||
// Add a delay long enough so getLastError would actually 'wait' for write concern.
|
||||
config.members[1].slaveDelay = 3;
|
||||
config.version = 2;
|
||||
|
||||
var priConn = replTest.getPrimary();
|
||||
|
||||
try {
|
||||
priConn.getDB('admin').runCommand({ replSetReconfig: config });
|
||||
} catch (x) {
|
||||
print('reconfig closed conn');
|
||||
}
|
||||
|
||||
assert.soon(function() {
|
||||
var secConn = replTest.getSecondary();
|
||||
var config = secConn.getDB('local').system.replset.findOne();
|
||||
return config.members[1].slaveDelay == 3;
|
||||
});
|
||||
|
||||
replTest.awaitSecondaryNodes();
|
||||
|
||||
var testDB = st.s.getDB('test');
|
||||
testDB.adminCommand({ connPoolSync: 1 });
|
||||
|
||||
var secConn = replTest.getSecondary();
|
||||
var testDB2 = secConn.getDB('test');
|
||||
|
||||
testDB.user.insert({ x: 1 });
|
||||
|
||||
testDB.user.ensureIndex({ x: 1 });
|
||||
assert.gleOK(testDB.runCommand({ getLastError: 1, w: 2 }));
|
||||
|
||||
var priIdx = testDB.user.getIndexes();
|
||||
var secIdx = testDB2.user.getIndexes();
|
||||
|
||||
assert.eq(priIdx.length, secIdx.length, 'pri: ' + tojson(priIdx) + ', sec: ' + tojson(secIdx));
|
||||
|
||||
st.stop();
|
||||
|
@ -1538,8 +1538,19 @@ namespace mongo {
|
||||
// ---- Future -----
|
||||
// -----------------
|
||||
|
||||
Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn )
|
||||
:_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false)
|
||||
Future::CommandResult::CommandResult( const string& server,
|
||||
const string& db,
|
||||
const BSONObj& cmd,
|
||||
int options,
|
||||
DBClientBase * conn,
|
||||
bool useShardedConn ):
|
||||
_server(server),
|
||||
_db(db),
|
||||
_options(options),
|
||||
_cmd(cmd),
|
||||
_conn(conn),
|
||||
_useShardConn(useShardedConn),
|
||||
_done(false)
|
||||
{
|
||||
init();
|
||||
}
|
||||
@ -1547,7 +1558,12 @@ namespace mongo {
|
||||
void Future::CommandResult::init(){
|
||||
try {
|
||||
if ( ! _conn ){
|
||||
_connHolder.reset( new ScopedDbConnection( _server ) );
|
||||
if ( _useShardConn) {
|
||||
_connHolder.reset( new ShardConnection( _server, "" ));
|
||||
}
|
||||
else {
|
||||
_connHolder.reset( new ScopedDbConnection( _server ) );
|
||||
}
|
||||
_conn = _connHolder->get();
|
||||
}
|
||||
|
||||
@ -1641,8 +1657,19 @@ namespace mongo {
|
||||
return _ok;
|
||||
}
|
||||
|
||||
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) {
|
||||
shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn ));
|
||||
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server,
|
||||
const string& db,
|
||||
const BSONObj& cmd,
|
||||
int options,
|
||||
DBClientBase * conn,
|
||||
bool useShardConn ) {
|
||||
shared_ptr<Future::CommandResult> res (
|
||||
new Future::CommandResult( server,
|
||||
db,
|
||||
cmd,
|
||||
options,
|
||||
conn,
|
||||
useShardConn));
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -326,7 +326,12 @@ namespace mongo {
|
||||
|
||||
private:
|
||||
|
||||
CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn );
|
||||
CommandResult( const string& server,
|
||||
const string& db,
|
||||
const BSONObj& cmd,
|
||||
int options,
|
||||
DBClientBase * conn,
|
||||
bool useShardedConn );
|
||||
void init();
|
||||
|
||||
string _server;
|
||||
@ -334,7 +339,8 @@ namespace mongo {
|
||||
int _options;
|
||||
BSONObj _cmd;
|
||||
DBClientBase * _conn;
|
||||
scoped_ptr<ScopedDbConnection> _connHolder; // used if not provided a connection
|
||||
scoped_ptr<AScopedConnection> _connHolder; // used if not provided a connection
|
||||
bool _useShardConn;
|
||||
|
||||
scoped_ptr<DBClientCursor> _cursor;
|
||||
|
||||
@ -351,8 +357,14 @@ namespace mongo {
|
||||
* @param db db name
|
||||
* @param cmd cmd to exec
|
||||
* @param conn optional connection to use. will use standard pooled if non-specified
|
||||
* @param useShardConn use ShardConnection
|
||||
*/
|
||||
static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn = 0 );
|
||||
static shared_ptr<CommandResult> spawnCommand( const string& server,
|
||||
const string& db,
|
||||
const BSONObj& cmd,
|
||||
int options,
|
||||
DBClientBase * conn = 0,
|
||||
bool useShardConn = false );
|
||||
};
|
||||
|
||||
|
||||
|
@ -268,12 +268,14 @@ namespace mongo {
|
||||
/** "read lock, and set my context, all in one operation"
|
||||
* This handles (if not recursively locked) opening an unopened database.
|
||||
*/
|
||||
Client::ReadContext::ReadContext(const string& ns, const std::string& path) {
|
||||
Client::ReadContext::ReadContext(const string& ns,
|
||||
const std::string& path,
|
||||
bool doVersion) {
|
||||
{
|
||||
lk.reset( new Lock::DBRead(ns) );
|
||||
Database *db = dbHolder().get(ns, path);
|
||||
if( db ) {
|
||||
c.reset( new Context(path, ns, db) );
|
||||
c.reset( new Context(path, ns, db, doVersion) );
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -284,17 +286,17 @@ namespace mongo {
|
||||
if( Lock::isW() ) {
|
||||
// write locked already
|
||||
DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
|
||||
c.reset(new Context(ns, path));
|
||||
c.reset(new Context(ns, path, doVersion));
|
||||
}
|
||||
else if( !Lock::nested() ) {
|
||||
lk.reset(0);
|
||||
{
|
||||
Lock::GlobalWrite w;
|
||||
Context c(ns, path);
|
||||
Context c(ns, path, doVersion);
|
||||
}
|
||||
// db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
|
||||
lk.reset( new Lock::DBRead(ns) );
|
||||
c.reset(new Context(ns, path));
|
||||
c.reset(new Context(ns, path, doVersion));
|
||||
}
|
||||
else {
|
||||
uasserted(15928, str::stream() << "can't open a database from a nested read lock " << ns);
|
||||
@ -306,9 +308,9 @@ namespace mongo {
|
||||
// it would be easy to first check that there is at least a .ns file, or something similar.
|
||||
}
|
||||
|
||||
Client::WriteContext::WriteContext(const string& ns, const std::string& path)
|
||||
Client::WriteContext::WriteContext(const string& ns, const std::string& path, bool doVersion)
|
||||
: _lk( ns ) ,
|
||||
_c(ns, path) {
|
||||
_c(ns, path, doVersion) {
|
||||
}
|
||||
|
||||
|
||||
@ -332,12 +334,12 @@ namespace mongo {
|
||||
}
|
||||
|
||||
// invoked from ReadContext
|
||||
Client::Context::Context(const string& path, const string& ns, Database *db) :
|
||||
Client::Context::Context(const string& path, const string& ns, Database *db, bool doVersion) :
|
||||
_client( currentClient.get() ),
|
||||
_oldContext( _client->_context ),
|
||||
_path( path ),
|
||||
_justCreated(false),
|
||||
_doVersion( true ),
|
||||
_doVersion( doVersion ),
|
||||
_ns( ns ),
|
||||
_db(db)
|
||||
{
|
||||
|
@ -161,7 +161,9 @@ namespace mongo {
|
||||
*/
|
||||
class ReadContext : boost::noncopyable {
|
||||
public:
|
||||
ReadContext(const std::string& ns, const std::string& path=storageGlobalParams.dbpath);
|
||||
ReadContext(const std::string& ns,
|
||||
const std::string& path=storageGlobalParams.dbpath,
|
||||
bool doVersion = true);
|
||||
Context& ctx() { return *c.get(); }
|
||||
private:
|
||||
scoped_ptr<Lock::DBRead> lk;
|
||||
@ -175,7 +177,7 @@ namespace mongo {
|
||||
public:
|
||||
/** this is probably what you want */
|
||||
Context(const string& ns, const std::string& path=storageGlobalParams.dbpath,
|
||||
bool doVersion=true);
|
||||
bool doVersion = true);
|
||||
|
||||
/** note: this does not call finishInit -- i.e., does not call
|
||||
shardVersionOk() for example.
|
||||
@ -184,7 +186,7 @@ namespace mongo {
|
||||
Context(const std::string& ns , Database * db);
|
||||
|
||||
// used by ReadContext
|
||||
Context(const string& path, const string& ns, Database *db);
|
||||
Context(const string& path, const string& ns, Database *db, bool doVersion = true);
|
||||
|
||||
~Context();
|
||||
Client* getClient() const { return _client; }
|
||||
@ -232,7 +234,9 @@ namespace mongo {
|
||||
|
||||
class WriteContext : boost::noncopyable {
|
||||
public:
|
||||
WriteContext(const string& ns, const std::string& path=storageGlobalParams.dbpath);
|
||||
WriteContext(const string& ns,
|
||||
const std::string& path=storageGlobalParams.dbpath,
|
||||
bool doVersion = true);
|
||||
Context& ctx() { return _c; }
|
||||
private:
|
||||
Lock::DBWrite _lk;
|
||||
|
@ -131,7 +131,10 @@ namespace mongo {
|
||||
// We first take a read lock to see if we need to do anything
|
||||
// as many calls are ensureIndex (and hence no-ops), this is good so its a shared
|
||||
// lock for common calls. We only take write lock if needed.
|
||||
Client::ReadContext readContext( ns );
|
||||
// Note: createIndexes command does not currently respect shard versioning.
|
||||
Client::ReadContext readContext( ns,
|
||||
storageGlobalParams.dbpath,
|
||||
false /* doVersion */ );
|
||||
const Collection* collection = readContext.ctx().db()->getCollection( ns.ns() );
|
||||
if ( collection ) {
|
||||
for ( size_t i = 0; i < specs.size(); i++ ) {
|
||||
@ -160,7 +163,10 @@ namespace mongo {
|
||||
}
|
||||
|
||||
// now we know we have to create index(es)
|
||||
Client::WriteContext writeContext( ns.ns() );
|
||||
// Note: createIndexes command does not currently respect shard versioning.
|
||||
Client::WriteContext writeContext( ns.ns(),
|
||||
storageGlobalParams.dbpath,
|
||||
false /* doVersion */ );
|
||||
Database* db = writeContext.ctx().db();
|
||||
|
||||
Collection* collection = db->getCollection( ns.ns() );
|
||||
|
@ -156,7 +156,12 @@ namespace mongo {
|
||||
|
||||
class RunOnAllShardsCommand : public Command {
|
||||
public:
|
||||
RunOnAllShardsCommand(const char* n, const char* oldname=NULL) : Command(n, false, oldname) {}
|
||||
RunOnAllShardsCommand(const char* n,
|
||||
const char* oldname=NULL,
|
||||
bool useShardConn = false):
|
||||
Command(n, false, oldname),
|
||||
_useShardConn(useShardConn) {
|
||||
}
|
||||
|
||||
virtual bool slaveOk() const { return true; }
|
||||
virtual bool adminOnly() const { return false; }
|
||||
@ -164,7 +169,6 @@ namespace mongo {
|
||||
// all grid commands are designed not to lock
|
||||
virtual LockType locktype() const { return NONE; }
|
||||
|
||||
|
||||
// default impl uses all shards for DB
|
||||
virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) {
|
||||
DBConfigPtr conf = grid.getDBConfig( dbName , false );
|
||||
@ -190,7 +194,12 @@ namespace mongo {
|
||||
|
||||
list< shared_ptr<Future::CommandResult> > futures;
|
||||
for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
|
||||
futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, 0 ) );
|
||||
futures.push_back( Future::spawnCommand( i->getConnString(),
|
||||
dbName,
|
||||
cmdObj,
|
||||
0,
|
||||
NULL,
|
||||
_useShardConn ));
|
||||
}
|
||||
|
||||
vector<BSONObj> results;
|
||||
@ -266,11 +275,17 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
bool _useShardConn; // use ShardConnection as opposed to ScopedDbConnection
|
||||
};
|
||||
|
||||
class AllShardsCollectionCommand : public RunOnAllShardsCommand {
|
||||
public:
|
||||
AllShardsCollectionCommand(const char* n, const char* oldname=NULL) : RunOnAllShardsCommand(n, oldname) {}
|
||||
AllShardsCollectionCommand(const char* n,
|
||||
const char* oldname = NULL,
|
||||
bool useShardConn = false):
|
||||
RunOnAllShardsCommand(n, oldname, useShardConn) {
|
||||
}
|
||||
|
||||
virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) {
|
||||
string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe();
|
||||
@ -323,7 +338,13 @@ namespace mongo {
|
||||
|
||||
class CreateIndexesCmd : public AllShardsCollectionCommand {
|
||||
public:
|
||||
CreateIndexesCmd() : AllShardsCollectionCommand("createIndexes") {}
|
||||
CreateIndexesCmd():
|
||||
AllShardsCollectionCommand("createIndexes",
|
||||
NULL, /* oldName */
|
||||
true /* use ShardConnection */) {
|
||||
// createIndexes command should use ShardConnection so the getLastError would
|
||||
// be able to properly enforce the write concern (via the saveGLEStats callback).
|
||||
}
|
||||
|
||||
/**
|
||||
* the createIndexes command doesn't require the 'ns' field to be populated
|
||||
|
Loading…
Reference in New Issue
Block a user