mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-28 07:59:02 +01:00
SERVER-17623 BulkBuilder isn't an IndexAccessMethod
This is prep for merging BtreeBasedAccessMethod into IndexAccessMethod. That is why there seems to be a bit of an odd division between the two.
This commit is contained in:
parent
6d8fb7f991
commit
061dab886c
@ -739,7 +739,6 @@ serverOnlyFiles = [ "db/background.cpp",
|
||||
"db/index/2d_access_method.cpp",
|
||||
"db/index/btree_access_method.cpp",
|
||||
"db/index/btree_based_access_method.cpp",
|
||||
"db/index/btree_based_bulk_access_method.cpp",
|
||||
"db/index/btree_index_cursor.cpp",
|
||||
"db/index/fts_access_method.cpp",
|
||||
"db/index/hash_access_method.cpp",
|
||||
|
@ -175,9 +175,7 @@ namespace mongo {
|
||||
info = statusWithInfo.getValue();
|
||||
|
||||
IndexToBuild index;
|
||||
index.block = boost::make_shared<IndexCatalog::IndexBuildBlock>(_txn,
|
||||
_collection,
|
||||
info);
|
||||
index.block.reset(new IndexCatalog::IndexBuildBlock(_txn, _collection, info));
|
||||
status = index.block->init();
|
||||
if ( !status.isOK() )
|
||||
return status;
|
||||
@ -190,7 +188,7 @@ namespace mongo {
|
||||
if (!_buildInBackground) {
|
||||
// Bulk build process requires foreground building as it assumes nothing is changing
|
||||
// under it.
|
||||
index.bulk.reset(index.real->initiateBulk(_txn));
|
||||
index.bulk = index.real->initiateBulk();
|
||||
}
|
||||
|
||||
const IndexDescriptor* descriptor = index.block->getEntry()->descriptor();
|
||||
@ -208,7 +206,7 @@ namespace mongo {
|
||||
// TODO SERVER-14888 Suppress this in cases we don't want to audit.
|
||||
audit::logCreateIndex(_txn->getClient(), &info, descriptor->indexName(), ns);
|
||||
|
||||
_indexes.push_back( index );
|
||||
_indexes.push_back(std::move(index));
|
||||
}
|
||||
|
||||
// this is so that operations examining the list of indexes know there are more keys to look
|
||||
@ -316,11 +314,14 @@ namespace mongo {
|
||||
Status MultiIndexBlock::insert(const BSONObj& doc, const RecordId& loc) {
|
||||
for ( size_t i = 0; i < _indexes.size(); i++ ) {
|
||||
int64_t unused;
|
||||
Status idxStatus = _indexes[i].forInsert()->insert( _txn,
|
||||
doc,
|
||||
loc,
|
||||
_indexes[i].options,
|
||||
&unused );
|
||||
Status idxStatus(ErrorCodes::InternalError, "");
|
||||
if (_indexes[i].bulk) {
|
||||
idxStatus = _indexes[i].bulk->insert(_txn, doc, loc, _indexes[i].options, &unused);
|
||||
}
|
||||
else {
|
||||
idxStatus = _indexes[i].real->insert(_txn, doc, loc, _indexes[i].options, &unused);
|
||||
}
|
||||
|
||||
if ( !idxStatus.isOK() )
|
||||
return idxStatus;
|
||||
}
|
||||
@ -333,7 +334,8 @@ namespace mongo {
|
||||
continue;
|
||||
LOG(1) << "\t bulk commit starting for index: "
|
||||
<< _indexes[i].block->getEntry()->descriptor()->indexName();
|
||||
Status status = _indexes[i].real->commitBulk( _indexes[i].bulk.get(),
|
||||
Status status = _indexes[i].real->commitBulk( _txn,
|
||||
std::move(_indexes[i].bulk),
|
||||
_allowInterruption,
|
||||
_indexes[i].options.dupsAllowed,
|
||||
dupsOut );
|
||||
|
@ -30,8 +30,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
@ -200,22 +199,35 @@ namespace mongo {
|
||||
class CleanupIndexesVectorOnRollback;
|
||||
|
||||
struct IndexToBuild {
|
||||
IndexToBuild() : real(NULL) {}
|
||||
#if defined(_MSC_VER) && _MSC_VER < 1900 // MVSC++ <= 2013 can't generate default move operations
|
||||
IndexToBuild() = default;
|
||||
IndexToBuild(IndexToBuild&& other)
|
||||
: block(std::move(other.block))
|
||||
, real(std::move(other.real))
|
||||
, bulk(std::move(other.bulk))
|
||||
, options(std::move(other.options))
|
||||
{}
|
||||
|
||||
IndexAccessMethod* forInsert() { return bulk ? bulk.get() : real; }
|
||||
IndexToBuild& operator= (IndexToBuild&& other) {
|
||||
block = std::move(other.block);
|
||||
real = std::move(other.real);
|
||||
bulk = std::move(other.bulk);
|
||||
options = std::move(other.options);
|
||||
return *this;
|
||||
}
|
||||
#endif
|
||||
|
||||
boost::shared_ptr<IndexCatalog::IndexBuildBlock> block;
|
||||
std::unique_ptr<IndexCatalog::IndexBuildBlock> block;
|
||||
|
||||
IndexAccessMethod* real; // owned elsewhere
|
||||
boost::shared_ptr<IndexAccessMethod> bulk;
|
||||
IndexAccessMethod* real = NULL; // owned elsewhere
|
||||
std::unique_ptr<IndexAccessMethod::BulkBuilder> bulk;
|
||||
|
||||
InsertDeleteOptions options;
|
||||
};
|
||||
|
||||
std::vector<IndexToBuild> _indexes;
|
||||
|
||||
boost::scoped_ptr<BackgroundOperation> _backgroundOperation;
|
||||
|
||||
std::unique_ptr<BackgroundOperation> _backgroundOperation;
|
||||
|
||||
// Pointers not owned here and must outlive 'this'
|
||||
Collection* _collection;
|
||||
|
@ -34,8 +34,8 @@
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/concurrency/write_conflict_exception.h"
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/index/btree_based_bulk_access_method.h"
|
||||
#include "mongo/db/index/btree_index_cursor.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/keypattern.h"
|
||||
@ -53,6 +53,36 @@ namespace mongo {
|
||||
|
||||
MONGO_EXPORT_SERVER_PARAMETER(failIndexKeyTooLong, bool, true);
|
||||
|
||||
//
|
||||
// Comparison for external sorter interface
|
||||
//
|
||||
|
||||
// Defined in db/structure/btree/key.cpp
|
||||
// XXX TODO: rename to something more descriptive, etc. etc.
|
||||
int oldCompare(const BSONObj& l,const BSONObj& r, const Ordering &o);
|
||||
|
||||
class BtreeExternalSortComparison {
|
||||
public:
|
||||
BtreeExternalSortComparison(const BSONObj& ordering, int version)
|
||||
: _ordering(Ordering::make(ordering)),
|
||||
_version(version) {
|
||||
invariant(version == 1 || version == 0);
|
||||
}
|
||||
|
||||
typedef std::pair<BSONObj, RecordId> Data;
|
||||
|
||||
int operator() (const Data& l, const Data& r) const {
|
||||
int x = (_version == 1
|
||||
? l.first.woCompare(r.first, _ordering, /*considerfieldname*/false)
|
||||
: oldCompare(l.first, r.first, _ordering));
|
||||
if (x) { return x; }
|
||||
return l.second.compare(r.second);
|
||||
}
|
||||
private:
|
||||
const Ordering _ordering;
|
||||
const int _version;
|
||||
};
|
||||
|
||||
BtreeBasedAccessMethod::BtreeBasedAccessMethod(IndexCatalogEntry* btreeState,
|
||||
SortedDataInterface* btree)
|
||||
: _btreeState(btreeState),
|
||||
@ -297,20 +327,124 @@ namespace mongo {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
IndexAccessMethod* BtreeBasedAccessMethod::initiateBulk(OperationContext* txn) {
|
||||
return new BtreeBasedBulkAccessMethod(txn,
|
||||
this,
|
||||
_newInterface.get(),
|
||||
_descriptor);
|
||||
std::unique_ptr<IndexAccessMethod::BulkBuilder> BtreeBasedAccessMethod::initiateBulk() {
|
||||
|
||||
return std::unique_ptr<BulkBuilder>(new BulkBuilder(this, _descriptor));
|
||||
}
|
||||
|
||||
Status BtreeBasedAccessMethod::commitBulk(IndexAccessMethod* bulkRaw,
|
||||
IndexAccessMethod::BulkBuilder::BulkBuilder(const BtreeBasedAccessMethod* index,
|
||||
const IndexDescriptor* descriptor)
|
||||
: _sorter(Sorter::make(SortOptions().TempDir(storageGlobalParams.dbpath + "/_tmp")
|
||||
.ExtSortAllowed()
|
||||
.MaxMemoryUsageBytes(100*1024*1024),
|
||||
BtreeExternalSortComparison(descriptor->keyPattern(),
|
||||
descriptor->version())))
|
||||
, _real(index) {
|
||||
}
|
||||
|
||||
Status IndexAccessMethod::BulkBuilder::insert(OperationContext* txn,
|
||||
const BSONObj& obj,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
int64_t* numInserted) {
|
||||
BSONObjSet keys;
|
||||
_real->getKeys(obj, &keys);
|
||||
|
||||
_isMultiKey = _isMultiKey || (keys.size() > 1);
|
||||
|
||||
for (BSONObjSet::iterator it = keys.begin(); it != keys.end(); ++it) {
|
||||
_sorter->add(*it, loc);
|
||||
_keysInserted++;
|
||||
}
|
||||
|
||||
if (NULL != numInserted) {
|
||||
*numInserted += keys.size();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BtreeBasedAccessMethod::commitBulk(OperationContext* txn,
|
||||
std::unique_ptr<BulkBuilder> bulk,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed,
|
||||
set<RecordId>* dupsToDrop) {
|
||||
|
||||
BtreeBasedBulkAccessMethod* bulk = static_cast<BtreeBasedBulkAccessMethod*>(bulkRaw);
|
||||
return bulk->commit(dupsToDrop, mayInterrupt, dupsAllowed);
|
||||
Timer timer;
|
||||
|
||||
std::unique_ptr<BulkBuilder::Sorter::Iterator> i(bulk->_sorter->done());
|
||||
|
||||
ProgressMeterHolder pm(*txn->setMessage("Index Bulk Build: (2/3) btree bottom up",
|
||||
"Index: (2/3) BTree Bottom Up Progress",
|
||||
bulk->_keysInserted,
|
||||
10));
|
||||
|
||||
std::unique_ptr<SortedDataBuilderInterface> builder;
|
||||
|
||||
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
|
||||
WriteUnitOfWork wunit(txn);
|
||||
|
||||
if (bulk->_isMultiKey) {
|
||||
_btreeState->setMultikey( txn );
|
||||
}
|
||||
|
||||
builder.reset(_newInterface->getBulkBuilder(txn, dupsAllowed));
|
||||
wunit.commit();
|
||||
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setting index multikey flag", "");
|
||||
|
||||
while (i->more()) {
|
||||
if (mayInterrupt) {
|
||||
txn->checkForInterrupt();
|
||||
}
|
||||
|
||||
WriteUnitOfWork wunit(txn);
|
||||
// Improve performance in the btree-building phase by disabling rollback tracking.
|
||||
// This avoids copying all the written bytes to a buffer that is only used to roll back.
|
||||
// Note that this is safe to do, as this entire index-build-in-progress will be cleaned
|
||||
// up by the index system.
|
||||
txn->recoveryUnit()->setRollbackWritesDisabled();
|
||||
|
||||
// Get the next datum and add it to the builder.
|
||||
BulkBuilder::Sorter::Data d = i->next();
|
||||
Status status = builder->addKey(d.first, d.second);
|
||||
|
||||
if (!status.isOK()) {
|
||||
// Overlong key that's OK to skip?
|
||||
if (status.code() == ErrorCodes::KeyTooLong && ignoreKeyTooLong(txn)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this is a duplicate that's OK to skip
|
||||
if (status.code() == ErrorCodes::DuplicateKey) {
|
||||
invariant(!dupsAllowed); // shouldn't be getting DupKey errors if dupsAllowed.
|
||||
|
||||
if (dupsToDrop) {
|
||||
dupsToDrop->insert(d.second);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
// If we're here either it's a dup and we're cool with it or the addKey went just
|
||||
// fine.
|
||||
pm.hit();
|
||||
wunit.commit();
|
||||
}
|
||||
|
||||
pm.finished();
|
||||
|
||||
txn->getCurOp()->setMessage("Index Bulk Build: (3/3) btree-middle",
|
||||
"Index: (3/3) BTree Middle Progress");
|
||||
|
||||
LOG(timer.seconds() > 10 ? 0 : 1 ) << "\t done building bottom layer, going to commit";
|
||||
|
||||
builder->commit(mayInterrupt);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
#include "mongo/db/sorter/sorter.cpp"
|
||||
MONGO_CREATE_SORTER(mongo::BSONObj, mongo::RecordId, mongo::BtreeExternalSortComparison);
|
||||
|
@ -28,7 +28,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "mongo/base/disallow_copying.h"
|
||||
@ -38,11 +38,10 @@
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/record_id.h"
|
||||
#include "mongo/db/storage/sorted_data_interface.h"
|
||||
#include "mongo/db/sorter/sorter.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class ExternalSortComparison;
|
||||
|
||||
/**
|
||||
* Any access method that is Btree based subclasses from this.
|
||||
*
|
||||
@ -91,12 +90,13 @@ namespace mongo {
|
||||
|
||||
virtual Status initializeAsEmpty(OperationContext* txn);
|
||||
|
||||
virtual IndexAccessMethod* initiateBulk(OperationContext* txn);
|
||||
virtual std::unique_ptr<BulkBuilder> initiateBulk();
|
||||
|
||||
virtual Status commitBulk( IndexAccessMethod* bulk,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed,
|
||||
std::set<RecordId>* dups );
|
||||
virtual Status commitBulk(OperationContext* txn,
|
||||
std::unique_ptr<BulkBuilder> bulk,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed,
|
||||
std::set<RecordId>* dups);
|
||||
|
||||
virtual Status touch(OperationContext* txn, const BSONObj& obj);
|
||||
|
||||
@ -113,8 +113,6 @@ namespace mongo {
|
||||
virtual RecordId findSingle( OperationContext* txn, const BSONObj& key ) const;
|
||||
|
||||
protected:
|
||||
friend class BtreeBasedBulkAccessMethod;
|
||||
|
||||
// See below for body.
|
||||
class BtreeBasedPrivateUpdateData;
|
||||
|
||||
@ -130,7 +128,7 @@ namespace mongo {
|
||||
const RecordId& loc,
|
||||
bool dupsAllowed);
|
||||
|
||||
boost::scoped_ptr<SortedDataInterface> _newInterface;
|
||||
const std::unique_ptr<SortedDataInterface> _newInterface;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -1,203 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2013 10gen Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/index/btree_based_bulk_access_method.h"
|
||||
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
|
||||
#include "mongo/db/concurrency/write_conflict_exception.h"
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/storage_options.h"
|
||||
#include "mongo/util/log.h"
|
||||
#include "mongo/util/progress_meter.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
using boost::scoped_ptr;
|
||||
using std::set;
|
||||
|
||||
//
|
||||
// Comparison for external sorter interface
|
||||
//
|
||||
|
||||
// Defined in db/structure/btree/key.cpp
|
||||
// XXX TODO: rename to something more descriptive, etc. etc.
|
||||
int oldCompare(const BSONObj& l,const BSONObj& r, const Ordering &o);
|
||||
|
||||
class BtreeExternalSortComparison {
|
||||
public:
|
||||
BtreeExternalSortComparison(const BSONObj& ordering, int version)
|
||||
: _ordering(Ordering::make(ordering)),
|
||||
_version(version) {
|
||||
invariant(version == 1 || version == 0);
|
||||
}
|
||||
|
||||
typedef std::pair<BSONObj, RecordId> Data;
|
||||
|
||||
int operator() (const Data& l, const Data& r) const {
|
||||
int x = (_version == 1
|
||||
? l.first.woCompare(r.first, _ordering, /*considerfieldname*/false)
|
||||
: oldCompare(l.first, r.first, _ordering));
|
||||
if (x) { return x; }
|
||||
return l.second.compare(r.second);
|
||||
}
|
||||
private:
|
||||
const Ordering _ordering;
|
||||
const int _version;
|
||||
};
|
||||
|
||||
BtreeBasedBulkAccessMethod::BtreeBasedBulkAccessMethod(OperationContext* txn,
|
||||
BtreeBasedAccessMethod* real,
|
||||
SortedDataInterface* interface,
|
||||
const IndexDescriptor* descriptor) {
|
||||
_real = real;
|
||||
_interface = interface;
|
||||
_txn = txn;
|
||||
|
||||
_docsInserted = 0;
|
||||
_keysInserted = 0;
|
||||
_isMultiKey = false;
|
||||
|
||||
_sorter.reset(BSONObjExternalSorter::make(
|
||||
SortOptions().TempDir(storageGlobalParams.dbpath + "/_tmp")
|
||||
.ExtSortAllowed()
|
||||
.MaxMemoryUsageBytes(100*1024*1024),
|
||||
BtreeExternalSortComparison(descriptor->keyPattern(), descriptor->version())));
|
||||
}
|
||||
|
||||
Status BtreeBasedBulkAccessMethod::insert(OperationContext* txn,
|
||||
const BSONObj& obj,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
int64_t* numInserted) {
|
||||
BSONObjSet keys;
|
||||
_real->getKeys(obj, &keys);
|
||||
|
||||
_isMultiKey = _isMultiKey || (keys.size() > 1);
|
||||
|
||||
for (BSONObjSet::iterator it = keys.begin(); it != keys.end(); ++it) {
|
||||
// False is for mayInterrupt.
|
||||
_sorter->add(*it, loc);
|
||||
_keysInserted++;
|
||||
}
|
||||
|
||||
_docsInserted++;
|
||||
|
||||
if (NULL != numInserted) {
|
||||
*numInserted += keys.size();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BtreeBasedBulkAccessMethod::commit(set<RecordId>* dupsToDrop,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed) {
|
||||
Timer timer;
|
||||
|
||||
scoped_ptr<BSONObjExternalSorter::Iterator> i(_sorter->done());
|
||||
|
||||
ProgressMeterHolder pm(*_txn->setMessage("Index Bulk Build: (2/3) btree bottom up",
|
||||
"Index: (2/3) BTree Bottom Up Progress",
|
||||
_keysInserted,
|
||||
10));
|
||||
|
||||
scoped_ptr<SortedDataBuilderInterface> builder;
|
||||
|
||||
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
|
||||
WriteUnitOfWork wunit(_txn);
|
||||
|
||||
if (_isMultiKey) {
|
||||
_real->_btreeState->setMultikey( _txn );
|
||||
}
|
||||
|
||||
builder.reset(_interface->getBulkBuilder(_txn, dupsAllowed));
|
||||
wunit.commit();
|
||||
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "setting index multikey flag", "");
|
||||
|
||||
while (i->more()) {
|
||||
if (mayInterrupt) {
|
||||
_txn->checkForInterrupt();
|
||||
}
|
||||
|
||||
WriteUnitOfWork wunit(_txn);
|
||||
// Improve performance in the btree-building phase by disabling rollback tracking.
|
||||
// This avoids copying all the written bytes to a buffer that is only used to roll back.
|
||||
// Note that this is safe to do, as this entire index-build-in-progress will be cleaned
|
||||
// up by the index system.
|
||||
_txn->recoveryUnit()->setRollbackWritesDisabled();
|
||||
|
||||
// Get the next datum and add it to the builder.
|
||||
BSONObjExternalSorter::Data d = i->next();
|
||||
Status status = builder->addKey(d.first, d.second);
|
||||
|
||||
if (!status.isOK()) {
|
||||
// Overlong key that's OK to skip?
|
||||
if (status.code() == ErrorCodes::KeyTooLong && _real->ignoreKeyTooLong(_txn)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this is a duplicate that's OK to skip
|
||||
if (status.code() == ErrorCodes::DuplicateKey) {
|
||||
invariant(!dupsAllowed); // shouldn't be getting DupKey errors if dupsAllowed.
|
||||
|
||||
if (dupsToDrop) {
|
||||
dupsToDrop->insert(d.second);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
// If we're here either it's a dup and we're cool with it or the addKey went just
|
||||
// fine.
|
||||
pm.hit();
|
||||
wunit.commit();
|
||||
}
|
||||
|
||||
pm.finished();
|
||||
|
||||
_txn->getCurOp()->setMessage("Index Bulk Build: (3/3) btree-middle",
|
||||
"Index: (3/3) BTree Middle Progress");
|
||||
|
||||
LOG(timer.seconds() > 10 ? 0 : 1 ) << "\t done building bottom layer, going to commit";
|
||||
|
||||
builder->commit(mayInterrupt);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
#include "mongo/db/sorter/sorter.cpp"
|
||||
MONGO_CREATE_SORTER(mongo::BSONObj, mongo::RecordId, mongo::BtreeExternalSortComparison);
|
@ -1,172 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2013-2014 MongoDB Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/sorter/sorter.h"
|
||||
#include "mongo/db/index/btree_based_access_method.h"
|
||||
#include "mongo/db/index/index_access_method.h"
|
||||
#include "mongo/db/index/index_descriptor.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/storage/sorted_data_interface.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class BtreeBasedBulkAccessMethod : public IndexAccessMethod {
|
||||
public:
|
||||
/**
|
||||
* Does not take ownership of any pointers.
|
||||
* All pointers must outlive 'this'.
|
||||
*/
|
||||
BtreeBasedBulkAccessMethod(OperationContext* txn,
|
||||
BtreeBasedAccessMethod* real,
|
||||
SortedDataInterface* interface,
|
||||
const IndexDescriptor* descriptor);
|
||||
|
||||
~BtreeBasedBulkAccessMethod() {}
|
||||
|
||||
virtual Status insert(OperationContext* txn,
|
||||
const BSONObj& obj,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
int64_t* numInserted);
|
||||
|
||||
Status commit(std::set<RecordId>* dupsToDrop, bool mayInterrupt, bool dupsAllowed);
|
||||
|
||||
// Exposed for testing.
|
||||
static ExternalSortComparison* getComparison(int version, const BSONObj& keyPattern);
|
||||
|
||||
//
|
||||
// Stuff below here is a no-op of one form or another.
|
||||
//
|
||||
|
||||
virtual Status commitBulk(IndexAccessMethod* bulk,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed,
|
||||
std::set<RecordId>* dups) {
|
||||
invariant(this == bulk);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual Status touch(OperationContext* txn, const BSONObj& obj) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual Status touch(OperationContext* txn) const {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual Status validate(OperationContext* txn, bool full, int64_t* numKeys, BSONObjBuilder* output) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual bool appendCustomStats(OperationContext* txn, BSONObjBuilder* output, double scale)
|
||||
const {
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual Status remove(OperationContext* txn,
|
||||
const BSONObj& obj,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
int64_t* numDeleted) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual Status validateUpdate(OperationContext* txn,
|
||||
const BSONObj& from,
|
||||
const BSONObj& to,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
UpdateTicket* ticket) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual long long getSpaceUsedBytes( OperationContext* txn ) const {
|
||||
return -1;
|
||||
}
|
||||
|
||||
virtual Status update(OperationContext* txn,
|
||||
const UpdateTicket& ticket,
|
||||
int64_t* numUpdated) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual Status newCursor(OperationContext*txn,
|
||||
const CursorOptions& opts,
|
||||
IndexCursor** out) const {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual Status initializeAsEmpty(OperationContext* txn) {
|
||||
return _notAllowed();
|
||||
}
|
||||
|
||||
virtual IndexAccessMethod* initiateBulk(OperationContext* txn) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
OperationContext* getOperationContext() { return _txn; }
|
||||
|
||||
virtual void getKeys(const BSONObj &obj, BSONObjSet *keys) const {
|
||||
_real->getKeys(obj, keys);
|
||||
}
|
||||
|
||||
private:
|
||||
typedef Sorter<BSONObj, RecordId> BSONObjExternalSorter;
|
||||
|
||||
Status _notAllowed() const {
|
||||
return Status(ErrorCodes::InternalError, "cannot use bulk for this yet");
|
||||
}
|
||||
|
||||
// Not owned here.
|
||||
BtreeBasedAccessMethod* _real;
|
||||
|
||||
// Not owned here.
|
||||
SortedDataInterface* _interface;
|
||||
|
||||
// The external sorter.
|
||||
boost::scoped_ptr<BSONObjExternalSorter> _sorter;
|
||||
|
||||
// How many docs are we indexing?
|
||||
unsigned long long _docsInserted;
|
||||
|
||||
// And how many keys?
|
||||
unsigned long long _keysInserted;
|
||||
|
||||
// Does any document have >1 key?
|
||||
bool _isMultiKey;
|
||||
|
||||
OperationContext* _txn;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
@ -29,18 +29,21 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/db/index/index_cursor.h"
|
||||
#include "mongo/db/index/index_descriptor.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/record_id.h"
|
||||
#include "mongo/db/sorter/sorter.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class BSONObjBuilder;
|
||||
class UpdateTicket;
|
||||
struct InsertDeleteOptions;
|
||||
class BtreeBasedAccessMethod;
|
||||
|
||||
/**
|
||||
* An IndexAccessMethod is the interface through which all the mutation, lookup, and
|
||||
@ -177,35 +180,50 @@ namespace mongo {
|
||||
// Bulk operations support
|
||||
//
|
||||
|
||||
class BulkBuilder {
|
||||
public:
|
||||
/**
|
||||
* Insert into the BulkBuilder as-if inserting into an IndexAccessMethod.
|
||||
*/
|
||||
Status insert(OperationContext* txn,
|
||||
const BSONObj& obj,
|
||||
const RecordId& loc,
|
||||
const InsertDeleteOptions& options,
|
||||
int64_t* numInserted);
|
||||
|
||||
private:
|
||||
friend class BtreeBasedAccessMethod;
|
||||
|
||||
using Sorter = mongo::Sorter<BSONObj, RecordId>;
|
||||
|
||||
BulkBuilder(const BtreeBasedAccessMethod* index, const IndexDescriptor* descriptor);
|
||||
|
||||
std::unique_ptr<Sorter> _sorter;
|
||||
const BtreeBasedAccessMethod* _real;
|
||||
int64_t _keysInserted = 0;
|
||||
bool _isMultiKey = false;
|
||||
};
|
||||
|
||||
/**
|
||||
* Starts a bulk operation.
|
||||
* You work on the returned IndexAccessMethod and then call commitBulk.
|
||||
* You work on the returned BulkBuilder and then call commitBulk.
|
||||
* This can return NULL, meaning bulk mode is not available.
|
||||
*
|
||||
* Long term, you'll eventually be able to mix/match bulk, not bulk,
|
||||
* have as many as you want, etc..
|
||||
*
|
||||
* Caller owns the returned IndexAccessMethod.
|
||||
*
|
||||
* The provided OperationContext must outlive the IndexAccessMethod returned.
|
||||
*
|
||||
* For now (1/8/14) you can only do bulk when the index is empty
|
||||
* it will fail if you try other times.
|
||||
* It is only legal to initiate bulk when the index is new and empty.
|
||||
*/
|
||||
virtual IndexAccessMethod* initiateBulk(OperationContext* txn) = 0;
|
||||
virtual std::unique_ptr<BulkBuilder> initiateBulk() = 0;
|
||||
|
||||
/**
|
||||
* Call this when you are ready to finish your bulk work.
|
||||
* Pass in the IndexAccessMethod gotten from initiateBulk.
|
||||
* After this method is called, the bulk index access method is invalid
|
||||
* and should not be used.
|
||||
* Pass in the BulkBuilder returned from initiateBulk.
|
||||
* @param bulk - something created from initiateBulk
|
||||
* @param mayInterrupt - is this commit interruptable (will cancel)
|
||||
* @param dupsAllowed - if false, error or fill 'dups' if any duplicate values are found
|
||||
* @param dups - if NULL, error out on dups if not allowed
|
||||
* if not NULL, put the bad RecordIds there
|
||||
*/
|
||||
virtual Status commitBulk( IndexAccessMethod* bulk,
|
||||
virtual Status commitBulk( OperationContext* txn,
|
||||
std::unique_ptr<BulkBuilder> bulk,
|
||||
bool mayInterrupt,
|
||||
bool dupsAllowed,
|
||||
std::set<RecordId>* dups ) = 0;
|
||||
|
@ -37,7 +37,6 @@
|
||||
#include "mongo/db/dbhelpers.h"
|
||||
#include "mongo/db/global_environment_d.h"
|
||||
#include "mongo/db/global_environment_experiment.h"
|
||||
#include "mongo/db/index/btree_based_bulk_access_method.h"
|
||||
#include "mongo/db/index/index_descriptor.h"
|
||||
#include "mongo/db/operation_context_impl.h"
|
||||
#include "mongo/platform/cstdint.h"
|
||||
|
Loading…
Reference in New Issue
Block a user