2010-04-13 19:50:09 +02:00
|
|
|
// oplog.h - writing to and reading from oplog
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Copyright (C) 2008 10gen Inc.
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU Affero General Public License, version 3,
|
|
|
|
* as published by the Free Software Foundation.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU Affero General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
local.oplog.$main is the default
|
|
|
|
*/
|
|
|
|
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include "pdfile.h"
|
|
|
|
#include "db.h"
|
|
|
|
#include "dbhelpers.h"
|
|
|
|
#include "query.h"
|
|
|
|
#include "queryoptimizer.h"
|
|
|
|
|
|
|
|
#include "../client/dbclient.h"
|
|
|
|
|
|
|
|
#include "../util/optime.h"
|
|
|
|
|
|
|
|
namespace mongo {
|
|
|
|
|
|
|
|
/* Declaration only; the definition is not part of this header.
   NOTE(review): presumably creates the oplog collection (local.oplog.$main by default,
   per the comment at the top of this file) -- confirm against the implementation.
*/
void createOplog();
|
|
|
|
|
|
|
|
/* Write operation to the log (local.oplog.$main)
|
|
|
|
"i" insert
|
|
|
|
"u" update
|
|
|
|
"d" delete
|
|
|
|
"c" db cmd
|
|
|
|
"db" declares presence of a database (ns is set to the db name + '.')
|
|
|
|
*/
|
|
|
|
/* Parameters:
     opstr  operation code string (see the list of codes documented for this function)
     ns     namespace the operation applies to
     obj    the object associated with the operation
     patt   optional, defaults to 0
            NOTE(review): presumably the match pattern for "u" (update) operations -- confirm
     b      optional, defaults to 0
            NOTE(review): meaning is not visible from this header -- confirm against the definition
*/
void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0);
|
|
|
|
|
|
|
|
/* Declaration only; the definition is not part of this header.
   NOTE(review): presumably writes a no-op "keepalive" entry to the oplog -- confirm
   against the implementation.
*/
void logKeepalive();
|
|
|
|
|
|
|
|
/* Declaration only; the definition is not part of this header.
   Takes the Database being closed.
   NOTE(review): presumably lets the oplog code drop any cached state that refers to
   this database before it goes away -- confirm against the implementation.
*/
void oplogCheckCloseDatabase( Database * db );
|
|
|
|
|
|
|
|
/* Threshold, in seconds, compared against the elapsed time of FindingStartCursor's timer.
   Once the initial reverse scan in FindingStartCursor::next() has run at least this long,
   the search switches from the Initial mode to the FindExtent mode.
   Defined elsewhere; only declared here.
*/
extern int __findingStartInitialTimeout; // configurable for testing
|
|
|
|
|
|
|
|
class FindingStartCursor {
|
|
|
|
public:
|
|
|
|
FindingStartCursor( const QueryPlan & qp ) :
|
|
|
|
_qp( qp ),
|
|
|
|
_findingStart( true ),
|
|
|
|
_findingStartMode(),
|
|
|
|
_findingStartTimer( 0 ),
|
|
|
|
_findingStartCursor( 0 )
|
|
|
|
{ init(); }
|
|
|
|
bool done() const { return !_findingStart; }
|
2010-05-07 23:25:57 +02:00
|
|
|
shared_ptr<Cursor> cRelease() { return _c; }
|
2010-04-13 19:50:09 +02:00
|
|
|
void next() {
|
|
|
|
if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) {
|
|
|
|
_findingStart = false;
|
|
|
|
_c = _qp.newCursor(); // on error, start from beginning
|
|
|
|
destroyClientCursor();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
switch( _findingStartMode ) {
|
|
|
|
case Initial: {
|
|
|
|
if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
|
|
|
|
_findingStart = false; // found first record out of query range, so scan normally
|
|
|
|
_c = _qp.newCursor( _findingStartCursor->c->currLoc() );
|
|
|
|
destroyClientCursor();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
_findingStartCursor->c->advance();
|
|
|
|
RARELY {
|
|
|
|
if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
|
|
|
|
createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) );
|
|
|
|
_findingStartMode = FindExtent;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
maybeRelease();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
case FindExtent: {
|
|
|
|
if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
|
|
|
|
_findingStartMode = InExtent;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() );
|
|
|
|
if ( prev.isNull() ) { // hit beginning, so start scanning from here
|
|
|
|
createClientCursor();
|
|
|
|
_findingStartMode = InExtent;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// There might be a more efficient implementation than creating new cursor & client cursor each time,
|
|
|
|
// not worrying about that for now
|
|
|
|
createClientCursor( prev );
|
|
|
|
maybeRelease();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
case InExtent: {
|
|
|
|
if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
|
|
|
|
_findingStart = false; // found first record in query range, so scan normally
|
|
|
|
_c = _qp.newCursor( _findingStartCursor->c->currLoc() );
|
|
|
|
destroyClientCursor();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
_findingStartCursor->c->advance();
|
|
|
|
maybeRelease();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
default: {
|
|
|
|
massert( 12600, "invalid _findingStartMode", false );
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
private:
|
|
|
|
enum FindingStartMode { Initial, FindExtent, InExtent };
|
|
|
|
const QueryPlan &_qp;
|
|
|
|
bool _findingStart;
|
|
|
|
FindingStartMode _findingStartMode;
|
|
|
|
auto_ptr< CoveredIndexMatcher > _matcher;
|
|
|
|
Timer _findingStartTimer;
|
|
|
|
ClientCursor * _findingStartCursor;
|
2010-05-07 23:25:57 +02:00
|
|
|
shared_ptr<Cursor> _c;
|
2010-04-13 19:50:09 +02:00
|
|
|
DiskLoc startLoc( const DiskLoc &rec ) {
|
|
|
|
Extent *e = rec.rec()->myExtent( rec );
|
2010-05-13 09:13:55 +02:00
|
|
|
if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) )
|
2010-04-13 19:50:09 +02:00
|
|
|
return e->firstRecord;
|
|
|
|
// Likely we are on the fresh side of capExtent, so return first fresh record.
|
|
|
|
// If we are on the stale side of capExtent, then the collection is small and it
|
|
|
|
// doesn't matter if we start the extent scan with capFirstNewRecord.
|
|
|
|
return _qp.nsd()->capFirstNewRecord;
|
|
|
|
}
|
|
|
|
|
2010-05-13 09:13:55 +02:00
|
|
|
// should never have an empty extent in the oplog, so don't worry about that case
|
2010-04-13 19:50:09 +02:00
|
|
|
DiskLoc prevLoc( const DiskLoc &rec ) {
|
|
|
|
Extent *e = rec.rec()->myExtent( rec );
|
2010-05-13 09:13:55 +02:00
|
|
|
if ( _qp.nsd()->capLooped() ) {
|
|
|
|
if ( e->xprev.isNull() )
|
|
|
|
e = _qp.nsd()->lastExtent.ext();
|
|
|
|
else
|
|
|
|
e = e->xprev.ext();
|
|
|
|
if ( e->myLoc != _qp.nsd()->capExtent )
|
|
|
|
return e->firstRecord;
|
|
|
|
} else {
|
|
|
|
if ( !e->xprev.isNull() ) {
|
|
|
|
e = e->xprev.ext();
|
|
|
|
return e->firstRecord;
|
|
|
|
}
|
|
|
|
}
|
2010-04-13 19:50:09 +02:00
|
|
|
return DiskLoc(); // reached beginning of collection
|
|
|
|
}
|
|
|
|
void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) {
|
2010-05-07 23:25:57 +02:00
|
|
|
shared_ptr<Cursor> c = _qp.newCursor( startLoc );
|
2010-04-13 19:50:09 +02:00
|
|
|
_findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns());
|
|
|
|
}
|
|
|
|
void destroyClientCursor() {
|
|
|
|
if ( _findingStartCursor ) {
|
|
|
|
ClientCursor::erase( _findingStartCursor->cursorid );
|
|
|
|
_findingStartCursor = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
void maybeRelease() {
|
|
|
|
RARELY {
|
|
|
|
CursorId id = _findingStartCursor->cursorid;
|
|
|
|
_findingStartCursor->updateLocation();
|
|
|
|
{
|
|
|
|
dbtemprelease t;
|
|
|
|
}
|
|
|
|
_findingStartCursor = ClientCursor::find( id, false );
|
|
|
|
}
|
|
|
|
}
|
|
|
|
void init() {
|
|
|
|
// Use a ClientCursor here so we can release db mutex while scanning
|
|
|
|
// oplog (can take quite a while with large oplogs).
|
2010-05-07 23:25:57 +02:00
|
|
|
shared_ptr<Cursor> c = _qp.newReverseCursor();
|
2010-04-13 19:50:09 +02:00
|
|
|
_findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns());
|
|
|
|
_findingStartTimer.reset();
|
|
|
|
_findingStartMode = Initial;
|
|
|
|
BSONElement tsElt = _qp.query()[ "ts" ];
|
|
|
|
massert( 13044, "no ts field in query", !tsElt.eoo() );
|
|
|
|
BSONObjBuilder b;
|
|
|
|
b.append( tsElt );
|
|
|
|
BSONObj tsQuery = b.obj();
|
|
|
|
_matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
}
|