0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

Merge branch 'master' of github.com:mongodb/mongo

This commit is contained in:
dwight 2010-07-26 16:19:21 -04:00
commit f2087d8969
11 changed files with 243 additions and 102 deletions

View File

@ -49,7 +49,17 @@
namespace bson {
class assertion : public std::exception {
public:
virtual const char* what() const throw() { return "BsonAssertion"; }
assertion( unsigned u , const string& s )
: id( u ) , msg( s ){
stringstream ss;
ss << "BsonAssertion id: " << u << " " << s;
full = s.str();
}
virtual const char* what() const throw() { return full.c_str(); }
unsigned id;
string msg;
string full;
};
}
@ -57,14 +67,13 @@ namespace mongo {
#if !defined(assert)
inline void assert(bool expr) {
if(!expr) {
std::cout << "assertion failure in bson library" << std::endl;
throw bson::assertion();
throw bson::assertion( 0 , "assertion failure in bson library" );
}
}
#endif
#if !defined(uassert)
inline void uasserted(unsigned msgid, std::string) {
throw bson::assertion();
inline void uasserted(unsigned msgid, std::string s) {
throw bson::assertion( msgid , s );
}
inline void uassert(unsigned msgid, std::string msg, bool expr) {
@ -72,13 +81,13 @@ namespace mongo {
uasserted( msgid , msg );
}
inline void msgasserted(int msgid, const char *msg) {
throw bson::assertion();
throw bson::assertion( msgid , msg );
}
inline void msgasserted(int msgid, const std::string &msg) { msgasserted(msgid, msg.c_str()); }
inline void massert(unsigned msgid, std::string msg, bool expr) {
if(!expr) {
std::cout << "assertion failure in bson library: " << msgid << ' ' << msg << std::endl;
throw bson::assertion();
throw bson::assertion( msgid , msg );
}
}
#endif

View File

@ -670,7 +670,8 @@ namespace mongo {
void fill( const StringData& name ) {
char *r;
int n = strtol( name.data(), &r, 10 );
uassert( 13048, (string)"can't append to array using string field name [" + name.data() + "]" , !*r );
if ( *r )
uasserted( 13048, (string)"can't append to array using string field name [" + name.data() + "]" );
while( _i < n )
append( nullElt() );
}

View File

@ -496,12 +496,12 @@ sendmore:
}
Date_t start = jsTime();
MemoryMappedFile::flushAll( true );
int numFiles = MemoryMappedFile::flushAll( true );
time_flushing = (int) (jsTime() - start);
globalFlushCounters.flushed(time_flushing);
log(1) << "flushing mmap took " << time_flushing << "ms" << endl;
log(1) << "flushing mmap took " << time_flushing << "ms " << " for " << numFiles << " files" << endl;
}
}

View File

@ -221,7 +221,7 @@ namespace mongo {
NamespaceDetails *oplogDetails = nsdetails(rsoplog);
uassert(13412, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
for( list<pair<DocID,bo>>::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
const DocID& d = i->first;
bo pattern = d._id.wrap(); // { _id : ... }
try {

View File

@ -615,8 +615,6 @@ namespace mongo {
help << "add a new shard to the system";
}
bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
ScopedDbConnection conn( configServer.getPrimary() );
string host = cmdObj.firstElement().valuestrsafe();
if ( host == "localhost" || host.find( "localhost:" ) == 0 ||
host == "127.0.0.1" || host.find( "127.0.0.1:" ) == 0 ){
@ -633,64 +631,27 @@ namespace mongo {
host = ss.str();
}
string name;
string name = "";
if ( cmdObj["name"].type() == String ) {
name = cmdObj["name"].valuestrsafe();
} else {
name = grid.getNewShardName();
if ( name.empty() ){
result.append( "msg" , "cant generate new shard name" );
conn.done();
return false;
}
}
long long maxSize = 0;
if ( cmdObj[ ShardFields::maxSize.name() ].isNumber() ){
maxSize = cmdObj[ ShardFields::maxSize.name() ].numberLong();
}
if ( ! grid.addShard( &name , host , maxSize , &errmsg ) ){
// addShard filled errmsg
return false;
}
BSONObj shard;
{
BSONObjBuilder b;
b.append( "_id" , name );
b.append( "host" , host );
if ( cmdObj[ ShardFields::maxSize.name() ].isNumber() )
b.append( cmdObj[ ShardFields::maxSize.name() ] );
shard = b.obj();
}
BSONObj old = conn->findOne( "config.shards" , BSON( "host" << host ) );
if ( ! old.isEmpty() ){
result.append( "msg" , "host already used" );
conn.done();
return false;
}
try {
ScopedDbConnection newShardConn( host );
newShardConn->getLastError();
newShardConn.done();
}
catch ( DBException& e ){
errmsg = "couldn't connect to new shard";
result.append( "host" , host );
result.append( "exception" , e.what() );
conn.done();
return false;
}
log() << "going to add shard: " << shard << endl;
conn->insert( "config.shards" , shard );
errmsg = conn->getLastError();
if ( errmsg.size() ){
log() << "error adding shard: " << shard << " err: " << errmsg << endl;
return false;
}
result.append( "added" , shard["host"].valuestrsafe() );
conn.done();
Shard::reloadShardInfo();
result << "shardAdded" << name;
return true;
}
} addServer;
/* See usage docs at:
* http://www.mongodb.org/display/DOCS/Configuring+Sharding#ConfiguringSharding-Removingashard
*/

View File

@ -456,6 +456,70 @@ namespace mongo {
}
bool Grid::addShard( string* name , const string& host , long long maxSize , string* errMsg ){
// errMsg is required but name is optional
DEV assert( ! errMsg );
string nameInternal;
if ( ! name ) {
name = &nameInternal;
}
// check whether host exists and is operative
try {
ScopedDbConnection newShardConn( host );
newShardConn->getLastError();
newShardConn.done();
}
catch ( DBException& e ){
ostringstream ss;
ss << "couldn't connect to new shard ";
ss << e.what();
*errMsg = ss.str();
return false;
}
// if a name for a shard wasn't provided, pick one.
if ( name->empty() && ! _getNewShardName( name ) ){
*errMsg = "error generating new shard name";
return false;
}
// build the ConfigDB shard document.
BSONObjBuilder b;
b.append( "_id" , *name );
b.append( "host" , host );
if ( maxSize > 0 ){
b.append( ShardFields::maxSize.name() , maxSize );
}
BSONObj shardDoc = b.obj();
{
ScopedDbConnection conn( configServer.getPrimary() );
// check whether this host:port is already a known shard
BSONObj old = conn->findOne( ShardNS::shard , BSON( "host" << host ) );
if ( ! old.isEmpty() ){
*errMsg = "host already used";
conn.done();
return false;
}
log() << "going to add shard: " << shardDoc << endl;
conn->insert( ShardNS::shard , shardDoc );
*errMsg = conn->getLastError();
if ( ! errMsg->empty() ){
log() << "error adding shard: " << shardDoc << " err: " << *errMsg << endl;
return false;
}
conn.done();
}
Shard::reloadShardInfo();
return true;
}
bool Grid::knowAboutShard( const string& name ) const{
ShardConnection conn( configServer.getPrimary() , "" );
BSONObj shard = conn->findOne( ShardNS::shard , BSON( "host" << name ) );
@ -463,11 +527,13 @@ namespace mongo {
return ! shard.isEmpty();
}
string Grid::getNewShardName() const{
ShardConnection conn( configServer.getPrimary() , "" );
bool Grid::_getNewShardName( string* name ) const{
DEV assert( ! name );
string shardName;
bool ok = false;
int count = 0;
ShardConnection conn( configServer.getPrimary() , "" );
BSONObj o = conn->findOne( ShardNS::shard , Query( fromjson ( "{_id: /^shard/}" ) ).sort( BSON( "_id" << -1 ) ) );
if ( ! o.isEmpty() ) {
string last = o["_id"].String();
@ -478,10 +544,12 @@ namespace mongo {
if (count < 9999) {
stringstream ss;
ss << "shard" << setfill('0') << setw(4) << count;
shardName = ss.str();
*name = ss.str();
ok = true;
}
conn.done();
return shardName;
return ok;
}
bool Grid::shouldBalance() const {

View File

@ -216,17 +216,27 @@ namespace mongo {
*/
void removeDB( string db );
/**
*
* addShard will create a new shard in the grid. It expects a mongod process to be runing
* on the provided address.
* TODO - add the mongod's databases to the grid
*
* @param name is an optional string with the name of the shard. if ommited, grid will
* generate one and update the parameter.
* @param host is the complete address of the machine where the shard will be
* @param maxSize is the optional space quota in bytes. Zeros means there's no limitation to
* space usage
* @param errMsg is the error description in case the operation failed.
* @return true if shard was successfully added.
*/
bool addShard( string* name , const string& host , long long maxSize , string* errMsg );
/**
* @return true if the config database knows about a host 'name'
*/
bool knowAboutShard( const string& name ) const;
/**
* @return the next available shard name or an empty string, if there are
* no more shard names available.
*/
string getNewShardName() const;
/**
* @return true if the chunk balancing functionality is enabled
*/
@ -237,6 +247,14 @@ namespace mongo {
private:
map<string, DBConfigPtr > _databases;
mongo::mutex _lock; // TODO: change to r/w lock ??
/**
* @param name is the chose name for the shard. Parameter is mandatory.
* @return true if it managed to generate a shard name. May return false if (currently)
* 10000 shard
*/
bool _getNewShardName( string* name) const;
};
class ConfigServer : public DBConfig {

View File

@ -94,18 +94,43 @@ namespace mongo {
}
/*static*/ int MongoFile::flushAll( bool sync ){
int num = 0;
rwlock lk( mmmutex , false );
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
num++;
MongoFile * mmf = *i;
if ( ! mmf )
continue;
mmf->flush( sync );
if ( ! sync ){
int num = 0;
rwlock lk( mmmutex , false );
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
num++;
MongoFile * mmf = *i;
if ( ! mmf )
continue;
mmf->flush( sync );
}
return num;
}
return num;
// want to do it sync
set<MongoFile*> seen;
while ( true ){
auto_ptr<Flushable> f;
{
rwlock lk( mmmutex , false );
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
MongoFile * mmf = *i;
if ( ! mmf )
continue;
if ( seen.count( mmf ) )
continue;
f.reset( mmf->prepareFlush() );
seen.insert( mmf );
break;
}
}
if ( ! f.get() )
break;
f->flush();
}
return seen.size();
}
void MongoFile::created(){

View File

@ -18,13 +18,27 @@
#pragma once
namespace mongo {
/* the administrative-ish stuff here */
class MongoFile : boost::noncopyable {
public:
/** Flushable has to fail nicely if the underlying object gets killed */
class Flushable {
public:
virtual ~Flushable(){}
virtual void flush() = 0;
};
protected:
virtual void close() = 0;
virtual void flush(bool sync) = 0;
/**
* returns a thread safe object that you can call flush on
* Flushable has to fail nicely if the underlying object gets killed
*/
virtual Flushable * prepareFlush() = 0;
void created(); /* subclass must call after create */
void destroyed(); /* subclass must call in destructor */
@ -145,6 +159,7 @@ namespace mongo {
void* map(const char *filename, long &length, int options = 0 );
void flush(bool sync);
virtual Flushable * prepareFlush();
/*void* viewOfs() {
return view;

View File

@ -106,6 +106,28 @@ namespace mongo {
problem() << "msync " << errnoWithDescription() << endl;
}
class PosixFlushable : public MemoryMappedFile::Flushable {
public:
PosixFlushable( void * view , HANDLE fd , long len )
: _view( view ) , _fd( fd ) , _len(len){
}
void flush(){
if ( _view && _fd )
if ( msync(_view, _len, MS_SYNC ) )
problem() << "msync " << errnoWithDescription() << endl;
}
void * _view;
HANDLE _fd;
long _len;
};
MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){
return new PosixFlushable( view , fd , len );
}
void MemoryMappedFile::_lock() {
if (view) assert(mprotect(view, len, PROT_READ | PROT_WRITE) == 0);
}

View File

@ -95,24 +95,46 @@ namespace mongo {
return view;
}
class WindowsFlushable : public MemoryMappedFile::Flushable {
public:
WindowsFlushable( void * view , HANDLE fd , string filename )
: _view(view) , _fd(fd) , _filename(filename){
}
void flush(){
if (!_view || !_fd)
return;
bool success = FlushViewOfFile(_view, 0); // 0 means whole mapping
if (!success){
int err = GetLastError();
out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl;
}
success = FlushFileBuffers(_fd);
if (!success){
int err = GetLastError();
out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl;
}
}
void * _view;
HANDLE _fd;
string _filename;
};
void MemoryMappedFile::flush(bool sync) {
uassert(13056, "Async flushing not supported on windows", sync);
if (!view || !fd) return;
bool success = FlushViewOfFile(view, 0); // 0 means whole mapping
if (!success){
int err = GetLastError();
out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl;
}
success = FlushFileBuffers(fd);
if (!success){
int err = GetLastError();
out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl;
}
WindowsFlushable f( view , fd , _filename );
f.flush();
}
MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){
return new WindowsFlushable( view , fd , _filename );
}
void MemoryMappedFile::_lock() {}
void MemoryMappedFile::_unlock() {}