0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

checkpoint some intermediate concurrency work

This commit is contained in:
Dwight 2009-12-02 09:39:17 -05:00
parent c72e870657
commit 16bd507b51
11 changed files with 203 additions and 226 deletions

View File

@ -97,6 +97,9 @@ namespace mongo {
return *currentClient.get();
}
/* each thread which does db operations has a Client object in TLS.
call this when your thread starts.
*/
inline void Client::initThread(const char *desc) {
assert( currentClient.get() == 0 );
currentClient.reset( new Client(desc) );

View File

@ -21,22 +21,31 @@
namespace mongo {
#if 0
#if 1
//#if BOOST_VERSION >= 103500
//typedef boost::shared_mutex MongoMutex;
class MongoMutex {
boost::shared_mutex m;
public:
void lock() {
cout << "LOCK" << endl;
m.lock();
}
void unlock() { m.unlock(); }
void lock_shared() { m.lock_shared(); }
void unlock_shared() { m.unlock_shared(); }
void unlock() {
cout << "UNLOCK" << endl;
m.unlock();
}
void lock_shared() {
cout << "LOCKSHARED" << endl;
m.lock_shared();
}
void unlock_shared() {
cout << "UNLOCKSHARED" << endl;
m.unlock_shared();
}
};
#else
/* this will be for old versions of boost */
class MongoMutex {
class qMongoMutex {
boost::recursive_mutex m;
int x;
public:
@ -86,13 +95,16 @@ namespace mongo {
int isLocked() const {
return locked;
}
void timingInfo(unsigned long long &s, unsigned long long &tl) {
void getTimingInfo(unsigned long long &s, unsigned long long &tl) const {
s = start;
tl = timeLocked;
}
};
extern MongoMutex &dbMutex;
/* as we are using this right now, dbMutexInfo.isLocked() == write locked
*/
extern MutexInfo dbMutexInfo;
/*

View File

@ -26,6 +26,8 @@ namespace mongo {
class Database {
public:
static bool _openAllFiles;
Database(const char *nm, bool& newDb, const string& _path = dbpath) :
name(nm),
path(_path),
@ -41,13 +43,18 @@ namespace mongo {
}
newDb = namespaceIndex.exists();
// If already exists, open. Otherwise behave as if empty until
// there's a write, then open.
if ( !newDb )
namespaceIndex.init();
profile = 0;
profileName = name + ".system.profile";
// If already exists, open. Otherwise behave as if empty until
// there's a write, then open.
if ( !newDb ) {
namespaceIndex.init();
if( _openAllFiles )
openAllFiles();
}
}
~Database() {
btreeStore->closeFiles(name, path);
int n = files.size();
@ -55,6 +62,22 @@ namespace mongo {
delete files[i];
}
bool exists(int n) {
stringstream ss;
ss << name << '.' << n;
boost::filesystem::path fullName;
fullName = boost::filesystem::path(path) / ss.str();
return boost::filesystem::exists(fullName);
}
void openAllFiles() {
int n = 0;
while( exists(n) ) {
getFile(n);
n++;
}
}
MongoDataFile* getFile( int n, int sizeNeeded = 0, bool preallocateOnly = false ) {
assert(this);
@ -136,7 +159,7 @@ namespace mongo {
return getFile(n);
}
void finishInit(); // ugly...
//void finishInit(); // ugly...
vector<MongoDataFile*> files;
string name; // "alleyinsider"

View File

@ -163,6 +163,8 @@ namespace mongo {
*/
void connThread()
{
static int zzz = 0;
Client::initThread("conn");
/* todo: move to Client object */
@ -172,6 +174,15 @@ namespace mongo {
MessagingPort& dbMsgPort = *grab;
grab = 0;
if( ++zzz > 0 ) {
cout << "L1" << endl;
mongolock lk(false);
cout << "L2" << endl;
sleepsecs(9999);
cout << "L3" << endl;
}
try {
currentClient.get()->ai->isLocalHost = dbMsgPort.farEnd.isLocalHost();
@ -460,6 +471,12 @@ namespace mongo {
}
repairDatabases();
/* we didn't want to pre-open all fiels for the repair check above. for regular
operation we do for read/write lock concurrency reasons.
*/
Database::_openAllFiles = true;
if ( shouldRepairDatabases )
return;

11
db/db.h
View File

@ -56,7 +56,7 @@ namespace mongo {
path - datafiles directory, if not the default, so we can differentiate between db's of the same
name in different places (for example temp ones on repair).
*/
inline bool setClient(const char *ns, const string& path=dbpath) {
inline bool setClient(const char *ns, const string& path=dbpath, mongolock *lock = 0) {
if( logLevel > 5 )
log() << "setClient: " << ns << endl;
@ -69,14 +69,17 @@ namespace mongo {
return false;
}
if( lock )
lock->releaseAndWriteLock();
// when master for replication, we advertise all the db's, and that
// looks like a 'first operation'. so that breaks this log message's
// meaningfulness. instead of fixing (which would be better), we just
// stop showing for now.
// 2008-12-22 We now open every database on startup, so this log is
// no longer helpful. Commenting.
// if( !master )
// log() << "first operation for database " << key << endl;
// if( !master )
// log() << "first operation for database " << key << endl;
assertInWriteLock();
@ -85,7 +88,7 @@ namespace mongo {
bool justCreated;
Database *newdb = new Database(cl, justCreated, path);
databases[key] = newdb;
newdb->finishInit();
//newdb->finishInit();
cc().setns(ns, newdb);
return justCreated;

View File

@ -99,6 +99,26 @@ namespace mongo {
}
} cmdResetError;
/* for diagnostic / testing purposes. */
class CmdSleep : public Command {
public:
bool adminOnly() { return true; }
virtual bool logTheOp() {
return false;
}
virtual bool slaveOk() {
return true;
}
virtual void help( stringstream& help ) const {
help << "internal / make db block for 100 seconds";
}
CmdSleep() : Command("sleep") {}
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
sleepsecs(100);
return true;
}
} cmdSleep;
class CmdGetLastError : public Command {
public:
virtual bool requiresAuth() { return false; }
@ -295,7 +315,7 @@ namespace mongo {
BSONObjBuilder t;
unsigned long long last, start, timeLocked;
dbMutexInfo.timingInfo(start, timeLocked);
dbMutexInfo.getTimingInfo(start, timeLocked);
last = curTimeMicros64();
double tt = (double) last-start;
double tl = (double) timeLocked;

View File

@ -52,8 +52,8 @@ namespace mongo {
void dbDropped();
};
inline void Database::finishInit() {
/* inline void Database::finishInit() {
DBInfo i(name.c_str());
}
}*/
} // namespace mongo

View File

@ -72,12 +72,12 @@ namespace mongo {
unsigned long long timeLastPass = 0;
while ( 1 ) {
{
Timer lktm;
dblock lk;
/* todo: do we even need readlock here? if so for what? */
readlock lk("");
Top::completeSnapshot();
q = (q+1)%NStats;
Timing timing;
dbMutexInfo.timingInfo(timing.start, timing.timeLocked);
dbMutexInfo.getTimingInfo(timing.start, timing.timeLocked);
unsigned long long now = curTimeMicros64();
if ( timeLastPass ) {
unsigned long long dt = now - timeLastPass;
@ -159,7 +159,7 @@ namespace mongo {
ss << "git hash: " << gitVersion() << "\n";
ss << "sys info: " << sysInfo() << "\n";
ss << "\n";
ss << "dblocked: " << dbMutexInfo.isLocked() << " (initial)\n";
ss << "dbwritelocked: " << dbMutexInfo.isLocked() << " (initial)\n";
ss << "uptime: " << time(0)-started << " seconds\n";
if ( replAllDead )
ss << "<b>replication replAllDead=" << replAllDead << "</b>\n";
@ -286,7 +286,7 @@ namespace mongo {
while ( 1 ) {
if ( !dbMutexInfo.isLocked() ) {
{
dblock lk;
readlock lk("");
ss << "time to get dblock: " << t.millis() << "ms\n";
doLockedStuff(ss);
}

View File

@ -38,6 +38,14 @@
namespace mongo {
void receivedKillCursors(Message& m);
void receivedUpdate(Message& m, stringstream& ss);
void receivedDelete(Message& m, stringstream& ss);
void receivedInsert(Message& m, stringstream& ss);
void receivedGetMore(DbResponse& dbresponse, Message& m, stringstream& ss);
bool Database::_openAllFiles = false;
CmdLine cmdLine;
int nloggedsome = 0;
@ -45,7 +53,6 @@ namespace mongo {
SlaveTypes slave = NotSlave;
bool master = false; // true means keep an op log
// extern int curOp;
bool autoresync = false;
/* we use new here so we don't have to worry about destructor orders at program shutdown */
@ -123,24 +130,104 @@ namespace mongo {
replyToQuery(0, m, dbresponse, obj);
}
void receivedQuery(DbResponse& dbresponse, Message& m,
stringstream& ss, bool logit,
mongolock& lock
) {
MSGID responseTo = m.data->id;
DbMessage d(m);
QueryMessage q(d);
QueryResult* msgdata;
try {
if (q.fields.get() && q.fields->errmsg)
uassert(q.fields->errmsg, false);
/* note these are logged BEFORE authentication -- which is sort of ok */
if ( _diaglog.level && logit ) {
if ( strstr(q.ns, ".$cmd") ) {
/* $cmd queries are "commands" and usually best treated as write operations */
OPWRITE;
}
else {
OPREAD;
}
}
setClient( q.ns, dbpath, &lock );
Client& client = cc();
client.top.setRead();
strncpy(client.curop()->ns, q.ns, Namespace::MaxNsLen);
msgdata = runQuery(m, ss ).release();
}
catch ( AssertionException& e ) {
ss << " exception ";
LOGSOME problem() << " Caught Assertion in runQuery ns:" << q.ns << ' ' << e.toString() << '\n';
log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << '\n';
if ( q.query.valid() )
log() << " query:" << q.query.toString() << endl;
else
log() << " query object is not valid!" << endl;
BSONObjBuilder err;
err.append("$err", e.msg.empty() ? "assertion during query" : e.msg);
BSONObj errObj = err.done();
BufBuilder b;
b.skip(sizeof(QueryResult));
b.append((void*) errObj.objdata(), errObj.objsize());
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
msgdata = (QueryResult *) b.buf();
b.decouple();
QueryResult *qr = msgdata;
qr->resultFlags() = QueryResult::ResultFlag_ErrSet;
qr->len = b.len();
qr->setOperation(opReply);
qr->cursorId = 0;
qr->startingFrom = 0;
qr->nReturned = 1;
}
Message *resp = new Message();
resp->setData(msgdata, true); // transport will free
dbresponse.response = resp;
dbresponse.responseTo = responseTo;
Database *database = cc().database();
if ( database ) {
if ( database->profile )
ss << " bytes:" << resp->data->dataLen();
}
else {
if ( strstr(q.ns, "$cmd") == 0 ) // (this condition is normal for $cmd dropDatabase)
log() << "ERROR: receiveQuery: database is null; ns=" << q.ns << endl;
}
}
// Returns false when request includes 'end'
bool assembleResponse( Message &m, DbResponse &dbresponse, const sockaddr_in &client ) {
bool writeLock = true;
// before we lock...
int op = m.data->operation();
const char *ns = m.data->_data + 4;
if ( op == dbQuery ) {
const char *ns = m.data->_data + 4;
if( strstr(ns, "$cmd.sys.") ) {
if( strstr(ns, "$cmd.sys.inprog") ) {
inProgCmd(m, dbresponse);
return true;
}
if( strstr(ns, "$cmd.sys.killop") ) {
killOp(m, dbresponse);
return true;
if( strstr(ns, ".$cmd.") ) {
if( strstr(ns, ".$cmd.sys.") ) {
if( strstr(ns, "$cmd.sys.inprog") ) {
inProgCmd(m, dbresponse);
return true;
}
if( strstr(ns, "$cmd.sys.killop") ) {
killOp(m, dbresponse);
return true;
}
}
}
else
writeLock = false;
}
else if( op == dbGetMore ) {
writeLock = false;
@ -154,7 +241,7 @@ namespace mongo {
}
mongolock lk(writeLock);
stringstream ss;
char buf[64];
time_t now = time(0);
@ -189,10 +276,10 @@ namespace mongo {
if ( op == dbQuery ) {
// receivedQuery() does its own authorization processing.
receivedQuery(dbresponse, m, ss, true);
receivedQuery(dbresponse, m, ss, true, lk);
}
else if ( op == dbGetMore ) {
// receivedQuery() does its own authorization processing.
// does its own authorization processing.
OPREAD;
DEV log = true;
ss << "getmore ";
@ -387,78 +474,6 @@ namespace mongo {
recordDelete( n );
}
void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, */Message& m, stringstream& ss, bool logit) {
MSGID responseTo = m.data->id;
DbMessage d(m);
QueryMessage q(d);
QueryResult* msgdata;
try {
if (q.fields.get() && q.fields->errmsg)
uassert(q.fields->errmsg, false);
/* note these are logged BEFORE authentication -- which is sort of ok */
if ( _diaglog.level && logit ) {
if ( strstr(q.ns, ".$cmd") ) {
/* $cmd queries are "commands" and usually best treated as write operations */
OPWRITE;
}
else {
OPREAD;
}
}
setClient( q.ns );
Client& client = cc();
client.top.setRead();
strncpy(client.curop()->ns, q.ns, Namespace::MaxNsLen);
msgdata = runQuery(m, ss ).release();
}
catch ( AssertionException& e ) {
ss << " exception ";
LOGSOME problem() << " Caught Assertion in runQuery ns:" << q.ns << ' ' << e.toString() << '\n';
log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << '\n';
if ( q.query.valid() )
log() << " query:" << q.query.toString() << endl;
else
log() << " query object is not valid!" << endl;
BSONObjBuilder err;
err.append("$err", e.msg.empty() ? "assertion during query" : e.msg);
BSONObj errObj = err.done();
BufBuilder b;
b.skip(sizeof(QueryResult));
b.append((void*) errObj.objdata(), errObj.objsize());
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
msgdata = (QueryResult *) b.buf();
b.decouple();
QueryResult *qr = msgdata;
qr->resultFlags() = QueryResult::ResultFlag_ErrSet;
qr->len = b.len();
qr->setOperation(opReply);
qr->cursorId = 0;
qr->startingFrom = 0;
qr->nReturned = 1;
}
Message *resp = new Message();
resp->setData(msgdata, true); // transport will free
dbresponse.response = resp;
dbresponse.responseTo = responseTo;
Database *database = cc().database();
if ( database ) {
if ( database->profile )
ss << " bytes:" << resp->data->dataLen();
}
else {
if ( strstr(q.ns, "$cmd") == 0 ) // (this condition is normal for $cmd dropDatabase)
log() << "ERROR: receiveQuery: database is null; ns=" << q.ns << endl;
}
}
QueryResult* emptyMoreResult(long long);
void receivedGetMore(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, */Message& m, stringstream& ss) {
@ -522,116 +537,6 @@ namespace mongo {
Message & container;
};
/* a call from jscript to the database locally.
m - inbound message
out - outbound message, if there is any, will be set here.
if there is one, out.data will be non-null on return.
The out.data buffer will automatically clean up when out
goes out of scope (out.freeIt==true)
note we should already be in the mutex lock from connThread() at this point.
*/
/*
void jniCallbackDeprecated(Message& m, Message& out)
{
//
AuthenticationInfo *ai = currentClient.get()->ai;
Database *clientOld = cc().database();
JniMessagingPort jmp(out);
callDepth++;
int curOpOld = curOp;
try {
stringstream ss;
char buf[64];
time_t_to_String(time(0), buf);
buf[20] = 0; // don't want the year
ss << buf << " dbjs ";
{
Timer t;
bool log = logLevel >= 1;
curOp = m.data->operation();
if ( m.data->operation() == dbQuery ) {
// on a query, the Message must have m.freeIt true so that the buffer data can be
// retained by cursors. As freeIt is false, we make a copy here.
assert( m.data->len > 0 && m.data->len < 32000000 );
Message copy(malloc(m.data->len), true);
memcpy(copy.data, m.data, m.data->len);
DbResponse dbr;
receivedQuery(dbr, copy, ss, false);
jmp.reply(m, *dbr.response, dbr.responseTo);
}
else if ( m.data->operation() == dbInsert ) {
ss << "insert ";
receivedInsert(m, ss);
}
else if ( m.data->operation() == dbUpdate ) {
ss << "update ";
receivedUpdate(m, ss);
}
else if ( m.data->operation() == dbDelete ) {
ss << "remove ";
receivedDelete(m, ss);
}
else if ( m.data->operation() == dbGetMore ) {
DEV log = true;
ss << "getmore ";
DbResponse dbr;
receivedGetMore(dbr, m, ss);
jmp.reply(m, *dbr.response, dbr.responseTo);
}
else if ( m.data->operation() == dbKillCursors ) {
try {
log = true;
ss << "killcursors ";
receivedKillCursors(m);
}
catch ( AssertionException& ) {
problem() << "Caught Assertion in kill cursors, continuing" << endl;
ss << " exception ";
}
}
else {
mongo::out() << " jnicall: operation isn't supported: " << m.data->operation() << endl;
assert(false);
}
int ms = t.millis();
log = log || ctr++ % 128 == 0;
if ( log || ms > 100 ) {
ss << ' ' << t.millis() << "ms";
mongo::out() << ss.str().c_str() << endl;
}
Database *database = cc().database();
if ( database && database->profile >= 1 ) {
if ( database->profile >= 2 || ms >= 100 ) {
// profile it
profile(ss.str().c_str()+20, ms);
}
}
}
}
catch ( AssertionException& ) {
problem() << "Caught AssertionException in jniCall()" << endl;
}
curOp = curOpOld;
callDepth--;
if ( cc().database() != clientOld ) {
assert(false);
}
}
*/
void getDatabaseNames( vector< string > &names ) {
boost::filesystem::path path( dbpath );
for ( boost::filesystem::directory_iterator i( path );

View File

@ -99,12 +99,6 @@ namespace mongo {
bool assembleResponse( Message &m, DbResponse &dbresponse, const sockaddr_in &client = unknownAddress.sa );
void receivedKillCursors(Message& m);
void receivedUpdate(Message& m, stringstream& ss);
void receivedDelete(Message& m, stringstream& ss);
void receivedInsert(Message& m, stringstream& ss);
void receivedGetMore(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, */Message& m, stringstream& ss);
void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, */Message& m, stringstream& ss, bool logit);
void getDatabaseNames( vector< string > &names );
// must call with db lock

View File

@ -73,7 +73,7 @@ namespace mongo {
{ //percent_locked
unsigned long long time = curTimeMicros64();
unsigned long long start , lock;
dbMutexInfo.timingInfo( start , lock );
dbMutexInfo.getTimingInfo( start , lock );
if ( lastTime ){
double timeDiff = (double) (time - lastTime);
double lockDiff = (double) (lock - lastLockTime);