0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 17:10:48 +01:00
mongodb/db/btree.cpp

903 lines
31 KiB
C++

// btree.cpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "stdafx.h"
#include "btree.h"
#include "pdfile.h"
#include "../util/unittest.h"
#include "json.h"
namespace mongo {
// dassert that thisLoc resolves to the bucket it is being used on.
// The macro already ends in ';', so call sites write it without one.
#define VERIFYTHISLOC dassert( thisLoc.btree() == this );
// Largest key objsize() accepted by _insert(), bt_insert() and unindex() below.
const int KeyMax = BucketSize / 10;
// Incremented by each top-level bt_insert() call that passes the key-size check.
int ninserts = 0;
// Defined elsewhere; not referenced by the code visible in this file section.
extern int otherTraceLevel;
// When non-zero, insertHere() traces its split steps to out().
int split_debug = 0;
// When non-zero, insertHere(), _insert() and fix() trace their work to out().
int insert_debug = 0;
/* Build a KeyNode from the stored entry k of bucket bb: the child pointer and
   record location are copied from k, and key is constructed from the address
   of the key bytes, i.e. bb.data plus k's key data offset. */
KeyNode::KeyNode(BucketBasics& bb, _KeyNode &k)
    : prevChildBucket(k.prevChildBucket),
      recordLoc(k.recordLoc),
      key(bb.data + k.keyDataOfs())
{
}
/* BucketBasics --------------------------------------------------- */
inline void BucketBasics::modified(const DiskLoc& thisLoc) {
VERIFYTHISLOC
BtreeStore::modified(thisLoc);
}
/* Size recorded in this bucket's _Size field.
   Asserts that the recorded value equals BucketSize before returning it. */
int BucketBasics::Size() const {
    assert( _Size == BucketSize );
    return _Size;
}
/* Clear the Packed bit in flags; every other flag bit is left alone. */
inline void BucketBasics::setNotPacked() {
    flags = flags & ~Packed;
}
/* Set the Packed bit in flags; every other flag bit is left alone. */
inline void BucketBasics::setPacked() {
    flags = flags | Packed;
}
/* Append a textual outline of this subtree to ss.
   Each bucket contributes one line: `level` spaces followed by "*".
   Children are written one level deeper: first the left child of every key
   (in key order), then nextChild. */
void BucketBasics::_shape(int level, stringstream& ss) {
    int pad = level;
    while ( pad-- > 0 )
        ss << ' ';
    ss << "*\n";

    const int childLevel = level+1;
    for ( int pos = 0; pos < n; ++pos ) {
        if ( k(pos).prevChildBucket.isNull() )
            continue;
        k(pos).prevChildBucket.btree()->_shape(childLevel,ss);
    }
    if ( nextChild.isNull() )
        return;
    nextChild.btree()->_shape(childLevel,ss);
}
// Only touched by commented-out / #if 0 debugging code in the visible part of this file.
int bt_fv=0;
// When non-zero, fullValidate() dumps every bucket it visits; dumpTree() sets it
// for the duration of its fullValidate() call.
int bt_dmp=0;
/* Dump the subtree rooted at this bucket to out().

   Raises the global bt_dmp flag, which makes fullValidate() print every
   bucket it visits, and then runs a full validation pass.

   thisLoc - location of this bucket
   order   - key ordering, passed through to fullValidate()

   The flag is lowered again even when fullValidate() leaves with an
   exception, so a failed dump does not turn every later fullValidate()
   call into a dump. The exception itself is rethrown unchanged. */
void BucketBasics::dumpTree(DiskLoc thisLoc, const BSONObj &order) {
    bt_dmp=1;
    try {
        fullValidate(thisLoc, order);
    }
    catch ( ... ) {
        bt_dmp=0;
        throw;
    }
    bt_dmp=0;
}
int BucketBasics::fullValidate(const DiskLoc& thisLoc, const BSONObj &order) {
assertValid(order, true);
// if( bt_fv==0 )
// return;
if ( bt_dmp ) {
out() << thisLoc.toString() << ' ';
((BtreeBucket *) this)->dump();
}
// keycount
int kc = 0;
for ( int i = 0; i < n; i++ ) {
_KeyNode& kn = k(i);
if ( kn.isUsed() ) kc++;
if ( !kn.prevChildBucket.isNull() ) {
DiskLoc left = kn.prevChildBucket;
BtreeBucket *b = left.btree();
wassert( b->parent == thisLoc );
kc += b->fullValidate(kn.prevChildBucket, order);
}
}
if ( !nextChild.isNull() ) {
BtreeBucket *b = nextChild.btree();
wassert( b->parent == thisLoc );
kc += b->fullValidate(nextChild, order);
}
return kc;
}
// Number of times the DEV branch below has seen out-of-order keys; the full
// key listing and bucket dump are only produced for the first four.
int nDumped = 0;
/* Sanity-check this bucket's header fields and key ordering.
   order - key ordering used for the comparisons.
   force - run the checks even when `debug` is off; without it the function
           returns immediately unless `debug` is set.
   Header problems are reported with wassert. Key ordering is checked either
   pair by pair (DEV branch) or first key against last key only (else branch).
   NOTE(review): DEV is a macro defined elsewhere; it presumably expands to an
   `if` on a development-build flag, which is why an `else` can follow it. */
void BucketBasics::assertValid(const BSONObj &order, bool force) {
if ( !debug && !force )
return;
wassert( n >= 0 && n < Size() );
wassert( emptySize >= 0 && emptySize < BucketSize );
wassert( topSize >= n && topSize <= BucketSize );
DEV {
// slow:
// compare every adjacent pair of keys
for ( int i = 0; i < n-1; i++ ) {
BSONObj k1 = keyNode(i).key;
BSONObj k2 = keyNode(i+1).key;
int z = k1.woCompare(k2, order); //OK
if ( z > 0 ) {
out() << "ERROR: btree key order corrupt. Keys:" << endl;
if ( ++nDumped < 5 ) {
for ( int j = 0; j < n; j++ ) {
out() << " " << keyNode(j).key.toString() << endl;
}
((BtreeBucket *) this)->dump();
}
wassert(false);
// stop at the first out-of-order pair
break;
}
else if ( z == 0 ) {
// equal keys must be ordered by strictly increasing recordLoc
if ( !(k(i).recordLoc < k(i+1).recordLoc) ) {
out() << "ERROR: btree key order corrupt (recordloc's wrong). Keys:" << endl;
out() << " k(" << i << "):" << keyNode(i).key.toString() << " RL:" << k(i).recordLoc.toString() << endl;
out() << " k(" << i+1 << "):" << keyNode(i+1).key.toString() << " RL:" << k(i+1).recordLoc.toString() << endl;
wassert( k(i).recordLoc < k(i+1).recordLoc );
}
}
}
}
else {
//faster:
// only compare the first key with the last one
if ( n > 1 ) {
BSONObj k1 = keyNode(0).key;
BSONObj k2 = keyNode(n-1).key;
int z = k1.woCompare(k2, order);
//wassert( z <= 0 );
if ( z > 0 ) {
problem() << "btree keys out of order" << '\n';
ONCE {
((BtreeBucket *) this)->dump();
}
assert(false);
}
}
}
}
inline void BucketBasics::markUnused(int keypos) {
assert( keypos >= 0 && keypos < n );
k(keypos).setUnused();
}
/* Number of bytes from the start of `data` to the end of the bucket, i.e.
   Size() minus the offset of `data` within this object. */
inline int BucketBasics::totalDataSize() const {
    const char *bucketStart = (const char *) this;
    return Size() - (data - bucketStart);
}
/* Reset this bucket to the empty state: no keys, no parent, no children,
   flagged Packed, and the whole data area counted as empty. */
void BucketBasics::init() {
    // _Size has to be in place first: totalDataSize() goes through Size(),
    // which asserts on it.
    _Size = BucketSize;
    flags = Packed;
    reserved = 0;

    n = 0;
    topSize = 0;
    emptySize = totalDataSize();

    parent.Null();
    nextChild.Null();
}
/* Reserve `bytes` bytes of key data space and return the offset of the
   reserved region.
   Key data is handed out from the end of the data area moving backwards,
   while the key nodes grow from the front. */
inline int BucketBasics::_alloc(int bytes) {
    emptySize -= bytes;
    topSize += bytes;
    int ofs = totalDataSize() - topSize;
    assert( ofs > 0 );
    return ofs;
}
/* Physically remove the key entry at keypos from this bucket.

   keypos - index of the entry to remove. It must name an existing key
            (0..n-1) and that entry must have no left child.

   Entries after keypos are shifted down one slot. Only the _KeyNode slot is
   returned to emptySize here; topSize is not adjusted, so the bucket is
   flagged as not packed. */
void BucketBasics::_delKeyAtPos(int keypos) {
    // keypos == n names no key: letting it through would silently drop the
    // last key instead. Same bound as markUnused().
    assert( keypos >= 0 && keypos < n );
    assert( childForPos(keypos).isNull() );
    n--;
    assert( n > 0 || nextChild.isNull() );
    for ( int j = keypos; j < n; j++ )
        k(j) = k(j+1);
    emptySize += sizeof(_KeyNode);
    setNotPacked();
}
/* add a key. must be > all existing. be careful to set next ptr right. */
void BucketBasics::pushBack(const DiskLoc& recordLoc, BSONObj& key, const BSONObj &order, DiskLoc prevChild) {
int bytesNeeded = key.objsize() + sizeof(_KeyNode);
assert( bytesNeeded <= emptySize );
assert( n == 0 || keyNode(n-1).key.woCompare(key, order) <= 0 );
emptySize -= sizeof(_KeyNode);
_KeyNode& kn = k(n++);
kn.prevChildBucket = prevChild;
kn.recordLoc = recordLoc;
kn.setKeyDataOfs( (short) _alloc(key.objsize()) );
char *p = dataAt(kn.keyDataOfs());
memcpy(p, key.objdata(), key.objsize());
}
/* insert a key in a bucket with no complexity -- no splits required */
bool BucketBasics::basicInsert(const DiskLoc& thisLoc, int keypos, const DiskLoc& recordLoc, BSONObj& key, const BSONObj &order) {
modified(thisLoc);
assert( keypos >= 0 && keypos <= n );
int bytesNeeded = key.objsize() + sizeof(_KeyNode);
if ( bytesNeeded > emptySize ) {
pack( order );
if ( bytesNeeded > emptySize )
return false;
}
for ( int j = n; j > keypos; j-- ) // make room
k(j) = k(j-1);
n++;
emptySize -= sizeof(_KeyNode);
_KeyNode& kn = k(keypos);
kn.prevChildBucket.Null();
kn.recordLoc = recordLoc;
kn.setKeyDataOfs((short) _alloc(key.objsize()) );
char *p = dataAt(kn.keyDataOfs());
memcpy(p, key.objdata(), key.objsize());
return true;
}
/* when we delete things we just leave empty space until the node is
full and then we repack it.
*/
void BucketBasics::pack( const BSONObj &order ) {
if ( flags & Packed )
return;
int tdz = totalDataSize();
char temp[BucketSize];
int ofs = tdz;
topSize = 0;
for ( int j = 0; j < n; j++ ) {
short ofsold = k(j).keyDataOfs();
int sz = keyNode(j).key.objsize();
ofs -= sz;
topSize += sz;
memcpy(temp+ofs, dataAt(ofsold), sz);
k(j).setKeyDataOfsSavingUse( ofs );
}
int dataUsed = tdz - ofs;
memcpy(data + ofs, temp + ofs, dataUsed);
emptySize = tdz - dataUsed - n * sizeof(_KeyNode);
assert( emptySize >= 0 );
setPacked();
assertValid( order );
}
/* Keep only the first N keys of this bucket, then repack it so the space of
   the dropped keys is counted as empty again. */
inline void BucketBasics::truncateTo(int N, const BSONObj &order) {
    n = N;
    // force pack() to do its work even if the bucket was flagged Packed
    setNotPacked();
    pack( order );
}
/* - BtreeBucket --------------------------------------------------- */
/* Locate the largest key of the subtree rooted at thisLoc.
   Follows nextChild pointers until a bucket without one is reached.
   On return largestLoc is that bucket's location and largestKey the index of
   its last key. The bucket found must hold at least one key (asserted). */
void BtreeBucket::findLargestKey(const DiskLoc& thisLoc, DiskLoc& largestLoc, int& largestKey) {
    DiskLoc loc = thisLoc;
    BtreeBucket *b = loc.btree();
    while ( !b->nextChild.isNull() ) {
        loc = b->nextChild;
        b = loc.btree();
    }
    assert(b->n>0);
    largestLoc = loc;
    largestKey = b->n-1;
}
extern DiskLoc minDiskLoc;
/* Is there a used entry for `key` in the tree rooted at thisLoc?
   Positions on the first entry at or after (key, minDiskLoc), walks forward
   over entries flagged unused, and compares the first used entry found with
   key. Returns false when the walk runs off the end of the index. */
bool BtreeBucket::exists(IndexDetails& idx, DiskLoc thisLoc, BSONObj& key, BSONObj order) {
    int pos;
    bool found;
    DiskLoc b = locate(idx, thisLoc, key, order, pos, found, minDiskLoc);

    while ( !b.isNull() ) {
        BtreeBucket *bucket = b.btree();
        if ( bucket->k(pos).isUsed() )
            return bucket->keyAt(pos).woEqual(key);
        // unused entry: step to the next one
        b = bucket->advance(b, pos, 1, "BtreeBucket::exists");
    }
    return false;
}
/* Find a key within this btree bucket.
When duplicate keys are allowed, we use the DiskLoc of the record as if it were part of the
key. That assures that even when there are many duplicates (e.g., 1 million) for a key,
our performance is still good.
assertIfDup: if the key exists (ignoring the recordLoc), uassert
pos: for existing keys k0...kn-1.
returns # it goes BEFORE. so key[pos-1] < key < key[pos]
returns n if it goes after the last existing key.
note result might be an Unused location!
return value: true when an entry with this key and recordLoc is present (pos is its index),
false otherwise (pos is the insertion point).
*/
// Accumulator for the _EXPERIMENT1 loop in find(), which reads one byte every 4096 bytes of the bucket.
char foo;
bool BtreeBucket::find(IndexDetails& idx, BSONObj& key, DiskLoc recordLoc, const BSONObj &order, int& pos, bool assertIfDup) {
#if defined(_EXPERIMENT1)
{
char *z = (char *) this;
int i = 0;
while( 1 ) {
i += 4096;
if( i >= BucketSize )
break;
foo += z[i];
}
}
#endif
/* binary search for this key */
// set once the whole-index duplicate check has been run, so it runs at most once per call
bool dupsChecked = false;
int l=0;
int h=n-1;
while ( l <= h ) {
int m = (l+h)/2;
KeyNode M = keyNode(m);
int x = key.woCompare(M.key, order);
if ( x == 0 ) {
if( assertIfDup ) {
if( k(m).isUnused() ) {
// ok that key is there if unused. but we need to check that there aren't other
// entries for the key then. as it is very rare that we get here, we don't put any
// coding effort in here to make this particularly fast
if( !dupsChecked ) {
dupsChecked = true;
if( idx.head.btree()->exists(idx, idx.head, key, order) )
uasserted("E11000 duplicate key error");
}
}
else
uasserted("E11000 duplicate key error");
}
// dup keys allowed. use recordLoc as if it is part of the key
DiskLoc unusedRL = M.recordLoc;
unusedRL.GETOFS() &= ~1; // so we can test equality without the used bit messing us up
// x now orders by record location among entries with equal keys
x = recordLoc.compare(unusedRL);
}
if ( x < 0 ) // key < M.key
h = m-1;
else if ( x > 0 )
l = m+1;
else {
// found it.
pos = m;
return true;
}
}
// not found
pos = l;
// sanity check: the key must sort between its would-be neighbours
if ( pos != n ) {
BSONObj keyatpos = keyNode(pos).key;
wassert( key.woCompare(keyatpos, order) <= 0 );
if ( pos > 0 ) {
wassert( keyNode(pos-1).key.woCompare(key, order) <= 0 );
}
}
return false;
}
// Defined elsewhere; called at the start of delBucket() with the location of the doomed bucket.
void aboutToDeleteBucket(const DiskLoc&);
/* Detach this bucket from its parent and wipe it.
   thisLoc - location of this bucket; must not be the head bucket (asserted).
   id      - index the bucket belongs to; only used by the disabled #else branch.
   The parent's pointer to this bucket (nextChild or some key's prevChildBucket)
   is set to null. If the parent holds no such pointer, both buckets are dumped
   and the function asserts. */
void BtreeBucket::delBucket(const DiskLoc& thisLoc, IndexDetails& id) {
aboutToDeleteBucket(thisLoc);
assert( !isHead() );
BtreeBucket *p = parent.btreemod();
if ( p->nextChild == thisLoc ) {
p->nextChild.Null();
}
else {
// look for the key whose left child is this bucket
for ( int i = 0; i < p->n; i++ ) {
if ( p->k(i).prevChildBucket == thisLoc ) {
p->k(i).prevChildBucket.Null();
goto found;
}
}
out() << "ERROR: can't find ref to deleted bucket.\n";
out() << "To delete:\n";
dump();
out() << "Parent:\n";
p->dump();
assert(false);
}
found:
#if 1
/* as a temporary defensive measure, we zap the whole bucket, AND don't truly delete
it (meaning it is ineligible for reuse).
*/
memset(this, 0, Size());
modified(thisLoc);
#else
//defensive:
n = -1;
parent.Null();
massert("todo: use RecStoreInterface instead", false);
// TODO: this was broken anyway as deleteRecord does unindexRecord() call which assumes the data is a BSONObj,
// and it isn't.
assert(false);
// theDataFileMgr.deleteRecord(id.indexNamespace().c_str(), thisLoc.rec(), thisLoc);
#endif
}
/* Delete the key at position p of this bucket.
   note: this may delete the entire bucket, in which case `this` is invalid
   upon return.

   - last remaining key with no children on either side: a non-head bucket is
     deleted as a whole; the head bucket is never deleted, it just loses the key
   - key has a left child, or it is the last remaining key and a child exists:
     the key is only marked unused
   - otherwise the key is physically removed */
void BtreeBucket::delKeyAtPos(const DiskLoc& thisLoc, IndexDetails& id, int p) {
    modified(thisLoc);
    assert(n>0);

    DiskLoc left = childForPos(p);
    const bool onlyKey = ( n == 1 );

    if ( onlyKey && left.isNull() && nextChild.isNull() ) {
        if ( isHead() )
            _delKeyAtPos(p);
        else
            delBucket(thisLoc, id);
        return;
    }

    if ( onlyKey || !left.isNull() ) {
        markUnused(p);
        return;
    }

    _delKeyAtPos(p);
}
int qqq = 0;
/* Remove the entry (key, recordLoc) from the index.

   thisLoc   - location of this bucket, where the search starts
   id        - index being modified; supplies the key pattern used as ordering
   key       - key to remove
   recordLoc - record the entry refers to

   Keys larger than KeyMax are skipped (they are rejected on insert as well).
   Returns true when the entry was found and deleted, false otherwise. */
bool BtreeBucket::unindex(const DiskLoc& thisLoc, IndexDetails& id, BSONObj& key, const DiskLoc& recordLoc ) {
    if ( key.objsize() > KeyMax ) {
        OCCASIONALLY problem() << "unindex: key too large to index, skipping " << id.indexNamespace() << /* ' ' << key.toString() << */ '\n';
        return false;
    }

    int pos;
    bool found;
    DiskLoc loc = locate(id, thisLoc, key, id.keyPattern(), pos, found, recordLoc, 1);
    if ( !found )
        return false;

    loc.btree()->delKeyAtPos(loc, id, pos);
    return true;
}
/* Allocate a scratch bucket on the heap and initialise it to the empty state.
   The bucket is BucketSize bytes, obtained with malloc(); the caller owns it
   and must release it with free().
   Raises a massert instead of dereferencing a null pointer when the
   allocation fails. */
BtreeBucket* BtreeBucket::allocTemp() {
    BtreeBucket *b = (BtreeBucket*) malloc(BucketSize);
    massert( "out of memory allocating temp btree bucket", b != 0 );
    b->init();
    return b;
}
/* Point child's parent pointer at thisLoc. A null child is ignored.
   Traces the assignment to out() when insert_debug is set. */
inline void fix(const DiskLoc& thisLoc, const DiskLoc& child) {
    if ( child.isNull() )
        return;
    if ( insert_debug )
        out() << " " << child.toString() << ".parent=" << thisLoc.toString() << endl;
    child.btreemod()->parent = thisLoc;
}
/* this sucks. maybe get rid of parent ptrs. */
void BtreeBucket::fixParentPtrs(const DiskLoc& thisLoc) {
VERIFYTHISLOC
fix(thisLoc, nextChild);
for ( int i = 0; i < n; i++ )
fix(thisLoc, k(i).prevChildBucket);
}
/* insert a key in this bucket, splitting if necessary.
keypos - where to insert the key in range 0..n. 0=make leftmost, n=make rightmost.
thisLoc - location of this bucket.
recordLoc, key - the entry to insert; order - key ordering.
lchild, rchild - children to hang left and right of the new key (both null for a plain leaf insert).
idx - index being modified; idx.head is updated when a split creates a new root.
*/
void BtreeBucket::insertHere(DiskLoc thisLoc, int keypos,
DiskLoc recordLoc, BSONObj& key, const BSONObj& order,
DiskLoc lchild, DiskLoc rchild, IndexDetails& idx)
{
modified(thisLoc);
if ( insert_debug )
out() << " " << thisLoc.toString() << ".insertHere " << key.toString() << '/' << recordLoc.toString() << ' '
<< lchild.toString() << ' ' << rchild.toString() << " keypos:" << keypos << endl;
DiskLoc oldLoc = thisLoc;
// easy case: the key fits; only the child pointers around it need wiring up
if ( basicInsert(thisLoc, keypos, recordLoc, key, order) ) {
_KeyNode& kn = k(keypos);
if ( keypos+1 == n ) { // last key
// the pointer that used to be nextChild must be the new key's left child
if ( nextChild != lchild ) {
out() << "ERROR nextChild != lchild" << endl;
out() << " thisLoc: " << thisLoc.toString() << ' ' << idx.indexNamespace() << endl;
out() << " keyPos: " << keypos << " n:" << n << endl;
out() << " nextChild: " << nextChild.toString() << " lchild: " << lchild.toString() << endl;
out() << " recordLoc: " << recordLoc.toString() << " rchild: " << rchild.toString() << endl;
out() << " key: " << key.toString() << endl;
dump();
#if 0
out() << "\n\nDUMPING FULL INDEX" << endl;
bt_dmp=1;
bt_fv=1;
idx.head.btree()->fullValidate(idx.head);
#endif
assert(false);
}
kn.prevChildBucket = nextChild;
assert( kn.prevChildBucket == lchild );
nextChild = rchild;
if ( !rchild.isNull() )
rchild.btreemod()->parent = thisLoc;
}
else {
// new key sits before an existing key: that key's old left child must be lchild,
// and rchild takes its place
k(keypos).prevChildBucket = lchild;
if ( k(keypos+1).prevChildBucket != lchild ) {
out() << "ERROR k(keypos+1).prevChildBucket != lchild" << endl;
out() << " thisLoc: " << thisLoc.toString() << ' ' << idx.indexNamespace() << endl;
out() << " keyPos: " << keypos << " n:" << n << endl;
out() << " k(keypos+1).pcb: " << k(keypos+1).prevChildBucket.toString() << " lchild: " << lchild.toString() << endl;
out() << " recordLoc: " << recordLoc.toString() << " rchild: " << rchild.toString() << endl;
out() << " key: " << key.toString() << endl;
dump();
#if 0
out() << "\n\nDUMPING FULL INDEX" << endl;
bt_dmp=1;
bt_fv=1;
idx.head.btree()->fullValidate(idx.head);
#endif
assert(false);
}
k(keypos+1).prevChildBucket = rchild;
if ( !rchild.isNull() )
rchild.btreemod()->parent = thisLoc;
}
return;
}
/* ---------- split ---------------- */
if ( split_debug )
out() << " " << thisLoc.toString() << ".split" << endl;
int mid = n / 2;
// build the right-hand bucket in scratch memory from the keys after mid
BtreeBucket *r = allocTemp();
DiskLoc rLoc;
if ( split_debug )
out() << " mid:" << mid << ' ' << keyNode(mid).key.toString() << " n:" << n << endl;
for ( int i = mid+1; i < n; i++ ) {
KeyNode kn = keyNode(i);
r->pushBack(kn.recordLoc, kn.key, order, kn.prevChildBucket);
}
r->nextChild = nextChild;
r->assertValid( order );
// store the right-hand bucket; from here on it is reached through rLoc
rLoc = BtreeStore::insert(idx.indexNamespace().c_str(), r, r->Size(), true);
if ( split_debug )
out() << " new rLoc:" << rLoc.toString() << endl;
free(r);
r = 0;
// the children moved to the new bucket must point back at it
rLoc.btree()->fixParentPtrs(rLoc);
{
KeyNode middle = keyNode(mid);
nextChild = middle.prevChildBucket; // middle key gets promoted, its children will be thisLoc (l) and rLoc (r)
if ( split_debug ) {
out() << " middle key:" << middle.key.toString() << endl;
}
// promote middle to a parent node
if ( parent.isNull() ) {
// make a new parent if we were the root
BtreeBucket *p = allocTemp();
p->pushBack(middle.recordLoc, middle.key, order, thisLoc);
p->nextChild = rLoc;
p->assertValid( order );
parent = idx.head = BtreeStore::insert(idx.indexNamespace().c_str(), p, p->Size(), true);
if ( split_debug )
out() << " we were root, making new root:" << hex << parent.getOfs() << dec << endl;
free(p);
rLoc.btreemod()->parent = parent;
}
else {
/* set this before calling _insert - if it splits it will do fixParent() logic and change the value.
*/
rLoc.btreemod()->parent = parent;
if ( split_debug )
out() << " promoting middle key " << middle.key.toString() << endl;
parent.btree()->_insert(parent, middle.recordLoc, middle.key, order, /*dupsallowed*/true, thisLoc, rLoc, idx);
}
}
truncateTo(mid, order); // note this may trash middle.key. thus we had to promote it before finishing up here.
// add our new key, there is room now
{
if ( keypos <= mid ) {
if ( split_debug )
out() << " keypos<mid, insertHere() the new key" << endl;
insertHere(thisLoc, keypos, recordLoc, key, order, lchild, rchild, idx);
} else {
// the key belongs in the right-hand bucket; rebase its position past mid
int kp = keypos-mid-1;
assert(kp>=0);
rLoc.btree()->insertHere(rLoc, kp, recordLoc, key, order, lchild, rchild, idx);
}
}
if ( split_debug )
out() << " split end " << hex << thisLoc.getOfs() << dec << endl;
}
/* Start a new, empty index: an empty bucket is built in scratch memory,
   stored under the index's namespace via BtreeStore::insert(), and the
   location of the stored bucket is returned. */
DiskLoc BtreeBucket::addHead(IndexDetails& id) {
    BtreeBucket *fresh = allocTemp();
    DiskLoc headLoc = BtreeStore::insert(id.indexNamespace().c_str(), fresh, fresh->Size(), true);
    free(fresh);
    return headLoc;
}
/* Walk parent pointers upwards from thisLoc and return the location of the
   first bucket for which isHead() is true. */
DiskLoc BtreeBucket::getHead(const DiskLoc& thisLoc) {
    DiskLoc p = thisLoc;
    for ( ;; ) {
        if ( p.btree()->isHead() )
            break;
        p = p.btree()->parent;
    }
    return p;
}
/* Step from entry keyOfs of this bucket to the neighbouring entry in index order.
   thisLoc   - location of this bucket
   keyOfs    - in: current entry, must be in 0..n-1; out: entry index within the returned bucket
   direction - step added to keyOfs; negative values walk backwards (exists() passes 1, locate() users pass +/-1)
   caller    - name printed in the diagnostic when keyOfs is out of range
   Returns the bucket holding the neighbouring entry, or a null DiskLoc at the end of the index.
   NOTE(review): childForPos() is defined elsewhere; presumably childForPos(i) is k(i)'s left child
   for i < n and nextChild for i == n - confirm. */
DiskLoc BtreeBucket::advance(const DiskLoc& thisLoc, int& keyOfs, int direction, const char *caller) {
if ( keyOfs < 0 || keyOfs >= n ) {
out() << "ASSERT failure BtreeBucket::advance, caller: " << caller << endl;
out() << " thisLoc: " << thisLoc.toString() << endl;
out() << " keyOfs: " << keyOfs << " n:" << n << " direction: " << direction << endl;
out() << bucketSummary() << endl;
assert(false);
}
// adj selects the child slot lying between the current entry and the one we step to
int adj = direction < 0 ? 1 : 0;
int ko = keyOfs + direction;
DiskLoc nextDown = childForPos(ko+adj);
if ( !nextDown.isNull() ) {
// there is a subtree in between: descend to its first (forward) or last (backward) entry
while ( 1 ) {
keyOfs = direction>0 ? 0 : nextDown.btree()->n - 1;
DiskLoc loc= nextDown.btree()->childForPos(keyOfs + adj);
if ( loc.isNull() )
break;
nextDown = loc;
}
return nextDown;
}
// no subtree in between: the neighbour is in this bucket if ko is still in range
if ( ko < n && ko >= 0 ) {
keyOfs = ko;
return thisLoc;
}
// end of bucket. traverse back up.
DiskLoc childLoc = thisLoc;
DiskLoc ancestor = parent;
while ( 1 ) {
if ( ancestor.isNull() )
break;
BtreeBucket *an = ancestor.btree();
// find the ancestor entry that has the bucket we came from as the relevant child
for ( int i = 0; i < an->n; i++ ) {
if ( an->childForPos(i+adj) == childLoc ) {
keyOfs = i;
return ancestor;
}
}
assert( direction<0 || an->nextChild == childLoc );
// parent exhausted also, keep going up
childLoc = ancestor;
ancestor = an->parent;
}
// ran off the end of the index
return DiskLoc();
}
/* Search the subtree rooted at this bucket for (key, recordLoc).

   thisLoc   - location of this bucket
   pos       - out: entry index within the returned bucket
   found     - out: true when the exact entry was found
   direction - negative to settle on the entry before the insertion point
               when there is no exact match, otherwise on the entry at it

   Returns the bucket containing the entry described by pos, or a null
   DiskLoc when this subtree has no such entry (pos ran off either end). */
DiskLoc BtreeBucket::locate(IndexDetails& idx, const DiskLoc& thisLoc, BSONObj& key, const BSONObj &order, int& pos, bool& found, DiskLoc recordLoc, int direction) {
    int p;
    found = find(idx, key, recordLoc, order, p, /*assertIfDup*/ false);
    if ( found ) {
        pos = p;
        return thisLoc;
    }

    // not in this bucket: try the child that covers the insertion point
    DiskLoc child = childForPos(p);
    if ( !child.isNull() ) {
        DiskLoc l = child.btree()->locate(idx, child, key, order, pos, found, recordLoc, direction);
        if ( !l.isNull() )
            return l;
    }

    // nothing suitable below; fall back to an entry of this bucket
    pos = p;
    if ( direction < 0 ) {
        --pos;
        if ( pos == -1 )
            return DiskLoc(); /*theend*/
        return thisLoc;
    }
    if ( pos == n )
        return DiskLoc(); /*theend*/
    return thisLoc;
}
/* Insert (key, recordLoc) into the subtree rooted at this bucket.
   thisLoc is the location of this bucket object; you must pass that in.

   dupsAllowed    - when false, find() raises a duplicate key error if the
                    key is already present
   lChild, rChild - children for the new key; a non-null rChild marks an
                    'internal' insert (a key promoted by a split), which
                    always goes into this bucket

   Returns 2 when the key is larger than KeyMax, 0 when the key was inserted
   here or an unused entry was revived, otherwise whatever the recursive
   bt_insert() on the covering child returns. An entry that already exists
   and is in use triggers a massert. */
int BtreeBucket::_insert(DiskLoc thisLoc, DiskLoc recordLoc,
                         BSONObj& key, const BSONObj &order, bool dupsAllowed,
                         DiskLoc lChild, DiskLoc rChild, IndexDetails& idx) {
    if ( key.objsize() > KeyMax ) {
        problem() << "ERROR: key too large len:" << key.objsize() << " max:" << KeyMax << ' ' << idx.indexNamespace() << endl;
        return 2;
    }
    assert( key.objsize() > 0 );

    int pos;
    bool found = find(idx, key, recordLoc, order, pos, !dupsAllowed);
    if ( insert_debug ) {
        out() << " " << thisLoc.toString() << '.' << "_insert " <<
              key.toString() << '/' << recordLoc.toString() <<
              " l:" << lChild.toString() << " r:" << rChild.toString() << endl;
        out() << " found:" << found << " pos:" << pos << " n:" << n << endl;
    }

    if ( found ) {
        _KeyNode& kn = k(pos);
        if ( kn.isUnused() ) {
            // the very same entry is still around, merely flagged unused: revive it
            DEBUGGING out() << "reusing unused key" << endl;
            massert( "btree reuse unused key error?", kn.prevChildBucket == lChild );
            kn.setUsed();
            return 0;
        }

        // key + record location already present and in use: report and fail
        out() << "_insert(): key already exists in index\n";
        out() << " " << idx.indexNamespace().c_str() << " thisLoc:" << thisLoc.toString() << '\n';
        out() << " " << key.toString() << '\n';
        out() << " " << "recordLoc:" << recordLoc.toString() << " pos:" << pos << endl;
        out() << " old l r: " << childForPos(pos).toString() << ' ' << childForPos(pos+1).toString() << endl;
        out() << " new l r: " << lChild.toString() << ' ' << rChild.toString() << endl;
        massert("btree: key+recloc already in index", false);
    }

    DEBUGGING out() << "TEMP: key: " << key.toString() << endl;
    DiskLoc& child = getChild(pos);
    if ( insert_debug )
        out() << " getChild(" << pos << "): " << child.toString() << endl;

    const bool internalInsert = !rChild.isNull();
    if ( !child.isNull() && !internalInsert ) {
        // ordinary insert and a child covers this position: push it down
        return child.btree()->bt_insert(child, recordLoc, key, order, dupsAllowed, idx, /*toplevel*/false);
    }

    insertHere(thisLoc, pos, recordLoc, key, order, lChild, rChild, idx);
    return 0;
}
void BtreeBucket::dump() {
out() << "DUMP btreebucket n:" << n;
out() << " parent:" << hex << parent.getOfs() << dec;
for ( int i = 0; i < n; i++ ) {
out() << '\n';
KeyNode k = keyNode(i);
out() << '\t' << i << '\t' << k.key.toString() << "\tleft:" << hex <<
k.prevChildBucket.getOfs() << "\tRecLoc:" << k.recordLoc.toString() << dec;
if ( this->k(i).isUnused() )
out() << " UNUSED";
}
out() << " right:" << hex << nextChild.getOfs() << dec << endl;
}
/* Entry point for inserting (key, recordLoc) below this bucket.

   toplevel - true for a call coming from outside the btree code; only then
              is the key size checked here and ninserts counted.

   Returns 3 when a top-level key is larger than KeyMax, otherwise the result
   of _insert() (todo: the meaning of the return codes is unclear and should
   be cleaned up). The bucket is validated after the insert. */
int BtreeBucket::bt_insert(DiskLoc thisLoc, DiskLoc recordLoc,
                           BSONObj& key, const BSONObj &order, bool dupsAllowed,
                           IndexDetails& idx, bool toplevel)
{
    if ( toplevel ) {
        const bool tooLarge = key.objsize() > KeyMax;
        if ( tooLarge ) {
            problem() << "Btree::insert: key too large to index, skipping " << idx.indexNamespace().c_str() << ' ' << key.toString() << '\n';
            return 3;
        }
        ++ninserts;
    }

    const int rc = _insert(thisLoc, recordLoc, key, order, dupsAllowed, DiskLoc(), DiskLoc(), idx);
    assertValid( order );
    return rc;
}
/* Write an outline of the tree rooted at this bucket to ss, starting at
   indentation level 0. */
void BtreeBucket::shape(stringstream& ss) {
    const int rootLevel = 0;
    _shape(rootLevel, ss);
}
} // namespace mongo
#include "db.h"
#include "dbhelpers.h"
namespace mongo {
/* Manual experiment around unused entries and the duplicate key check.
   Inserts the key {x:9} four times with different record locations (dups
   allowed), flags entries 1..3 unused, dumps the tree, and then inserts the
   same key once more with dups NOT allowed. */
void BtreeBucket::a_test(IndexDetails& id) {
    BtreeBucket *b = id.head.btree();

    // record locs for testing
    DiskLoc A(1, 20);
    DiskLoc B(1, 30);
    DiskLoc C(1, 40);
    DiskLoc rl;

    BSONObj key = fromjson("{x:9}");
    BSONObj order = fromjson("{}");

    // same key four times, record offsets two apart
    for ( int round = 0; round < 4; round++ ) {
        b->bt_insert(id.head, A, key, order, true, id);
        A.GETOFS() += 2;
    }

    // entry 0 stays in use, the other three become unused
    assert( b->k(0).isUsed() );
    for ( int pos = 1; pos <= 3; pos++ )
        b->k(pos).setUnused();
    b->dumpTree(id.head, order);

    // this should assert. does it? (it might "accidentally" though, not asserting proves a problem, asserting proves nothing)
    b->bt_insert(id.head, C, key, order, false, id);
    b->dumpTree(id.head, order);
}
void testClient() {
/* first do:
test.foo.drop();
test.foo.ensureIndex({x:9});
*/
dblock lk;
DBContext ctxt("test.foo");
assert( nsdetails("test.foo") );
assert( nsdetails("test.foo")->nIndexes > 0 );
BtreeBucket::a_test( nsdetails("test.foo")->indexes[0] );
}
}