From a1ab42a2811b6984ab11c018ebb7b240fad4c705 Mon Sep 17 00:00:00 2001 From: Eliot Horowitz Date: Sat, 16 Jul 2011 08:03:52 -0400 Subject: [PATCH] fix concurrency around GhostSync --- db/repl/rs.h | 7 +++-- db/repl/rs_sync.cpp | 66 ++++++++++++++++++++++++++++----------------- 2 files changed, 47 insertions(+), 26 deletions(-) diff --git a/db/repl/rs.h b/db/repl/rs.h index 6f61d113d0a..8677f91638a 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -111,10 +111,13 @@ namespace mongo { /** * This is a cache of ghost slaves */ - map _ghostCache; + typedef map MAP; + MAP _ghostCache; + RWLock _lock; // protects _ghostCache ReplSetImpl *rs; + public: - GhostSync(ReplSetImpl *_rs) : task::Server("rs ghost sync"), rs(_rs) {} + GhostSync(ReplSetImpl *_rs) : task::Server("rs ghost sync"), _lock("GhostSync"), rs(_rs) {} ~GhostSync() { log() << "~GhostSync() called" << rsLog; } diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 5850b22ce59..ec0131afad5 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -512,8 +512,10 @@ namespace mongo { cc().shutdown(); } - void GhostSync::associateSlave(const BSONObj& rid, const int memberId) { - GhostSlave &slave = _ghostCache[rid["_id"].OID()]; + void GhostSync::associateSlave(const BSONObj& id, const int memberId) { + const OID rid = id["_id"].OID(); + rwlock lk( _lock , true ); + GhostSlave &slave = _ghostCache[rid]; if (slave.init) { LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog; return; @@ -529,66 +531,82 @@ namespace mongo { } } - void GhostSync::updateSlave(const mongo::OID& id, const OpTime& last) { - GhostSlave& slave = _ghostCache[id]; + void GhostSync::updateSlave(const mongo::OID& rid, const OpTime& last) { + rwlock lk( _lock , false ); + MAP::iterator i = _ghostCache.find( rid ); + if ( i == _ghostCache.end() ) { + OCCASIONALLY log() << "couldn't update slave " << rid << " no entry" << rsLog; + return; + } + + GhostSlave& slave = i->second; if (!slave.init) { - OCCASIONALLY log() << "couldn't update slave " << id << rsLog; + OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; return; } ((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last); } - void GhostSync::percolate(const BSONObj& rid, const OpTime& last) { - GhostSlave &s = _ghostCache[rid["_id"].OID()]; - if (!s.init) { - log() << "replSet couldn't find a slave with id " << rid - << ", not faux syncing" << rsLog; + void GhostSync::percolate(const BSONObj& id, const OpTime& last) { + const OID rid = id["_id"].OID(); + rwlock lk( _lock , false ); + + MAP::iterator i = _ghostCache.find( rid ); + if ( i == _ghostCache.end() ) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " no entry" << rsLog; return; } - assert(s.slave); + + GhostSlave& slave = i->second; + if (!slave.init) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " not init" << rsLog; + return; + } + + assert(slave.slave); const Member *target = rs->_currentSyncTarget; if (!target || rs->box.getState().primary() // we are currently syncing from someone who's syncing from us // the target might end up with a new Member, but s.slave never // changes so we'll compare the names - || target == s.slave || target->fullName() == s.slave->fullName()) { + || target == slave.slave || target->fullName() == slave.slave->fullName()) { log(1) << "replica set ghost target no good" << endl; return; } try { - if (!s.reader.haveCursor()) { - if (!s.reader.connect(rid, s.slave->id(), target->fullName())) { + if (!slave.reader.haveCursor()) { + if (!slave.reader.connect(id, slave.slave->id(), target->fullName())) { // error message logged in OplogReader::connect return; } - s.reader.ghostQueryGTE(rsoplog, last); + slave.reader.ghostQueryGTE(rsoplog, last); } - log(1) << "last: " << s.last.toString() << " to " << last.toString() << rsLog; + log(1) << "last: " << slave.last.toString() << " to " << last.toString() << rsLog; - if (s.last > last) { + if (slave.last > last) { return; } - while (s.last <= last) { - if (!s.reader.more()) { + while (slave.last <= last) { + if (!slave.reader.more()) { // we'll be back return; } - BSONObj o = s.reader.nextSafe(); - s.last = o["ts"]._opTime(); + BSONObj o = slave.reader.nextSafe(); + slave.last = o["ts"]._opTime(); } - log(2) << "now last is " << s.last.toString() << rsLog; + log(2) << "now last is " << slave.last.toString() << rsLog; } catch (DBException& e) { // we'll be back log(2) << "replSet ghost sync error: " << e.what() << " for " - << s.slave->fullName() << rsLog; - s.reader.resetConnection(); + << slave.slave->fullName() << rsLog; + slave.reader.resetConnection(); } } }