mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
ChunkMatcher now has per-chunk boundaries
This commit is contained in:
parent
53f4dce289
commit
f9d7cdab63
@ -629,7 +629,7 @@ namespace mongo {
|
||||
_oldN(0),
|
||||
_nYields(),
|
||||
_nChunkSkips(),
|
||||
_chunkMatcher(shardingState.getChunkMatcher(pq.ns())),
|
||||
_chunkMatcher( shardingState.needChunkMatcher(pq.ns()) ? shardingState.getChunkMatcher(pq.ns()) : ChunkMatcherPtr() ),
|
||||
_inMemSort(false),
|
||||
_capped(false),
|
||||
_saveClientCursor(false),
|
||||
@ -749,7 +749,7 @@ namespace mongo {
|
||||
DiskLoc cl = _c->currLoc();
|
||||
if ( _chunkMatcher && ! _chunkMatcher->belongsToMe( cl.obj() ) ){
|
||||
_nChunkSkips++;
|
||||
// cout << "TEMP skipping un-owned chunk: " << _c->current() << endl;
|
||||
// log() << "TEMP skipping un-owned chunk: " << _c->current() << endl;
|
||||
}
|
||||
else if( _c->getsetdup(cl) ) {
|
||||
// dup
|
||||
|
@ -2,6 +2,9 @@
|
||||
|
||||
s = new ShardingTest( "version1" , 1 , 2 )
|
||||
|
||||
s.adminCommand( { enablesharding : "alleyinsider" } );
|
||||
s.adminCommand( { shardcollection : "alleyinsider.foo" , key : { num : 1 } } );
|
||||
|
||||
a = s._connections[0].getDB( "admin" );
|
||||
|
||||
assert( a.runCommand( { "setShardVersion" : "alleyinsider.foo" , configdb : s._configDB } ).ok == 0 );
|
||||
|
@ -2,6 +2,10 @@
|
||||
|
||||
s = new ShardingTest( "version2" , 1 , 2 )
|
||||
|
||||
s.adminCommand( { enablesharding : "alleyinsider" } );
|
||||
s.adminCommand( { shardcollection : "alleyinsider.foo" , key : { num : 1 } } );
|
||||
s.adminCommand( { shardcollection : "alleyinsider.bar" , key : { num : 1 } } );
|
||||
|
||||
a = s._connections[0].getDB( "admin" );
|
||||
|
||||
// setup from one client
|
||||
|
@ -103,7 +103,7 @@ namespace mongo {
|
||||
|
||||
q.sort( r.obj() );
|
||||
}
|
||||
|
||||
|
||||
// find the extreme key
|
||||
BSONObj end = conn->findOne( _manager->getns() , q );
|
||||
conn.done();
|
||||
|
@ -17,34 +17,102 @@
|
||||
*/
|
||||
|
||||
#include "pch.h"
|
||||
|
||||
#include "../client/connpool.h"
|
||||
#include "../db/instance.h"
|
||||
|
||||
#include "d_chunk_matcher.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
ChunkMatcher::ChunkMatcher( ShardChunkVersion version , const BSONObj& key ) : _version( version ) {
|
||||
ChunkMatcher::ChunkMatcher( const string& configServer , const string& ns , const string& shardName ) {
|
||||
|
||||
// have to get a connection to the config db
|
||||
// special case if i'm the configdb since i'm locked and if i connect to myself
|
||||
// its a deadlock
|
||||
auto_ptr<ScopedDbConnection> scoped;
|
||||
auto_ptr<DBDirectClient> direct;
|
||||
DBClientBase * conn;
|
||||
if ( configServer.empty() ){
|
||||
direct.reset( new DBDirectClient() );
|
||||
conn = direct.get();
|
||||
} else {
|
||||
scoped.reset( new ScopedDbConnection( configServer ) );
|
||||
conn = scoped->get();
|
||||
}
|
||||
|
||||
// get this collection's key
|
||||
BSONObj collection = conn->findOne( "config.collections", BSON( "_id" << ns ) );
|
||||
assert( ! collection["key"].eoo() && collection["key"].isABSONObj() );
|
||||
BSONObj key = collection["key"].Obj().getOwned();
|
||||
BSONObjBuilder b;
|
||||
BSONForEach( e , key ) {
|
||||
b.append( e.fieldName() , 1 );
|
||||
}
|
||||
_key = b.obj();
|
||||
}
|
||||
|
||||
void ChunkMatcher::addChunk( const BSONObj& min , const BSONObj& max ) {
|
||||
_chunksMap[min] == make_pair( min , max );
|
||||
}
|
||||
// actually query all the chunks for 'ns' that live in this shard, sorting so we can efficiently bucket them
|
||||
BSONObj q;
|
||||
{
|
||||
BSONObjBuilder b;
|
||||
b.append( "ns" , ns.c_str() );
|
||||
b.append( "shard" , shardName );
|
||||
q = b.obj();
|
||||
}
|
||||
auto_ptr<DBClientCursor> cursor = conn->query( "config.chunks" , Query(q).sort( "min" ) );
|
||||
|
||||
void ChunkMatcher::addRange( const BSONObj& min , const BSONObj& max ){
|
||||
//TODO debug mode only?
|
||||
assert(min.nFields() == _key.nFields());
|
||||
assert(max.nFields() == _key.nFields());
|
||||
assert( cursor.get() );
|
||||
if ( ! cursor->more() ){
|
||||
log() << "No chunks for collection " << ns << " on shard " << shardName << endl;
|
||||
if ( scoped.get() )
|
||||
scoped->done();
|
||||
|
||||
_rangesMap[min] = make_pair(min,max);
|
||||
return;
|
||||
}
|
||||
|
||||
// load the tablet information, coallesceing the ranges
|
||||
// the version for this shard would be the highest version for any of the chunks
|
||||
ShardChunkVersion version;
|
||||
BSONObj min,max;
|
||||
while ( cursor->more() ){
|
||||
BSONObj d = cursor->next();
|
||||
|
||||
_chunksMap[min] == make_pair( d["min"].Obj().getOwned() , d["max"].Obj().getOwned() );
|
||||
|
||||
ShardChunkVersion currVersion( d["lastmod"] );
|
||||
if ( currVersion > version ) {
|
||||
version = currVersion;
|
||||
}
|
||||
|
||||
// coallesce the chunk's bounds in ranges if they are adjacent chunks
|
||||
if ( min.isEmpty() ){
|
||||
min = d["min"].Obj().getOwned();
|
||||
max = d["max"].Obj().getOwned();
|
||||
continue;
|
||||
}
|
||||
if ( max == d["min"].Obj() ){
|
||||
max = d["max"].Obj().getOwned();
|
||||
continue;
|
||||
}
|
||||
|
||||
_rangesMap[min] = make_pair( min.getOwned() , max.getOwned() );
|
||||
|
||||
min = d["min"].Obj().getOwned();
|
||||
max = d["max"].Obj().getOwned();
|
||||
}
|
||||
assert( ! min.isEmpty() );
|
||||
|
||||
_rangesMap[min] = make_pair( min.getOwned() , max.getOwned() );
|
||||
_version = version;
|
||||
|
||||
if ( scoped.get() )
|
||||
scoped->done();
|
||||
}
|
||||
|
||||
bool ChunkMatcher::belongsToMe( const BSONObj& obj ) const {
|
||||
if ( _rangesMap.size() == 0 )
|
||||
return false;
|
||||
|
||||
|
||||
BSONObj x = obj.extractFields(_key);
|
||||
|
||||
RangeMap::const_iterator a = _rangesMap.upper_bound( x );
|
||||
|
@ -24,16 +24,29 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Controls the boundaries of all the chunks for a given collection that live in this shard.
|
||||
*/
|
||||
class ChunkMatcher {
|
||||
public:
|
||||
ChunkMatcher( ShardChunkVersion version , const BSONObj& key );
|
||||
|
||||
/**
|
||||
* Loads the ChunkMatcher with all boundaries for chunks of a given collection that live in an given
|
||||
* shard
|
||||
*
|
||||
* @param configServer name of the server where the configDB currently is. Can be empty to indicate
|
||||
* that the configDB is running locally
|
||||
* @param ns namespace for the collections whose chunks we're interested
|
||||
* @param shardName name of the shard that this chunk matcher should track
|
||||
*
|
||||
* This constructor throws on connectivity errors
|
||||
*/
|
||||
ChunkMatcher( const string& configServer , const string& ns , const string& shardName );
|
||||
|
||||
~ChunkMatcher() {}
|
||||
|
||||
bool belongsToMe( const BSONObj& obj ) const;
|
||||
|
||||
void addRange( const BSONObj& min , const BSONObj& max );
|
||||
void addChunk( const BSONObj& min , const BSONObj& max );
|
||||
|
||||
//void splitChunk( const BSONObj& min , const BSONObj& max , const BSONObj& middle );
|
||||
//void removeChunk( const BSONObj& min , const BSONObj& max );
|
||||
|
||||
@ -43,7 +56,7 @@ namespace mongo {
|
||||
|
||||
private:
|
||||
// highest ShardChunkVersion for which this ChunkMatcher's information is accurate
|
||||
const ShardChunkVersion _version;
|
||||
ShardChunkVersion _version;
|
||||
|
||||
// key pattern for chunks under this range
|
||||
BSONObj _key;
|
||||
|
@ -52,6 +52,7 @@ namespace mongo {
|
||||
|
||||
void appendInfo( BSONObjBuilder& b );
|
||||
|
||||
bool needChunkMatcher( const string& ns ) const;
|
||||
ChunkMatcherPtr getChunkMatcher( const string& ns );
|
||||
|
||||
bool inCriticalMigrateSection();
|
||||
|
@ -335,6 +335,7 @@ namespace mongo {
|
||||
// 4MB work of 'result' size. This should be okay for now.
|
||||
|
||||
result.append( "splitKeys" , splitKeys );
|
||||
|
||||
return true;
|
||||
|
||||
}
|
||||
|
@ -167,13 +167,17 @@ namespace mongo {
|
||||
|
||||
}
|
||||
|
||||
ChunkMatcherPtr ShardingState::getChunkMatcher( const string& ns ){
|
||||
bool ShardingState::needChunkMatcher( const string& ns ) const {
|
||||
if ( ! _enabled )
|
||||
return ChunkMatcherPtr();
|
||||
return false;
|
||||
|
||||
if ( ! ShardedConnectionInfo::get( false ) )
|
||||
return ChunkMatcherPtr();
|
||||
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ChunkMatcherPtr ShardingState::getChunkMatcher( const string& ns ){
|
||||
ConfigVersion version;
|
||||
{
|
||||
// check cache
|
||||
@ -193,75 +197,13 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
// have to get a connection to the config db
|
||||
// special case if i'm the configdb since i'm locked and if i connect to myself
|
||||
// its a deadlock
|
||||
auto_ptr<ScopedDbConnection> scoped;
|
||||
auto_ptr<DBDirectClient> direct;
|
||||
|
||||
DBClientBase * conn;
|
||||
|
||||
if ( _configServer == _shardHost ){
|
||||
direct.reset( new DBDirectClient() );
|
||||
conn = direct.get();
|
||||
}
|
||||
else {
|
||||
scoped.reset( new ScopedDbConnection( _configServer ) );
|
||||
conn = scoped->get();
|
||||
}
|
||||
|
||||
// actually query all the chunks for 'ns' that live in this shard
|
||||
// sorting so we can efficiently bucket them
|
||||
BSONObj q;
|
||||
{
|
||||
BSONObjBuilder b;
|
||||
b.append( "ns" , ns.c_str() );
|
||||
b.append( "shard" , BSON( "$in" << BSON_ARRAY( _shardHost << _shardName ) ) );
|
||||
q = b.obj();
|
||||
}
|
||||
auto_ptr<DBClientCursor> cursor = conn->query( "config.chunks" , Query(q).sort( "min" ) );
|
||||
|
||||
assert( cursor.get() );
|
||||
if ( ! cursor->more() ){
|
||||
// TODO: should we update the local version or cache this result?
|
||||
if ( scoped.get() )
|
||||
scoped->done();
|
||||
return ChunkMatcherPtr();
|
||||
}
|
||||
|
||||
BSONObj collection = conn->findOne( "config.collections", BSON( "_id" << ns ) );
|
||||
assert( ! collection["key"].eoo() && collection["key"].isABSONObj() );
|
||||
ChunkMatcherPtr p( new ChunkMatcher( version , collection["key"].Obj().getOwned() ) );
|
||||
|
||||
BSONObj min,max;
|
||||
while ( cursor->more() ){
|
||||
BSONObj d = cursor->next();
|
||||
p->addChunk( d["min"].Obj().getOwned() , d["max"].Obj().getOwned() );
|
||||
|
||||
// coallesce the chunk's bounds in ranges if there are adjacent chunks
|
||||
if ( min.isEmpty() ){
|
||||
min = d["min"].Obj().getOwned();
|
||||
max = d["max"].Obj().getOwned();
|
||||
continue;
|
||||
}
|
||||
|
||||
// chunk is adjacent to last chunk
|
||||
if ( max == d["min"].Obj() ){
|
||||
max = d["max"].Obj().getOwned();
|
||||
continue;
|
||||
}
|
||||
|
||||
// discontinuity; register range and reset min/max
|
||||
p->addRange( min.getOwned() , max.getOwned() );
|
||||
min = d["min"].Obj().getOwned();
|
||||
max = d["max"].Obj().getOwned();
|
||||
}
|
||||
assert( ! min.isEmpty() );
|
||||
p->addRange( min.getOwned() , max.getOwned() );
|
||||
|
||||
if ( scoped.get() )
|
||||
scoped->done();
|
||||
// load the chunk information for this shard from the config database
|
||||
// a reminder: ChunkMatcher may throw on construction
|
||||
const string c = (_configServer == _shardHost) ? "" /* local */ : _configServer;
|
||||
ChunkMatcherPtr p( new ChunkMatcher( c , ns , _shardName ) );
|
||||
|
||||
// TODO 11-18-2010 verify that the version in _versions is compatible with _checks[ns]
|
||||
// Eventually, the version that will be authoritative is the ChunkMatcher's
|
||||
{
|
||||
scoped_lock lk( _mutex );
|
||||
_chunks[ns] = p;
|
||||
|
Loading…
Reference in New Issue
Block a user