mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
SERVER-17690 Move sharding change logging operations to catalog manager
So they can be abstracted behind the interface and not go through a config server connection directly.
This commit is contained in:
parent
6d33c3637e
commit
e23b1c0eba
@ -36,15 +36,9 @@
|
||||
#include <list>
|
||||
|
||||
#include "mongo/dbtests/dbtests.h"
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/type_changelog.h"
|
||||
#include "mongo/s/type_chunk.h"
|
||||
#include "mongo/s/type_collection.h"
|
||||
#include "mongo/s/type_config_version.h"
|
||||
#include "mongo/s/type_database.h"
|
||||
#include "mongo/s/type_mongos.h"
|
||||
#include "mongo/s/type_shard.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -79,7 +73,6 @@ namespace mongo {
|
||||
ChunkType::ConfigNS,
|
||||
BSON( ChunkType::ns() << 1 <<
|
||||
ChunkType::DEPRECATED_lastmod() << 1 )));
|
||||
configServer.init(configSvr().toString());
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearServer() {
|
||||
@ -90,32 +83,7 @@ namespace mongo {
|
||||
_client.dropCollection(VersionType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearShards() {
|
||||
_client.dropCollection(ShardType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearDatabases() {
|
||||
_client.dropCollection(DatabaseType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearCollections() {
|
||||
_client.dropCollection(CollectionType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearChunks() {
|
||||
_client.dropCollection(ChunkType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearPings() {
|
||||
_client.dropCollection(MongosType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::clearChangelog() {
|
||||
_client.dropCollection(ChangelogType::ConfigNS);
|
||||
}
|
||||
|
||||
void ConfigServerFixture::dumpServer() {
|
||||
|
||||
log() << "Dumping virtual config server to log..." << endl;
|
||||
|
||||
list<string> collectionNames(_client.getCollectionNames("config"));
|
||||
@ -137,7 +105,6 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void ConfigServerFixture::tearDown() {
|
||||
|
||||
clearServer();
|
||||
|
||||
// Reset the pinger
|
||||
|
@ -111,12 +111,6 @@ namespace mongo {
|
||||
void clearServer();
|
||||
|
||||
void clearVersion();
|
||||
void clearShards();
|
||||
void clearDatabases();
|
||||
void clearCollections();
|
||||
void clearChunks();
|
||||
void clearPings();
|
||||
void clearChangelog();
|
||||
|
||||
/**
|
||||
* Dumps the contents of the config server to the log.
|
||||
|
@ -129,6 +129,7 @@ namespace mongo {
|
||||
std::vector<std::string> configHosts;
|
||||
configHosts.push_back("$dummy:10000");
|
||||
|
||||
configServer.init(configHosts);
|
||||
invariant(grid.initCatalogManager(configHosts));
|
||||
|
||||
TestWatchDog twd;
|
||||
|
@ -133,7 +133,6 @@ namespace ShardingTests {
|
||||
ChunkType::ConfigNS,
|
||||
BSON( ChunkType::ns() << 1 << // br
|
||||
ChunkType::DEPRECATED_lastmod() << 1 ) ));
|
||||
configServer.init("$dummy:1000");
|
||||
}
|
||||
|
||||
virtual ~ChunkManagerTest() {
|
||||
|
@ -8,7 +8,6 @@ Import("env")
|
||||
|
||||
env.Library('base', ['mongo_version_range.cpp',
|
||||
'type_actionlog.cpp',
|
||||
'type_changelog.cpp',
|
||||
'type_chunk.cpp',
|
||||
'type_collection.cpp',
|
||||
'type_database.cpp',
|
||||
@ -31,10 +30,6 @@ env.CppUnitTest('mongo_version_range_test', 'mongo_version_range_test.cpp',
|
||||
'$BUILD_DIR/mongo/bson',
|
||||
'$BUILD_DIR/mongo/db/common'])
|
||||
|
||||
env.CppUnitTest('type_changelog_test', 'type_changelog_test.cpp',
|
||||
LIBDEPS=['base',
|
||||
'$BUILD_DIR/mongo/db/common'])
|
||||
|
||||
env.CppUnitTest('type_chunk_test', 'type_chunk_test.cpp',
|
||||
LIBDEPS=['base',
|
||||
'$BUILD_DIR/mongo/db/common'])
|
||||
|
@ -6,8 +6,22 @@ env.Library(
|
||||
target='catalog_manager',
|
||||
source=[
|
||||
'catalog_manager.cpp',
|
||||
'type_changelog.cpp',
|
||||
],
|
||||
LIBDEPS=[
|
||||
'$BUILD_DIR/mongo/base/base',
|
||||
'$BUILD_DIR/mongo/bson',
|
||||
'$BUILD_DIR/mongo/db/common',
|
||||
'$BUILD_DIR/mongo/s/batch_write_types',
|
||||
],
|
||||
)
|
||||
|
||||
env.CppUnitTest(
|
||||
target='catalog_manager_test',
|
||||
source=[
|
||||
'type_changelog_test.cpp'
|
||||
],
|
||||
LIBDEPS=[
|
||||
'$BUILD_DIR/mongo/s/catalog/catalog_manager',
|
||||
]
|
||||
)
|
||||
|
@ -39,6 +39,7 @@ namespace mongo {
|
||||
class BatchedCommandResponse;
|
||||
class BSONObj;
|
||||
class DatabaseType;
|
||||
class OperationContext;
|
||||
class Status;
|
||||
template<typename T> class StatusWith;
|
||||
|
||||
@ -78,6 +79,21 @@ namespace mongo {
|
||||
*/
|
||||
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0;
|
||||
|
||||
/**
|
||||
* Logs a diagnostic event locally and on the config server.
|
||||
*
|
||||
* NOTE: This method is best effort so it should never throw.
|
||||
*
|
||||
* @param opCtx The operation context of the call doing the logging
|
||||
* @param what E.g. "split", "migrate"
|
||||
* @param ns To which collection the metadata change is being applied
|
||||
* @param detail Additional info about the metadata change (not interpreted)
|
||||
*/
|
||||
virtual void logChange(OperationContext* opCtx,
|
||||
const std::string& what,
|
||||
const std::string& ns,
|
||||
const BSONObj& detail) = 0;
|
||||
|
||||
/**
|
||||
* Directly sends the specified command to the config server and returns the response.
|
||||
*
|
||||
|
@ -9,7 +9,6 @@ env.Library(
|
||||
'config_coordinator.cpp',
|
||||
],
|
||||
LIBDEPS=[
|
||||
'$BUILD_DIR/mongo/s/batch_write_types',
|
||||
'$BUILD_DIR/mongo/s/catalog/catalog_manager',
|
||||
'$BUILD_DIR/mongo/s/client/sharding_client',
|
||||
]
|
||||
@ -28,4 +27,4 @@ env.CppUnitTest(
|
||||
'$BUILD_DIR/mongo/mongoscore',
|
||||
'$BUILD_DIR/mongo/mongocommon',
|
||||
]
|
||||
)
|
||||
)
|
||||
|
@ -37,8 +37,12 @@
|
||||
#include <vector>
|
||||
|
||||
#include "mongo/client/connpool.h"
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/s/catalog/legacy/config_coordinator.h"
|
||||
#include "mongo/s/catalog/type_changelog.h"
|
||||
#include "mongo/s/client/dbclient_multi_command.h"
|
||||
#include "mongo/s/shard.h"
|
||||
#include "mongo/s/type_database.h"
|
||||
@ -85,6 +89,9 @@ namespace {
|
||||
dassert(response->isValid(NULL));
|
||||
}
|
||||
|
||||
// Whether the logChange call should attempt to create the changelog collection
|
||||
AtomicInt32 changeLogCollectionCreated(0);
|
||||
|
||||
} // namespace
|
||||
|
||||
|
||||
@ -262,6 +269,56 @@ namespace {
|
||||
return DatabaseType::fromBSON(dbObj);
|
||||
}
|
||||
|
||||
void CatalogManagerLegacy::logChange(OperationContext* opCtx,
|
||||
const string& what,
|
||||
const string& ns,
|
||||
const BSONObj& detail) {
|
||||
|
||||
// Create the change log collection and ensure that it is capped. Wrap in try/catch,
|
||||
// because creating an existing collection throws.
|
||||
if (changeLogCollectionCreated.load() == 0) {
|
||||
try {
|
||||
ScopedDbConnection conn(_configServerConnectionString, 30.0);
|
||||
conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true);
|
||||
conn.done();
|
||||
|
||||
changeLogCollectionCreated.store(1);
|
||||
}
|
||||
catch (const UserException& e) {
|
||||
// It's ok to ignore this exception
|
||||
LOG(1) << "couldn't create changelog collection: " << e;
|
||||
}
|
||||
}
|
||||
|
||||
// Store this entry's ID so we can use on the exception code path too
|
||||
StringBuilder changeIdBuilder;
|
||||
changeIdBuilder << getHostNameCached() << "-" << terseCurrentTime()
|
||||
<< "-" << OID::gen();
|
||||
|
||||
const string changeID = changeIdBuilder.str();
|
||||
|
||||
Client* const client = (opCtx ? opCtx->getClient() : currentClient.get());
|
||||
|
||||
// Send a copy of the message to the local log in case it doesn't manage to reach
|
||||
// config.changelog
|
||||
BSONObj msg = BSON(ChangelogType::changeID(changeID) <<
|
||||
ChangelogType::server(getHostNameCached()) <<
|
||||
ChangelogType::clientAddr((client ?
|
||||
client->clientAddress(true) : "")) <<
|
||||
ChangelogType::time(jsTime()) <<
|
||||
ChangelogType::what(what) <<
|
||||
ChangelogType::ns(ns) <<
|
||||
ChangelogType::details(detail));
|
||||
|
||||
log() << "about to log metadata event: " << msg;
|
||||
|
||||
Status result = insert(ChangelogType::ConfigNS, msg, NULL);
|
||||
if (!result.isOK()) {
|
||||
warning() << "Error encountered while logging config change with ID "
|
||||
<< changeID << ": " << result;
|
||||
}
|
||||
}
|
||||
|
||||
void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
|
||||
BatchedCommandResponse* response) {
|
||||
|
||||
|
@ -36,6 +36,9 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Implements the catalog manager using the legacy 3-config server protocol.
|
||||
*/
|
||||
class CatalogManagerLegacy : public CatalogManager {
|
||||
public:
|
||||
CatalogManagerLegacy() = default;
|
||||
@ -53,11 +56,15 @@ namespace mongo {
|
||||
|
||||
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName);
|
||||
|
||||
virtual void logChange(OperationContext* txn,
|
||||
const std::string& what,
|
||||
const std::string& ns,
|
||||
const BSONObj& detail);
|
||||
|
||||
virtual void writeConfigServerDirect(const BatchedCommandRequest& request,
|
||||
BatchedCommandResponse* response);
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* Direct network check to see if a particular database does not already exist with the
|
||||
* same name or different case.
|
||||
|
@ -25,7 +25,10 @@
|
||||
* delete this exception statement from all source files in the program,
|
||||
* then also delete it in the license file.
|
||||
*/
|
||||
#include "mongo/s/type_changelog.h"
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/s/catalog/type_changelog.h"
|
||||
|
||||
#include "mongo/db/field_parser.h"
|
||||
#include "mongo/util/mongoutils/str.h"
|
@ -31,7 +31,6 @@
|
||||
#include <string>
|
||||
|
||||
#include "mongo/base/disallow_copying.h"
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
|
||||
namespace mongo {
|
@ -26,7 +26,9 @@
|
||||
* then also delete it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/s/type_changelog.h"
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/s/catalog/type_changelog.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/time_support.h"
|
||||
|
@ -673,7 +673,10 @@ namespace {
|
||||
void ChunkManager::drop() const {
|
||||
boost::lock_guard<boost::mutex> lk(_mutex);
|
||||
|
||||
configServer.logChange( "dropCollection.start" , _ns , BSONObj() );
|
||||
grid.catalogManager()->logChange(NULL,
|
||||
"dropCollection.start",
|
||||
_ns,
|
||||
BSONObj());
|
||||
|
||||
ScopedDistributedLock nsLock(configServer.getConnectionString(), _ns);
|
||||
nsLock.setLockMessage("drop");
|
||||
@ -762,7 +765,7 @@ namespace {
|
||||
}
|
||||
|
||||
LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE";
|
||||
configServer.logChange( "dropCollection" , _ns , BSONObj() );
|
||||
grid.catalogManager()->logChange(NULL, "dropCollection", _ns, BSONObj());
|
||||
}
|
||||
|
||||
ChunkVersion ChunkManager::getVersion(const std::string& shardName) const {
|
||||
|
@ -38,11 +38,9 @@
|
||||
|
||||
#include "mongo/client/connpool.h"
|
||||
#include "mongo/db/field_parser.h"
|
||||
#include "mongo/db/write_concern.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/cluster_write.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/type_changelog.h"
|
||||
#include "mongo/s/type_mongos.h"
|
||||
#include "mongo/s/type_shard.h"
|
||||
#include "mongo/util/log.h"
|
||||
@ -232,74 +230,6 @@ namespace mongo {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status logConfigChange(const ConnectionString& configLoc,
|
||||
const string& clientHost,
|
||||
const string& ns,
|
||||
const string& description,
|
||||
const BSONObj& details)
|
||||
{
|
||||
//
|
||||
// The code for writing to the changelog collection exists elsewhere - we duplicate here to
|
||||
// avoid dependency issues.
|
||||
// TODO: Merge again once config.cpp is cleaned up.
|
||||
//
|
||||
|
||||
string changeID = stream() << getHostNameCached() << "-" << terseCurrentTime() << "-"
|
||||
<< OID::gen();
|
||||
|
||||
ChangelogType changelog;
|
||||
changelog.setChangeID(changeID);
|
||||
changelog.setServer(getHostNameCached());
|
||||
changelog.setClientAddr(clientHost == "" ? "N/A" : clientHost);
|
||||
changelog.setTime(jsTime());
|
||||
changelog.setWhat(description);
|
||||
changelog.setNS(ns);
|
||||
changelog.setDetails(details);
|
||||
|
||||
log() << "about to log new metadata event: " << changelog.toBSON() << endl;
|
||||
|
||||
scoped_ptr<ScopedDbConnection> connPtr;
|
||||
|
||||
try {
|
||||
connPtr.reset(new ScopedDbConnection(configLoc, 30));
|
||||
ScopedDbConnection& conn = *connPtr;
|
||||
|
||||
// TODO: better way here
|
||||
static bool createdCapped = false;
|
||||
if (!createdCapped) {
|
||||
|
||||
try {
|
||||
conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true);
|
||||
}
|
||||
catch (const DBException& e) {
|
||||
// don't care, someone else may have done this for us
|
||||
// if there's still a problem, caught in outer try
|
||||
LOG(1) << "couldn't create the changelog, continuing " << e << endl;
|
||||
}
|
||||
|
||||
createdCapped = true;
|
||||
}
|
||||
connPtr->done();
|
||||
}
|
||||
catch (const DBException& e) {
|
||||
// if we got here, it means the config change is only in the log,
|
||||
// it didn't make it to config.changelog
|
||||
log() << "not logging config change: " << changeID << causedBy(e) << endl;
|
||||
return e.toStatus();
|
||||
}
|
||||
|
||||
Status result = grid.catalogManager()->insert(ChangelogType::ConfigNS,
|
||||
changelog.toBSON(),
|
||||
NULL);
|
||||
if (!result.isOK()) {
|
||||
return Status(result.code(),
|
||||
str::stream() << "failed to write to changelog: "
|
||||
<< result.reason());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Helper function for safe cursors
|
||||
DBClientCursor* _safeCursor(auto_ptr<DBClientCursor> cursor) {
|
||||
// TODO: Make error handling more consistent, it's annoying that cursors error out by
|
||||
|
@ -57,17 +57,6 @@ namespace mongo {
|
||||
Status checkClusterMongoVersions(const ConnectionString& configLoc,
|
||||
const std::string& minMongoVersion);
|
||||
|
||||
/**
|
||||
* Logs to the config.changelog collection
|
||||
*
|
||||
* Returns OK if loaded successfully, error Status if not.
|
||||
*/
|
||||
Status logConfigChange(const ConnectionString& configLoc,
|
||||
const std::string& clientHost,
|
||||
const std::string& ns,
|
||||
const std::string& description,
|
||||
const BSONObj& details);
|
||||
|
||||
//
|
||||
// Needed to normalize exception behavior of connections and cursors
|
||||
// TODO: Remove when we refactor the client connection interface to something more consistent.
|
||||
|
@ -38,6 +38,8 @@
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/client_basic.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/grid.h"
|
||||
@ -165,7 +167,7 @@ namespace {
|
||||
s.toString(),
|
||||
shardedColls);
|
||||
|
||||
configServer.logChange("movePrimary.start", dbname, moveStartDetails);
|
||||
grid.catalogManager()->logChange(txn, "movePrimary.start", dbname, moveStartDetails);
|
||||
|
||||
BSONArrayBuilder barr;
|
||||
barr.append(shardedColls);
|
||||
@ -247,8 +249,8 @@ namespace {
|
||||
oldPrimary,
|
||||
s.toString(),
|
||||
shardedColls);
|
||||
configServer.logChange("movePrimary", dbname, moveFinishDetails);
|
||||
|
||||
grid.catalogManager()->logChange(txn, "movePrimary", dbname, moveFinishDetails);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,7 @@
|
||||
#include "mongo/client/replica_set_monitor.h"
|
||||
#include "mongo/db/audit.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/client/shard_connection.h"
|
||||
#include "mongo/s/cluster_write.h"
|
||||
@ -204,10 +205,10 @@ namespace {
|
||||
conn.done();
|
||||
|
||||
// Record start in changelog
|
||||
configServer.logChange("removeShard.start",
|
||||
"",
|
||||
buildRemoveLogEntry(s, true));
|
||||
|
||||
grid.catalogManager()->logChange(txn,
|
||||
"removeShard.start",
|
||||
"",
|
||||
buildRemoveLogEntry(s, true));
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -246,8 +247,10 @@ namespace {
|
||||
conn.done();
|
||||
|
||||
// Record finish in changelog
|
||||
configServer.logChange("removeShard", "", buildRemoveLogEntry(s, false));
|
||||
|
||||
grid.catalogManager()->logChange(txn,
|
||||
"removeShard",
|
||||
"",
|
||||
buildRemoveLogEntry(s, false));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,6 @@
|
||||
#include "mongo/s/cluster_write.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/server.h"
|
||||
#include "mongo/s/type_changelog.h"
|
||||
#include "mongo/s/type_chunk.h"
|
||||
#include "mongo/s/type_collection.h"
|
||||
#include "mongo/s/type_database.h"
|
||||
@ -266,7 +265,10 @@ namespace mongo {
|
||||
collectionDetail.append("initShards", initialShards);
|
||||
collectionDetail.append("numChunks", (int)(initPoints->size() + 1));
|
||||
|
||||
configServer.logChange("shardCollection.start", ns, collectionDetail.obj());
|
||||
grid.catalogManager()->logChange(NULL,
|
||||
"shardCollection.start",
|
||||
ns,
|
||||
collectionDetail.obj());
|
||||
|
||||
ChunkManager* cm = new ChunkManager( ns, fieldsAndOrder, unique );
|
||||
cm->createFirstChunks(configServer.getPrimary().getConnString(),
|
||||
@ -306,7 +308,8 @@ namespace mongo {
|
||||
// Record finish in changelog
|
||||
BSONObjBuilder finishDetail;
|
||||
finishDetail.append("version", manager->getVersion().toString());
|
||||
configServer.logChange("shardCollection", ns, finishDetail.obj());
|
||||
|
||||
grid.catalogManager()->logChange(NULL, "shardCollection", ns, finishDetail.obj());
|
||||
|
||||
return manager;
|
||||
}
|
||||
@ -646,7 +649,7 @@ namespace mongo {
|
||||
return _load();
|
||||
}
|
||||
|
||||
bool DBConfig::dropDatabase( string& errmsg ) {
|
||||
bool DBConfig::dropDatabase(string& errmsg) {
|
||||
/**
|
||||
* 1) make sure everything is up
|
||||
* 2) update config server
|
||||
@ -656,7 +659,7 @@ namespace mongo {
|
||||
*/
|
||||
|
||||
log() << "DBConfig::dropDatabase: " << _name << endl;
|
||||
configServer.logChange( "dropDatabase.start" , _name , BSONObj() );
|
||||
grid.catalogManager()->logChange(NULL, "dropDatabase.start", _name, BSONObj());
|
||||
|
||||
// 1
|
||||
if (!configServer.allUp(false, errmsg)) {
|
||||
@ -725,7 +728,8 @@ namespace mongo {
|
||||
|
||||
LOG(1) << "\t dropped primary db for: " << _name << endl;
|
||||
|
||||
configServer.logChange( "dropDatabase" , _name , BSONObj() );
|
||||
grid.catalogManager()->logChange(NULL, "dropDatabase", _name, BSONObj());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1223,58 +1227,6 @@ namespace mongo {
|
||||
return name;
|
||||
}
|
||||
|
||||
/* must never throw */
|
||||
void ConfigServer::logChange( const string& what , const string& ns , const BSONObj& detail ) {
|
||||
string changeID;
|
||||
|
||||
try {
|
||||
// get this entry's ID so we can use on the exception code path too
|
||||
stringstream id;
|
||||
id << getHostNameCached() << "-" << terseCurrentTime() << "-" << OID::gen();
|
||||
changeID = id.str();
|
||||
|
||||
// send a copy of the message to the log in case it doesn't manage to reach config.changelog
|
||||
Client* c = currentClient.get();
|
||||
BSONObj msg = BSON( ChangelogType::changeID(changeID) <<
|
||||
ChangelogType::server(getHostNameCached()) <<
|
||||
ChangelogType::clientAddr((c ? c->clientAddress(true) : "N/A")) <<
|
||||
ChangelogType::time(jsTime()) <<
|
||||
ChangelogType::what(what) <<
|
||||
ChangelogType::ns(ns) <<
|
||||
ChangelogType::details(detail) );
|
||||
log() << "about to log metadata event: " << msg << endl;
|
||||
|
||||
verify( _primary.ok() );
|
||||
|
||||
ScopedDbConnection conn(_primary.getConnString(), 30.0);
|
||||
|
||||
static bool createdCapped = false;
|
||||
if ( ! createdCapped ) {
|
||||
try {
|
||||
conn->createCollection( ChangelogType::ConfigNS , 1024 * 1024 * 10 , true );
|
||||
}
|
||||
catch ( UserException& e ) {
|
||||
LOG(1) << "couldn't create changelog (like race condition): " << e << endl;
|
||||
// don't care
|
||||
}
|
||||
createdCapped = true;
|
||||
}
|
||||
|
||||
conn.done();
|
||||
|
||||
Status result = grid.catalogManager()->insert(ChangelogType::ConfigNS, msg, NULL);
|
||||
if (!result.isOK()) {
|
||||
log() << "Error encountered while logging config change with ID: " << changeID
|
||||
<< result.reason();
|
||||
}
|
||||
}
|
||||
|
||||
catch ( std::exception& e ) {
|
||||
// if we got here, it means the config change is only in the log; it didn't make it to config.changelog
|
||||
log() << "not logging config change: " << changeID << " " << e.what() << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void ConfigServer::replicaSetChange(const string& setName, const string& newConnectionString) {
|
||||
// This is run in it's own thread. Exceptions escaping would result in a call to terminate.
|
||||
Client::initThread("replSetChange");
|
||||
|
@ -237,17 +237,6 @@ namespace mongo {
|
||||
|
||||
void reloadSettings();
|
||||
|
||||
/**
|
||||
* Create a metadata change log entry in the config.changelog collection.
|
||||
*
|
||||
* @param what e.g. "split" , "migrate"
|
||||
* @param ns to which collection the metadata change is being applied
|
||||
* @param msg additional info about the metadata change
|
||||
*
|
||||
* This call is guaranteed never to throw.
|
||||
*/
|
||||
void logChange( const std::string& what , const std::string& ns , const BSONObj& detail = BSONObj() );
|
||||
|
||||
ConnectionString getConnectionString() const {
|
||||
return ConnectionString( _primary.getConnString() , ConnectionString::SYNC );
|
||||
}
|
||||
|
@ -34,8 +34,10 @@
|
||||
|
||||
#include "mongo/base/init.h"
|
||||
#include "mongo/client/dbclientcursor.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/cluster_client_internal.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/mongo_version_range.h"
|
||||
#include "mongo/s/type_config_version.h"
|
||||
#include "mongo/s/type_database.h"
|
||||
@ -353,8 +355,8 @@ namespace mongo {
|
||||
const ConfigUpgradeRegistry& registry,
|
||||
const VersionType& lastVersionInfo,
|
||||
VersionType* upgradedVersionInfo,
|
||||
string* errMsg)
|
||||
{
|
||||
string* errMsg) {
|
||||
|
||||
int fromVersion = lastVersionInfo.getCurrentVersion();
|
||||
|
||||
ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion);
|
||||
@ -375,19 +377,10 @@ namespace mongo {
|
||||
log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion << endl;
|
||||
|
||||
// Log begin to config.changelog
|
||||
Status logStatus = logConfigChange(configLoc,
|
||||
"",
|
||||
VersionType::ConfigNS,
|
||||
"starting upgrade of config database",
|
||||
BSON("from" << fromVersion << "to" << toVersion));
|
||||
|
||||
if (!logStatus.isOK()) {
|
||||
|
||||
*errMsg = stream() << "could not write initial changelog entry for upgrade"
|
||||
<< causedBy(logStatus);
|
||||
|
||||
return false;
|
||||
}
|
||||
grid.catalogManager()->logChange(NULL,
|
||||
"starting upgrade of config database",
|
||||
VersionType::ConfigNS,
|
||||
BSON("from" << fromVersion << "to" << toVersion));
|
||||
|
||||
if (!upgrade.upgradeCallback(configLoc, lastVersionInfo, errMsg)) {
|
||||
|
||||
@ -401,28 +394,16 @@ namespace mongo {
|
||||
Status verifyConfigStatus = getConfigVersion(configLoc, upgradedVersionInfo);
|
||||
|
||||
if (!verifyConfigStatus.isOK()) {
|
||||
|
||||
*errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade"
|
||||
<< causedBy(verifyConfigStatus);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Log end to config.changelog
|
||||
logStatus = logConfigChange(configLoc,
|
||||
"",
|
||||
VersionType::ConfigNS,
|
||||
"finished upgrade of config database",
|
||||
BSON("from" << fromVersion << "to" << toVersion));
|
||||
|
||||
if (!logStatus.isOK()) {
|
||||
|
||||
*errMsg = stream() << "could not write final changelog entry for upgrade"
|
||||
<< causedBy(logStatus);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
grid.catalogManager()->logChange(NULL,
|
||||
"finished upgrade of config database",
|
||||
VersionType::ConfigNS,
|
||||
BSON("from" << fromVersion << "to" << toVersion));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -28,14 +28,18 @@
|
||||
|
||||
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/base/owned_pointer_vector.h"
|
||||
#include "mongo/db/concurrency/d_concurrency.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/s/d_state.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/chunk.h"
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/chunk.h" // needed for genID
|
||||
#include "mongo/s/config.h" // needed for changelog write
|
||||
#include "mongo/s/d_state.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/util/log.h"
|
||||
#include "mongo/util/mongoutils/str.h"
|
||||
|
||||
@ -311,7 +315,7 @@ namespace mongo {
|
||||
shardVersion,
|
||||
mergeVersion );
|
||||
|
||||
configServer.logChange( "merge", nss.ns(), mergeLogEntry );
|
||||
grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -73,11 +73,13 @@
|
||||
#include "mongo/db/operation_context_impl.h"
|
||||
#include "mongo/db/write_concern.h"
|
||||
#include "mongo/logger/ramlog.h"
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/chunk.h"
|
||||
#include "mongo/s/chunk_version.h"
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/d_state.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/shard.h"
|
||||
#include "mongo/s/type_chunk.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
@ -175,7 +177,11 @@ namespace mongo {
|
||||
if ( !_cmdErrmsg->empty() ) {
|
||||
_b.append( "errmsg" , *_cmdErrmsg );
|
||||
}
|
||||
configServer.logChange( (string)"moveChunk." + _where , _ns, _b.obj() );
|
||||
|
||||
grid.catalogManager()->logChange(_txn,
|
||||
(string)"moveChunk." + _where,
|
||||
_ns,
|
||||
_b.obj());
|
||||
}
|
||||
catch ( const std::exception& e ) {
|
||||
warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() << migrateLog;
|
||||
@ -1183,7 +1189,8 @@ namespace mongo {
|
||||
BSONObj chunkInfo =
|
||||
BSON("min" << min << "max" << max <<
|
||||
"from" << fromShard.getName() << "to" << toShard.getName());
|
||||
configServer.logChange("moveChunk.start", ns, chunkInfo);
|
||||
|
||||
grid.catalogManager()->logChange(txn, "moveChunk.start", ns, chunkInfo);
|
||||
|
||||
// Always refresh our metadata remotely
|
||||
ChunkVersion origShardVersion;
|
||||
@ -1719,9 +1726,11 @@ namespace mongo {
|
||||
// 5.d
|
||||
BSONObjBuilder commitInfo;
|
||||
commitInfo.appendElements( chunkInfo );
|
||||
if ( res["counts"].type() == Object )
|
||||
commitInfo.appendElements( res["counts"].Obj() );
|
||||
configServer.logChange( "moveChunk.commit" , ns , commitInfo.obj() );
|
||||
if (res["counts"].type() == Object) {
|
||||
commitInfo.appendElements(res["counts"].Obj());
|
||||
}
|
||||
|
||||
grid.catalogManager()->logChange(txn, "moveChunk.commit", ns, commitInfo.obj());
|
||||
}
|
||||
|
||||
migrateFromStatus.done(txn);
|
||||
|
@ -50,11 +50,13 @@
|
||||
#include "mongo/db/instance.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/query/internal_plans.h"
|
||||
#include "mongo/s/chunk.h" // for static genID only
|
||||
#include "mongo/s/catalog/catalog_manager.h"
|
||||
#include "mongo/s/chunk.h"
|
||||
#include "mongo/s/chunk_version.h"
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/d_state.h"
|
||||
#include "mongo/s/distlock.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/shard_key_pattern.h"
|
||||
#include "mongo/s/type_chunk.h"
|
||||
#include "mongo/util/log.h"
|
||||
@ -825,7 +827,8 @@ namespace mongo {
|
||||
if ( newChunks.size() == 2 ) {
|
||||
appendShortVersion(logDetail.subobjStart("left"), *newChunks[0]);
|
||||
appendShortVersion(logDetail.subobjStart("right"), *newChunks[1]);
|
||||
configServer.logChange( "split" , ns , logDetail.obj() );
|
||||
|
||||
grid.catalogManager()->logChange(txn, "split", ns, logDetail.obj());
|
||||
}
|
||||
else {
|
||||
BSONObj beforeDetailObj = logDetail.obj();
|
||||
@ -838,7 +841,8 @@ namespace mongo {
|
||||
chunkDetail.append( "number", i+1 );
|
||||
chunkDetail.append( "of" , newChunksSize );
|
||||
appendShortVersion(chunkDetail.subobjStart("chunk"), *newChunks[i]);
|
||||
configServer.logChange( "multi-split" , ns , chunkDetail.obj() );
|
||||
|
||||
grid.catalogManager()->logChange(txn, "multi-split", ns, chunkDetail.obj());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@
|
||||
|
||||
#include "mongo/client/connpool.h"
|
||||
#include "mongo/client/replica_set_monitor.h"
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/json.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/write_concern.h"
|
||||
@ -479,7 +480,8 @@ namespace mongo {
|
||||
BSONObjBuilder shardDetails;
|
||||
shardDetails.append("name", *name);
|
||||
shardDetails.append("host", servers.toString());
|
||||
configServer.logChange("addShard", "", shardDetails.obj());
|
||||
|
||||
grid.catalogManager()->logChange(NULL, "addShard", "", shardDetails.obj());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user