0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 17:10:48 +01:00

fix concurrency around GhostSync

This commit is contained in:
Eliot Horowitz 2011-07-16 08:03:52 -04:00
parent beb8dc7084
commit a1ab42a281
2 changed files with 47 additions and 26 deletions

View File

@ -111,10 +111,13 @@ namespace mongo {
/** /**
* This is a cache of ghost slaves * This is a cache of ghost slaves
*/ */
map<mongo::OID,GhostSlave> _ghostCache; typedef map<mongo::OID,GhostSlave> MAP;
MAP _ghostCache;
RWLock _lock; // protects _ghostCache
ReplSetImpl *rs; ReplSetImpl *rs;
public: public:
GhostSync(ReplSetImpl *_rs) : task::Server("rs ghost sync"), rs(_rs) {} GhostSync(ReplSetImpl *_rs) : task::Server("rs ghost sync"), _lock("GhostSync"), rs(_rs) {}
~GhostSync() { ~GhostSync() {
log() << "~GhostSync() called" << rsLog; log() << "~GhostSync() called" << rsLog;
} }

View File

@ -512,8 +512,10 @@ namespace mongo {
cc().shutdown(); cc().shutdown();
} }
void GhostSync::associateSlave(const BSONObj& rid, const int memberId) { void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
GhostSlave &slave = _ghostCache[rid["_id"].OID()]; const OID rid = id["_id"].OID();
rwlock lk( _lock , true );
GhostSlave &slave = _ghostCache[rid];
if (slave.init) { if (slave.init) {
LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog; LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog;
return; return;
@ -529,66 +531,82 @@ namespace mongo {
} }
} }
void GhostSync::updateSlave(const mongo::OID& id, const OpTime& last) { void GhostSync::updateSlave(const mongo::OID& rid, const OpTime& last) {
GhostSlave& slave = _ghostCache[id]; rwlock lk( _lock , false );
MAP::iterator i = _ghostCache.find( rid );
if ( i == _ghostCache.end() ) {
OCCASIONALLY log() << "couldn't update slave " << rid << " no entry" << rsLog;
return;
}
GhostSlave& slave = i->second;
if (!slave.init) { if (!slave.init) {
OCCASIONALLY log() << "couldn't update slave " << id << rsLog; OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
return; return;
} }
((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last); ((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last);
} }
void GhostSync::percolate(const BSONObj& rid, const OpTime& last) { void GhostSync::percolate(const BSONObj& id, const OpTime& last) {
GhostSlave &s = _ghostCache[rid["_id"].OID()]; const OID rid = id["_id"].OID();
if (!s.init) { rwlock lk( _lock , false );
log() << "replSet couldn't find a slave with id " << rid
<< ", not faux syncing" << rsLog; MAP::iterator i = _ghostCache.find( rid );
if ( i == _ghostCache.end() ) {
OCCASIONALLY log() << "couldn't percolate slave " << rid << " no entry" << rsLog;
return; return;
} }
assert(s.slave);
GhostSlave& slave = i->second;
if (!slave.init) {
OCCASIONALLY log() << "couldn't percolate slave " << rid << " not init" << rsLog;
return;
}
assert(slave.slave);
const Member *target = rs->_currentSyncTarget; const Member *target = rs->_currentSyncTarget;
if (!target || rs->box.getState().primary() if (!target || rs->box.getState().primary()
// we are currently syncing from someone who's syncing from us // we are currently syncing from someone who's syncing from us
// the target might end up with a new Member, but s.slave never // the target might end up with a new Member, but s.slave never
// changes so we'll compare the names // changes so we'll compare the names
|| target == s.slave || target->fullName() == s.slave->fullName()) { || target == slave.slave || target->fullName() == slave.slave->fullName()) {
log(1) << "replica set ghost target no good" << endl; log(1) << "replica set ghost target no good" << endl;
return; return;
} }
try { try {
if (!s.reader.haveCursor()) { if (!slave.reader.haveCursor()) {
if (!s.reader.connect(rid, s.slave->id(), target->fullName())) { if (!slave.reader.connect(id, slave.slave->id(), target->fullName())) {
// error message logged in OplogReader::connect // error message logged in OplogReader::connect
return; return;
} }
s.reader.ghostQueryGTE(rsoplog, last); slave.reader.ghostQueryGTE(rsoplog, last);
} }
log(1) << "last: " << s.last.toString() << " to " << last.toString() << rsLog; log(1) << "last: " << slave.last.toString() << " to " << last.toString() << rsLog;
if (s.last > last) { if (slave.last > last) {
return; return;
} }
while (s.last <= last) { while (slave.last <= last) {
if (!s.reader.more()) { if (!slave.reader.more()) {
// we'll be back // we'll be back
return; return;
} }
BSONObj o = s.reader.nextSafe(); BSONObj o = slave.reader.nextSafe();
s.last = o["ts"]._opTime(); slave.last = o["ts"]._opTime();
} }
log(2) << "now last is " << s.last.toString() << rsLog; log(2) << "now last is " << slave.last.toString() << rsLog;
} }
catch (DBException& e) { catch (DBException& e) {
// we'll be back // we'll be back
log(2) << "replSet ghost sync error: " << e.what() << " for " log(2) << "replSet ghost sync error: " << e.what() << " for "
<< s.slave->fullName() << rsLog; << slave.slave->fullName() << rsLog;
s.reader.resetConnection(); slave.reader.resetConnection();
} }
} }
} }