mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-19855 Include min OpTime with shard version
This change adds the OpTime of the chunk manager as of the time its cached metadata was loaded along with each versioned request sent from MongoS. This includes write commands and the setShardVersion command. The OpTime is only sent as part of this change. There will be a follow-up change to add code to interpret this information on the MongoD side.
This commit is contained in:
parent
b4f7738e80
commit
5be3721da7
@ -149,15 +149,11 @@ public:
|
||||
}
|
||||
|
||||
// step 2
|
||||
if (!ChunkVersion::canParseBSON(cmdObj, "version")) {
|
||||
errmsg = "need to specify version";
|
||||
return false;
|
||||
}
|
||||
|
||||
const ChunkVersion version = ChunkVersion::fromBSON(cmdObj, "version");
|
||||
ChunkVersionAndOpTime verAndOpTime =
|
||||
uassertStatusOK(ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj));
|
||||
const auto& version = verAndOpTime.getVersion();
|
||||
|
||||
// step 3
|
||||
|
||||
const ChunkVersion oldVersion = info->getVersion(ns);
|
||||
const ChunkVersion globalVersion = ShardingState::get(txn)->getVersion(ns);
|
||||
|
||||
|
@ -33,11 +33,13 @@ env.Library(
|
||||
target='common',
|
||||
source=[
|
||||
'chunk_diff.cpp',
|
||||
'chunk_version.cpp',
|
||||
'set_shard_version_request.cpp',
|
||||
],
|
||||
LIBDEPS=[
|
||||
'catalog/catalog_types',
|
||||
'$BUILD_DIR/mongo/client/connection_string',
|
||||
'$BUILD_DIR/mongo/db/repl/optime',
|
||||
]
|
||||
)
|
||||
|
||||
|
@ -622,7 +622,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa
|
||||
shardEntry.getName(),
|
||||
fassertStatusOK(28753, ConnectionString::parse(shardEntry.getHost())),
|
||||
ns,
|
||||
ChunkVersion::DROPPED(),
|
||||
ChunkVersionAndOpTime(ChunkVersion::DROPPED()),
|
||||
true);
|
||||
|
||||
auto ssvResult = shardRegistry->runCommandWithNotMasterRetries(
|
||||
|
@ -213,7 +213,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
|
||||
dbPrimaryShardId,
|
||||
primaryShard->getConnString(),
|
||||
NamespaceString(ns),
|
||||
manager->getVersion(),
|
||||
ChunkVersionAndOpTime(manager->getVersion(), manager->getConfigOpTime()),
|
||||
true);
|
||||
|
||||
auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries(
|
||||
@ -539,13 +539,18 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
|
||||
|
||||
LOG(1) << "dropCollection " << ns << " collection marked as dropped";
|
||||
|
||||
// We just called updateCollection above and this would have advanced the config op time, so use
|
||||
// the latest value. On the MongoD side, we need to load the latest config metadata, which
|
||||
// indicates that the collection was dropped.
|
||||
const ChunkVersionAndOpTime droppedVersion(ChunkVersion::DROPPED(), _getConfigOpTime());
|
||||
|
||||
for (const auto& shardEntry : allShards) {
|
||||
SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
|
||||
grid.shardRegistry()->getConfigServerConnectionString(),
|
||||
shardEntry.getName(),
|
||||
fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())),
|
||||
ns,
|
||||
ChunkVersion::DROPPED(),
|
||||
droppedVersion,
|
||||
true);
|
||||
|
||||
auto ssvResult = shardRegistry->runCommandWithNotMasterRetries(
|
||||
|
@ -320,7 +320,8 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* txn,
|
||||
<< "; no metadata found");
|
||||
}
|
||||
|
||||
*endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED());
|
||||
*endpoint =
|
||||
new ShardEndpoint(_primary->getId(), ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()));
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
@ -496,7 +497,10 @@ Status ChunkManagerTargeter::targetQuery(const BSONObj& query,
|
||||
|
||||
for (const ShardId& shardId : shardIds) {
|
||||
endpoints->push_back(new ShardEndpoint(
|
||||
shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
|
||||
shardId,
|
||||
_manager
|
||||
? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime())
|
||||
: ChunkVersionAndOpTime(ChunkVersion::UNSHARDED())));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -516,7 +520,9 @@ Status ChunkManagerTargeter::targetShardKey(OperationContext* txn,
|
||||
_stats.chunkSizeDelta[chunk->getMin()] += estDataSize;
|
||||
}
|
||||
|
||||
*endpoint = new ShardEndpoint(chunk->getShardId(), _manager->getVersion(chunk->getShardId()));
|
||||
*endpoint = new ShardEndpoint(chunk->getShardId(),
|
||||
ChunkVersionAndOpTime(_manager->getVersion(chunk->getShardId()),
|
||||
_manager->getConfigOpTime()));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
@ -537,7 +543,10 @@ Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints)
|
||||
|
||||
for (const ShardId& shardId : shardIds) {
|
||||
endpoints->push_back(new ShardEndpoint(
|
||||
shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
|
||||
shardId,
|
||||
_manager
|
||||
? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime())
|
||||
: ChunkVersionAndOpTime(ChunkVersion::UNSHARDED())));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -555,7 +564,10 @@ Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints)
|
||||
|
||||
for (const ShardId& shardId : shardIds) {
|
||||
endpoints->push_back(new ShardEndpoint(
|
||||
shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
|
||||
shardId,
|
||||
_manager
|
||||
? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime())
|
||||
: ChunkVersionAndOpTime(ChunkVersion::UNSHARDED())));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
152
src/mongo/s/chunk_version.cpp
Normal file
152
src/mongo/s/chunk_version.cpp
Normal file
@ -0,0 +1,152 @@
|
||||
/**
|
||||
* Copyright (C) 2015 MongoDB Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/s/chunk_version.h"
|
||||
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/base/status_with.h"
|
||||
#include "mongo/bson/util/bson_extract.h"
|
||||
#include "mongo/util/mongoutils/str.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
const char kVersion[] = "version";
|
||||
const char kShardVersion[] = "shardVersion";
|
||||
|
||||
} // namespace
|
||||
|
||||
StatusWith<ChunkVersion> ChunkVersion::parseFromBSONForCommands(const BSONObj& obj) {
|
||||
BSONElement versionElem;
|
||||
Status status = bsonExtractField(obj, kShardVersion, &versionElem);
|
||||
if (!status.isOK())
|
||||
return status;
|
||||
|
||||
if (versionElem.type() != Array) {
|
||||
return {ErrorCodes::TypeMismatch,
|
||||
str::stream() << "Invalid type " << versionElem.type()
|
||||
<< " for shardVersion element. Expected an array"};
|
||||
}
|
||||
|
||||
BSONObjIterator it(versionElem.Obj());
|
||||
if (!it.more())
|
||||
return {ErrorCodes::BadValue, "Unexpected empty version"};
|
||||
|
||||
ChunkVersion version;
|
||||
|
||||
// Expect the timestamp
|
||||
{
|
||||
BSONElement tsPart = it.next();
|
||||
if (tsPart.type() != bsonTimestamp)
|
||||
return {ErrorCodes::TypeMismatch,
|
||||
str::stream() << "Invalid type " << tsPart.type()
|
||||
<< " for version timestamp part."};
|
||||
|
||||
version._combined = tsPart.timestamp().asULL();
|
||||
}
|
||||
|
||||
// Expect the epoch OID
|
||||
{
|
||||
BSONElement epochPart = it.next();
|
||||
if (epochPart.type() != jstOID)
|
||||
return {ErrorCodes::TypeMismatch,
|
||||
str::stream() << "Invalid type " << epochPart.type()
|
||||
<< " for version epoch part."};
|
||||
|
||||
version._epoch = epochPart.OID();
|
||||
}
|
||||
|
||||
return version;
|
||||
}
|
||||
|
||||
StatusWith<ChunkVersion> ChunkVersion::parseFromBSONForSetShardVersion(const BSONObj& obj) {
|
||||
bool canParse;
|
||||
const ChunkVersion chunkVersion = ChunkVersion::fromBSON(obj, kVersion, &canParse);
|
||||
if (!canParse)
|
||||
return {ErrorCodes::BadValue, "Unable to parse shard version"};
|
||||
|
||||
return chunkVersion;
|
||||
}
|
||||
|
||||
|
||||
ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion)
|
||||
: _verAndOpT(chunkVersion) {}
|
||||
|
||||
ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts)
|
||||
: _verAndOpT(chunkVersion, ts) {}
|
||||
|
||||
StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForCommands(
|
||||
const BSONObj& obj) {
|
||||
const auto chunkVersionStatus = ChunkVersion::parseFromBSONForCommands(obj);
|
||||
if (!chunkVersionStatus.isOK())
|
||||
return chunkVersionStatus.getStatus();
|
||||
|
||||
const ChunkVersion& chunkVersion = chunkVersionStatus.getValue();
|
||||
|
||||
const auto opTimeStatus = repl::OpTime::parseFromBSON(obj);
|
||||
if (opTimeStatus.isOK()) {
|
||||
return ChunkVersionAndOpTime(chunkVersion, opTimeStatus.getValue());
|
||||
} else if (opTimeStatus == ErrorCodes::NoSuchKey) {
|
||||
return ChunkVersionAndOpTime(chunkVersion);
|
||||
}
|
||||
|
||||
return opTimeStatus.getStatus();
|
||||
}
|
||||
|
||||
StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(
|
||||
const BSONObj& obj) {
|
||||
const auto chunkVersionStatus = ChunkVersion::parseFromBSONForSetShardVersion(obj);
|
||||
if (!chunkVersionStatus.isOK())
|
||||
return chunkVersionStatus.getStatus();
|
||||
|
||||
const ChunkVersion& chunkVersion = chunkVersionStatus.getValue();
|
||||
|
||||
const auto opTimeStatus = repl::OpTime::parseFromBSON(obj);
|
||||
if (opTimeStatus.isOK()) {
|
||||
return ChunkVersionAndOpTime(chunkVersion, opTimeStatus.getValue());
|
||||
} else if (opTimeStatus == ErrorCodes::NoSuchKey) {
|
||||
return ChunkVersionAndOpTime(chunkVersion);
|
||||
}
|
||||
|
||||
return opTimeStatus.getStatus();
|
||||
}
|
||||
|
||||
void ChunkVersionAndOpTime::appendForSetShardVersion(BSONObjBuilder* builder) const {
|
||||
_verAndOpT.value.addToBSON(*builder, kVersion);
|
||||
_verAndOpT.opTime.append(builder);
|
||||
}
|
||||
|
||||
void ChunkVersionAndOpTime::appendForCommands(BSONObjBuilder* builder) const {
|
||||
builder->appendArray(kShardVersion, _verAndOpT.value.toBSON());
|
||||
_verAndOpT.opTime.append(builder);
|
||||
}
|
||||
|
||||
} // namespace mongo
|
@ -1,37 +1,42 @@
|
||||
/**
|
||||
* Copyright (C) 2012 10gen Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects
|
||||
* for all of the code used other than as permitted herein. If you modify
|
||||
* file(s) with this exception, you may extend this exception to your
|
||||
* version of the file(s), but you are not obligated to do so. If you do not
|
||||
* wish to do so, delete this exception statement from your version. If you
|
||||
* delete this exception statement from all source files in the program,
|
||||
* then also delete it in the license file.
|
||||
*/
|
||||
* Copyright (C) 2012-2015 MongoDB Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/s/optime_pair.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class BSONObj;
|
||||
template <typename T>
|
||||
class StatusWith;
|
||||
|
||||
/**
|
||||
* ChunkVersions consist of a major/minor version scoped to a version epoch
|
||||
*
|
||||
@ -46,32 +51,38 @@ namespace mongo {
|
||||
* expected from types.
|
||||
*/
|
||||
struct ChunkVersion {
|
||||
union {
|
||||
struct {
|
||||
int _minor;
|
||||
int _major;
|
||||
};
|
||||
unsigned long long _combined;
|
||||
};
|
||||
OID _epoch;
|
||||
|
||||
public:
|
||||
ChunkVersion() : _minor(0), _major(0), _epoch(OID()) {}
|
||||
|
||||
//
|
||||
// Constructors shouldn't have default parameters here, since it's vital we track from
|
||||
// here on the epochs of versions, even if not used.
|
||||
//
|
||||
|
||||
ChunkVersion(int major, int minor, const OID& epoch)
|
||||
: _minor(minor), _major(major), _epoch(epoch) {}
|
||||
|
||||
/**
|
||||
* Interprets the specified BSON content as the format for commands, which is in the form:
|
||||
* { ..., shardVersion: [ <combined major/minor>, <OID epoch> ], ... }
|
||||
*/
|
||||
static StatusWith<ChunkVersion> parseFromBSONForCommands(const BSONObj& obj);
|
||||
|
||||
/**
|
||||
* Interprets the specified BSON content as the format for the setShardVersion command, which
|
||||
* is in the form:
|
||||
* { ..., version: [ <combined major/minor> ], versionEpoch: [ <OID epoch> ], ... }
|
||||
*/
|
||||
static StatusWith<ChunkVersion> parseFromBSONForSetShardVersion(const BSONObj& obj);
|
||||
|
||||
/**
|
||||
* Indicates a dropped collection. All components are zeroes (OID is zero time, zero
|
||||
* machineId/inc).
|
||||
*/
|
||||
static ChunkVersion DROPPED() {
|
||||
return ChunkVersion(0, 0, OID()); // dropped OID is zero time, zero machineId/inc
|
||||
return ChunkVersion(0, 0, OID());
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates that the collection is not sharded. Same as DROPPED.
|
||||
*/
|
||||
static ChunkVersion UNSHARDED() {
|
||||
// TODO: Distinguish between these cases
|
||||
return DROPPED();
|
||||
return ChunkVersion(0, 0, OID());
|
||||
}
|
||||
|
||||
static ChunkVersion IGNORED() {
|
||||
@ -369,35 +380,23 @@ struct ChunkVersion {
|
||||
return b.arr();
|
||||
}
|
||||
|
||||
bool parseBSON(const BSONObj& source, std::string* errMsg) {
|
||||
// ChunkVersion wants to be an array.
|
||||
BSONArray arrSource = static_cast<BSONArray>(source);
|
||||
|
||||
bool canParse;
|
||||
ChunkVersion version = fromBSON(arrSource, &canParse);
|
||||
if (!canParse) {
|
||||
*errMsg = "Could not parse version structure";
|
||||
return false;
|
||||
}
|
||||
|
||||
_minor = version._minor;
|
||||
_major = version._major;
|
||||
_epoch = version._epoch;
|
||||
return true;
|
||||
}
|
||||
|
||||
void clear() {
|
||||
_minor = 0;
|
||||
_major = 0;
|
||||
_epoch = OID();
|
||||
}
|
||||
|
||||
void cloneTo(ChunkVersion* other) const {
|
||||
other->clear();
|
||||
other->_minor = _minor;
|
||||
other->_major = _major;
|
||||
other->_epoch = _epoch;
|
||||
}
|
||||
private:
|
||||
union {
|
||||
struct {
|
||||
int _minor;
|
||||
int _major;
|
||||
};
|
||||
|
||||
uint64_t _combined;
|
||||
};
|
||||
|
||||
OID _epoch;
|
||||
};
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) {
|
||||
@ -405,4 +404,51 @@ inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) {
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Represents a chunk version along with the optime from when it was retrieved. Provides logic to
|
||||
* serialize and deserialize the combo to BSON.
|
||||
*/
|
||||
class ChunkVersionAndOpTime {
|
||||
public:
|
||||
ChunkVersionAndOpTime(ChunkVersion chunkVersion);
|
||||
ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts);
|
||||
|
||||
const ChunkVersion& getVersion() const {
|
||||
return _verAndOpT.value;
|
||||
}
|
||||
|
||||
const repl::OpTime& getOpTime() const {
|
||||
return _verAndOpT.opTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interprets the contents of the BSON documents as having been constructed in the format for
|
||||
* write commands. The optime component is optional for backwards compatibility and if not
|
||||
* present, the optime will be default initialized.
|
||||
*/
|
||||
static StatusWith<ChunkVersionAndOpTime> parseFromBSONForCommands(const BSONObj& obj);
|
||||
|
||||
/**
|
||||
* Interprets the contents of the BSON document as having been constructed in the format for the
|
||||
* setShardVersion command. The optime component is optional for backwards compatibility and if
|
||||
* not present, the optime will be default initialized.
|
||||
*/
|
||||
static StatusWith<ChunkVersionAndOpTime> parseFromBSONForSetShardVersion(const BSONObj& obj);
|
||||
|
||||
/**
|
||||
* Appends the contents to the specified builder in the format expected by the setShardVersion
|
||||
* command.
|
||||
*/
|
||||
void appendForSetShardVersion(BSONObjBuilder* builder) const;
|
||||
|
||||
/**
|
||||
* Appends the contents to the specified builder in the format expected by the write commands.
|
||||
*/
|
||||
void appendForCommands(BSONObjBuilder* builder) const;
|
||||
|
||||
private:
|
||||
OpTimePair<ChunkVersion> _verAndOpT;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -211,8 +211,11 @@ private:
|
||||
|
||||
inline void assertEndpointsEqual(const ShardEndpoint& endpointA, const ShardEndpoint& endpointB) {
|
||||
ASSERT_EQUALS(endpointA.shardName, endpointB.shardName);
|
||||
ASSERT_EQUALS(endpointA.shardVersion.toLong(), endpointB.shardVersion.toLong());
|
||||
ASSERT_EQUALS(endpointA.shardVersion.epoch(), endpointB.shardVersion.epoch());
|
||||
ASSERT_EQUALS(endpointA.shardVersion.getVersion().toLong(),
|
||||
endpointB.shardVersion.getVersion().toLong());
|
||||
ASSERT_EQUALS(endpointA.shardVersion.getVersion().epoch(),
|
||||
endpointB.shardVersion.getVersion().epoch());
|
||||
ASSERT_EQUALS(endpointA.shardVersion.getOpTime(), endpointB.shardVersion.getOpTime());
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -157,26 +157,11 @@ struct ShardEndpoint {
|
||||
ShardEndpoint(const ShardEndpoint& other)
|
||||
: shardName(other.shardName), shardVersion(other.shardVersion) {}
|
||||
|
||||
ShardEndpoint(const std::string& shardName, const ChunkVersion& shardVersion)
|
||||
ShardEndpoint(const std::string& shardName, const ChunkVersionAndOpTime& shardVersion)
|
||||
: shardName(shardName), shardVersion(shardVersion) {}
|
||||
|
||||
const std::string shardName;
|
||||
const ChunkVersion shardVersion;
|
||||
|
||||
//
|
||||
// For testing *only* - do not use as part of API
|
||||
//
|
||||
|
||||
BSONObj toBSON() const {
|
||||
BSONObjBuilder b;
|
||||
appendBSON(&b);
|
||||
return b.obj();
|
||||
}
|
||||
|
||||
void appendBSON(BSONObjBuilder* builder) const {
|
||||
builder->append("shardName", shardName);
|
||||
shardVersion.addToBSON(*builder, "shardVersion");
|
||||
}
|
||||
const ChunkVersionAndOpTime shardVersion;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -46,7 +46,6 @@ const char kShardName[] = "shard";
|
||||
const char kShardConnectionString[] = "shardHost";
|
||||
const char kInit[] = "init";
|
||||
const char kAuthoritative[] = "authoritative";
|
||||
const char kVersion[] = "version";
|
||||
const char kNoConnectionVersioning[] = "noConnectionVersioning";
|
||||
|
||||
} // namespace
|
||||
@ -64,7 +63,7 @@ SetShardVersionRequest::SetShardVersionRequest(ConnectionString configServer,
|
||||
std::string shardName,
|
||||
ConnectionString shardConnectionString,
|
||||
NamespaceString nss,
|
||||
ChunkVersion version,
|
||||
ChunkVersionAndOpTime version,
|
||||
bool isAuthoritative)
|
||||
: _init(false),
|
||||
_isAuthoritative(isAuthoritative),
|
||||
@ -88,7 +87,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioning(
|
||||
const std::string& shardName,
|
||||
const ConnectionString& shardConnectionString,
|
||||
const NamespaceString& nss,
|
||||
const ChunkVersion& nssVersion,
|
||||
const ChunkVersionAndOpTime& nssVersion,
|
||||
bool isAuthoritative) {
|
||||
return SetShardVersionRequest(
|
||||
configServer, shardName, shardConnectionString, nss, nssVersion, isAuthoritative);
|
||||
@ -99,7 +98,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioningNoPersist(
|
||||
const std::string& shardName,
|
||||
const ConnectionString& shard,
|
||||
const NamespaceString& nss,
|
||||
const ChunkVersion& nssVersion,
|
||||
const ChunkVersionAndOpTime& nssVersion,
|
||||
bool isAuthoritative) {
|
||||
auto ssv = makeForVersioning(configServer, shardName, shard, nss, nssVersion, isAuthoritative);
|
||||
ssv._noConnectionVersioning = true;
|
||||
@ -185,14 +184,11 @@ StatusWith<SetShardVersionRequest> SetShardVersionRequest::parseFromBSON(const B
|
||||
}
|
||||
|
||||
{
|
||||
bool canParse;
|
||||
auto versionStatus = ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj);
|
||||
if (!versionStatus.isOK())
|
||||
return versionStatus.getStatus();
|
||||
|
||||
ChunkVersion chunkVersion = ChunkVersion::fromBSON(cmdObj, kVersion, &canParse);
|
||||
if (!canParse) {
|
||||
return {ErrorCodes::BadValue, "Unable to parse shard version"};
|
||||
}
|
||||
|
||||
request._version = std::move(chunkVersion);
|
||||
request._version = versionStatus.getValue();
|
||||
}
|
||||
|
||||
return request;
|
||||
@ -209,7 +205,7 @@ BSONObj SetShardVersionRequest::toBSON() const {
|
||||
cmdBuilder.append(kShardConnectionString, _shardCS.toString());
|
||||
|
||||
if (!_init) {
|
||||
_version.get().addToBSON(cmdBuilder, kVersion);
|
||||
_version.get().appendForSetShardVersion(&cmdBuilder);
|
||||
}
|
||||
|
||||
if (_noConnectionVersioning) {
|
||||
@ -226,7 +222,7 @@ const NamespaceString& SetShardVersionRequest::getNS() const {
|
||||
|
||||
const ChunkVersion SetShardVersionRequest::getNSVersion() const {
|
||||
invariant(!_init);
|
||||
return _version.get();
|
||||
return _version.get().getVersion();
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -38,7 +38,6 @@
|
||||
namespace mongo {
|
||||
|
||||
class BSONObj;
|
||||
struct ChunkVersion;
|
||||
template <typename T>
|
||||
class StatusWith;
|
||||
|
||||
@ -67,7 +66,7 @@ public:
|
||||
const std::string& shardName,
|
||||
const ConnectionString& shard,
|
||||
const NamespaceString& nss,
|
||||
const ChunkVersion& nssVersion,
|
||||
const ChunkVersionAndOpTime& nssVersion,
|
||||
bool isAuthoritative);
|
||||
|
||||
/**
|
||||
@ -77,12 +76,13 @@ public:
|
||||
* connection WILL NOT be marked as "versioned". DO NOT USE unless the command will be sent
|
||||
* through the TaskExecutor.
|
||||
*/
|
||||
static SetShardVersionRequest makeForVersioningNoPersist(const ConnectionString& configServer,
|
||||
const std::string& shardName,
|
||||
const ConnectionString& shard,
|
||||
const NamespaceString& nss,
|
||||
const ChunkVersion& nssVersion,
|
||||
bool isAuthoritative);
|
||||
static SetShardVersionRequest makeForVersioningNoPersist(
|
||||
const ConnectionString& configServer,
|
||||
const std::string& shardName,
|
||||
const ConnectionString& shard,
|
||||
const NamespaceString& nss,
|
||||
const ChunkVersionAndOpTime& nssVersion,
|
||||
bool isAuthoritative);
|
||||
|
||||
/**
|
||||
* Parses an SSV request from a set shard version command.
|
||||
@ -153,7 +153,7 @@ private:
|
||||
std::string shardName,
|
||||
ConnectionString shardConnectionString,
|
||||
NamespaceString nss,
|
||||
ChunkVersion nssVersion,
|
||||
ChunkVersionAndOpTime version,
|
||||
bool isAuthoritative);
|
||||
|
||||
SetShardVersionRequest();
|
||||
@ -169,7 +169,7 @@ private:
|
||||
|
||||
// These values are only set if _init is false
|
||||
boost::optional<NamespaceString> _nss;
|
||||
boost::optional<ChunkVersion> _version;
|
||||
boost::optional<ChunkVersionAndOpTime> _version;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -215,7 +215,8 @@ TEST(SetShardVersionRequest, ToSSVCommandInit) {
|
||||
}
|
||||
|
||||
TEST(SetShardVersionRequest, ToSSVCommandFull) {
|
||||
const ChunkVersion chunkVersion(1, 2, OID::gen());
|
||||
const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()),
|
||||
repl::OpTime(Timestamp(10), 20LL));
|
||||
|
||||
SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning(
|
||||
configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, false);
|
||||
@ -228,7 +229,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) {
|
||||
ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString());
|
||||
ASSERT_EQ(ssv.getNS().ns(), "db.coll");
|
||||
ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"),
|
||||
chunkVersion.toBSONWithPrefix("version"));
|
||||
chunkVersion.getVersion().toBSONWithPrefix("version"));
|
||||
|
||||
ASSERT_EQ(ssv.toBSON(),
|
||||
BSON("setShardVersion"
|
||||
@ -237,11 +238,13 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) {
|
||||
<< configCS.toString() << "shard"
|
||||
<< "TestShard"
|
||||
<< "shardHost" << shardCS.toString() << "version"
|
||||
<< Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch()));
|
||||
<< Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch"
|
||||
<< chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL));
|
||||
}
|
||||
|
||||
TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) {
|
||||
const ChunkVersion chunkVersion(1, 2, OID::gen());
|
||||
const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()),
|
||||
repl::OpTime(Timestamp(10), 20LL));
|
||||
|
||||
SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning(
|
||||
configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true);
|
||||
@ -254,7 +257,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) {
|
||||
ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString());
|
||||
ASSERT_EQ(ssv.getNS().ns(), "db.coll");
|
||||
ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"),
|
||||
chunkVersion.toBSONWithPrefix("version"));
|
||||
chunkVersion.getVersion().toBSONWithPrefix("version"));
|
||||
|
||||
ASSERT_EQ(ssv.toBSON(),
|
||||
BSON("setShardVersion"
|
||||
@ -263,11 +266,13 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) {
|
||||
<< configCS.toString() << "shard"
|
||||
<< "TestShard"
|
||||
<< "shardHost" << shardCS.toString() << "version"
|
||||
<< Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch()));
|
||||
<< Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch"
|
||||
<< chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL));
|
||||
}
|
||||
|
||||
TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) {
|
||||
const ChunkVersion chunkVersion(1, 2, OID::gen());
|
||||
const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()),
|
||||
repl::OpTime(Timestamp(10), 20LL));
|
||||
|
||||
SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
|
||||
configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true);
|
||||
@ -280,7 +285,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) {
|
||||
ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString());
|
||||
ASSERT_EQ(ssv.getNS().ns(), "db.coll");
|
||||
ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"),
|
||||
chunkVersion.toBSONWithPrefix("version"));
|
||||
chunkVersion.getVersion().toBSONWithPrefix("version"));
|
||||
|
||||
ASSERT_EQ(ssv.toBSON(),
|
||||
BSON("setShardVersion"
|
||||
@ -289,7 +294,8 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) {
|
||||
<< configCS.toString() << "shard"
|
||||
<< "TestShard"
|
||||
<< "shardHost" << shardCS.toString() << "version"
|
||||
<< Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch()
|
||||
<< Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch"
|
||||
<< chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL
|
||||
<< "noConnectionVersioning" << true));
|
||||
}
|
||||
|
||||
|
@ -1,32 +1,30 @@
|
||||
// @file version_manager.cpp
|
||||
|
||||
/**
|
||||
* Copyright (C) 2010 10gen Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects
|
||||
* for all of the code used other than as permitted herein. If you modify
|
||||
* file(s) with this exception, you may extend this exception to your
|
||||
* version of the file(s), but you are not obligated to do so. If you do not
|
||||
* wish to do so, delete this exception statement from your version. If you
|
||||
* delete this exception statement from all source files in the program,
|
||||
* then also delete it in the license file.
|
||||
*/
|
||||
* Copyright (C) 2010-2015 MongoDB Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3,
|
||||
* as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the GNU Affero General Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
|
||||
|
||||
@ -45,6 +43,7 @@
|
||||
#include "mongo/s/config.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/s/mongos_options.h"
|
||||
#include "mongo/s/set_shard_version_request.h"
|
||||
#include "mongo/s/stale_exception.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
@ -111,34 +110,35 @@ private:
|
||||
*/
|
||||
bool setShardVersion(DBClientBase& conn,
|
||||
const string& ns,
|
||||
const string& configServerPrimary,
|
||||
const ConnectionString& configServer,
|
||||
ChunkVersion version,
|
||||
ChunkManager* manager,
|
||||
bool authoritative,
|
||||
BSONObj& result) {
|
||||
BSONObjBuilder cmdBuilder;
|
||||
cmdBuilder.append("setShardVersion", ns);
|
||||
cmdBuilder.append("configdb", configServerPrimary);
|
||||
|
||||
ShardId shardId;
|
||||
ConnectionString shardCS;
|
||||
{
|
||||
const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress());
|
||||
shardId = shard->getId();
|
||||
cmdBuilder.append("shard", shardId);
|
||||
cmdBuilder.append("shardHost", shard->getConnString().toString());
|
||||
shardCS = shard->getConnString();
|
||||
}
|
||||
|
||||
if (ns.size() > 0) {
|
||||
version.addToBSON(cmdBuilder);
|
||||
BSONObj cmd;
|
||||
|
||||
if (ns.empty()) {
|
||||
SetShardVersionRequest ssv =
|
||||
SetShardVersionRequest::makeForInit(configServer, shardId, shardCS);
|
||||
cmd = ssv.toBSON();
|
||||
} else {
|
||||
cmdBuilder.append("init", true);
|
||||
}
|
||||
const ChunkVersionAndOpTime verAndOpT = manager
|
||||
? ChunkVersionAndOpTime(version, manager->getConfigOpTime())
|
||||
: ChunkVersionAndOpTime(version);
|
||||
|
||||
if (authoritative) {
|
||||
cmdBuilder.appendBool("authoritative", 1);
|
||||
}
|
||||
SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning(
|
||||
configServer, shardId, shardCS, NamespaceString(ns), verAndOpT, authoritative);
|
||||
|
||||
BSONObj cmd = cmdBuilder.obj();
|
||||
cmd = ssv.toBSON();
|
||||
}
|
||||
|
||||
LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress() << " " << ns
|
||||
<< " " << cmd
|
||||
@ -214,7 +214,7 @@ bool initShardVersionEmptyNS(OperationContext* txn, DBClientBase* conn_in) {
|
||||
|
||||
ok = setShardVersion(*conn,
|
||||
"",
|
||||
grid.shardRegistry()->getConfigServerConnectionString().toString(),
|
||||
grid.shardRegistry()->getConfigServerConnectionString(),
|
||||
ChunkVersion(),
|
||||
NULL,
|
||||
true,
|
||||
@ -373,7 +373,7 @@ bool checkShardVersion(OperationContext* txn,
|
||||
BSONObj result;
|
||||
if (setShardVersion(*conn,
|
||||
ns,
|
||||
grid.shardRegistry()->getConfigServerConnectionString().toString(),
|
||||
grid.shardRegistry()->getConfigServerConnectionString(),
|
||||
version,
|
||||
manager.get(),
|
||||
authoritative,
|
||||
|
@ -87,11 +87,13 @@ static int compareEndpoints(const ShardEndpoint* endpointA, const ShardEndpoint*
|
||||
if (shardNameDiff != 0)
|
||||
return shardNameDiff;
|
||||
|
||||
long shardVersionDiff = endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
|
||||
long shardVersionDiff = endpointA->shardVersion.getVersion().toLong() -
|
||||
endpointB->shardVersion.getVersion().toLong();
|
||||
if (shardVersionDiff != 0)
|
||||
return shardVersionDiff;
|
||||
|
||||
int shardEpochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch());
|
||||
int shardEpochDiff = endpointA->shardVersion.getVersion().epoch().compare(
|
||||
endpointB->shardVersion.getVersion().epoch());
|
||||
return shardEpochDiff;
|
||||
}
|
||||
|
||||
@ -456,7 +458,8 @@ void BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& targetedBatch,
|
||||
|
||||
unique_ptr<BatchedRequestMetadata> requestMetadata(new BatchedRequestMetadata());
|
||||
requestMetadata->setShardName(targetedBatch.getEndpoint().shardName);
|
||||
requestMetadata->setShardVersion(targetedBatch.getEndpoint().shardVersion);
|
||||
requestMetadata->setShardVersion(
|
||||
ChunkVersionAndOpTime(targetedBatch.getEndpoint().shardVersion));
|
||||
requestMetadata->setSession(0);
|
||||
request->setMetadata(requestMetadata.release());
|
||||
}
|
||||
|
@ -46,32 +46,24 @@ TEST(BatchedDeleteRequest, Basic) {
|
||||
BSON(BatchedDeleteDocument::query(BSON("a" << 1)) << BatchedDeleteDocument::limit(1))
|
||||
<< BSON(BatchedDeleteDocument::query(BSON("b" << 1)) << BatchedDeleteDocument::limit(1)));
|
||||
|
||||
BSONObj writeConcernObj = BSON("w" << 1);
|
||||
|
||||
// The BSON_ARRAY macro doesn't support Timestamps.
|
||||
BSONArrayBuilder arrBuilder;
|
||||
arrBuilder.append(Timestamp(1, 1));
|
||||
arrBuilder.append(OID::gen());
|
||||
BSONArray shardVersionArray = arrBuilder.arr();
|
||||
|
||||
BSONObj origDeleteRequestObj =
|
||||
BSON(BatchedDeleteRequest::collName("test")
|
||||
<< BatchedDeleteRequest::deletes() << deleteArray
|
||||
<< BatchedDeleteRequest::writeConcern(writeConcernObj)
|
||||
<< BatchedDeleteRequest::ordered(true) << BatchedDeleteRequest::metadata()
|
||||
<< BSON(BatchedRequestMetadata::shardName("shard000")
|
||||
<< BatchedRequestMetadata::shardVersion() << shardVersionArray
|
||||
<< BatchedRequestMetadata::session(0)));
|
||||
BSONObj origDeleteRequestObj = BSON(
|
||||
BatchedDeleteRequest::collName("test")
|
||||
<< BatchedDeleteRequest::deletes() << deleteArray
|
||||
<< BatchedDeleteRequest::writeConcern(BSON("w" << 1)) << BatchedDeleteRequest::ordered(true)
|
||||
<< BatchedDeleteRequest::metadata() << BSON("shardName"
|
||||
<< "shard000"
|
||||
<< "shardVersion"
|
||||
<< BSON_ARRAY(Timestamp(1, 2) << OID::gen())
|
||||
<< "ts" << Timestamp(3, 4) << "t" << 5
|
||||
<< "session" << 0LL));
|
||||
|
||||
string errMsg;
|
||||
BatchedDeleteRequest request;
|
||||
bool ok = request.parseBSON("foo", origDeleteRequestObj, &errMsg);
|
||||
ASSERT_TRUE(ok);
|
||||
ASSERT_TRUE(request.parseBSON("foo", origDeleteRequestObj, &errMsg));
|
||||
|
||||
ASSERT_EQ("foo.test", request.getNS().ns());
|
||||
|
||||
BSONObj genDeleteRequestObj = request.toBSON();
|
||||
ASSERT_EQUALS(0, genDeleteRequestObj.woCompare(origDeleteRequestObj));
|
||||
ASSERT_EQUALS(origDeleteRequestObj, request.toBSON());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -45,32 +45,24 @@ namespace {
|
||||
TEST(BatchedInsertRequest, Basic) {
|
||||
BSONArray insertArray = BSON_ARRAY(BSON("a" << 1) << BSON("b" << 1));
|
||||
|
||||
BSONObj writeConcernObj = BSON("w" << 1);
|
||||
|
||||
// The BSON_ARRAY macro doesn't support Timestamps.
|
||||
BSONArrayBuilder arrBuilder;
|
||||
arrBuilder.append(Timestamp(1, 1));
|
||||
arrBuilder.append(OID::gen());
|
||||
BSONArray shardVersionArray = arrBuilder.arr();
|
||||
|
||||
BSONObj origInsertRequestObj =
|
||||
BSON(BatchedInsertRequest::collName("test")
|
||||
<< BatchedInsertRequest::documents() << insertArray
|
||||
<< BatchedInsertRequest::writeConcern(writeConcernObj)
|
||||
<< BatchedInsertRequest::ordered(true) << BatchedInsertRequest::metadata()
|
||||
<< BSON(BatchedRequestMetadata::shardName("shard0000")
|
||||
<< BatchedRequestMetadata::shardVersion() << shardVersionArray
|
||||
<< BatchedRequestMetadata::session(0)));
|
||||
BSONObj origInsertRequestObj = BSON(
|
||||
BatchedInsertRequest::collName("test")
|
||||
<< BatchedInsertRequest::documents() << insertArray
|
||||
<< BatchedInsertRequest::writeConcern(BSON("w" << 1)) << BatchedInsertRequest::ordered(true)
|
||||
<< BatchedInsertRequest::metadata() << BSON("shardName"
|
||||
<< "shard000"
|
||||
<< "shardVersion"
|
||||
<< BSON_ARRAY(Timestamp(1, 2) << OID::gen())
|
||||
<< "ts" << Timestamp(3, 4) << "t" << 5
|
||||
<< "session" << 0LL));
|
||||
|
||||
string errMsg;
|
||||
BatchedInsertRequest request;
|
||||
bool ok = request.parseBSON("foo", origInsertRequestObj, &errMsg);
|
||||
ASSERT_TRUE(ok);
|
||||
ASSERT_TRUE(request.parseBSON("foo", origInsertRequestObj, &errMsg));
|
||||
|
||||
ASSERT_EQ("foo.test", request.getNS().ns());
|
||||
|
||||
BSONObj genInsertRequestObj = request.toBSON();
|
||||
ASSERT_EQUALS(0, genInsertRequestObj.woCompare(origInsertRequestObj));
|
||||
ASSERT_EQUALS(origInsertRequestObj, request.toBSON());
|
||||
}
|
||||
|
||||
TEST(BatchedInsertRequest, GenIDAll) {
|
||||
|
@ -26,6 +26,8 @@
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/s/write_ops/batched_request_metadata.h"
|
||||
|
||||
#include "mongo/db/field_parser.h"
|
||||
@ -55,9 +57,8 @@ BSONObj BatchedRequestMetadata::toBSON() const {
|
||||
if (_isShardNameSet)
|
||||
metadataBuilder << shardName(_shardName);
|
||||
|
||||
if (_shardVersion.get()) {
|
||||
// ChunkVersion wants to be an array.
|
||||
metadataBuilder.append(shardVersion(), static_cast<BSONArray>(_shardVersion->toBSON()));
|
||||
if (_shardVersion) {
|
||||
_shardVersion.get().appendForCommands(&metadataBuilder);
|
||||
}
|
||||
|
||||
if (_isSessionSet)
|
||||
@ -80,12 +81,12 @@ bool BatchedRequestMetadata::parseBSON(const BSONObj& source, string* errMsg) {
|
||||
_isShardNameSet = fieldState == FieldParser::FIELD_SET;
|
||||
|
||||
{
|
||||
std::unique_ptr<ChunkVersion> tempChunkVersion(new ChunkVersion);
|
||||
fieldState = FieldParser::extract(source, shardVersion, tempChunkVersion.get(), errMsg);
|
||||
if (fieldState == FieldParser::FIELD_INVALID)
|
||||
auto verAndOpTStatus = ChunkVersionAndOpTime::parseFromBSONForCommands(source);
|
||||
if (!verAndOpTStatus.isOK()) {
|
||||
return false;
|
||||
if (fieldState == FieldParser::FIELD_SET)
|
||||
_shardVersion.swap(tempChunkVersion);
|
||||
}
|
||||
|
||||
_shardVersion = verAndOpTStatus.getValue();
|
||||
}
|
||||
|
||||
fieldState = FieldParser::extract(source, session, &_session, errMsg);
|
||||
@ -113,10 +114,7 @@ string BatchedRequestMetadata::toString() const {
|
||||
void BatchedRequestMetadata::cloneTo(BatchedRequestMetadata* other) const {
|
||||
other->_shardName = _shardName;
|
||||
other->_isShardNameSet = _isShardNameSet;
|
||||
|
||||
if (other->_shardVersion.get())
|
||||
_shardVersion->cloneTo(other->_shardVersion.get());
|
||||
|
||||
other->_shardVersion = _shardVersion;
|
||||
other->_session = _session;
|
||||
other->_isSessionSet = _isSessionSet;
|
||||
}
|
||||
@ -126,36 +124,21 @@ void BatchedRequestMetadata::setShardName(StringData shardName) {
|
||||
_isShardNameSet = true;
|
||||
}
|
||||
|
||||
void BatchedRequestMetadata::unsetShardName() {
|
||||
_isShardNameSet = false;
|
||||
}
|
||||
|
||||
bool BatchedRequestMetadata::isShardNameSet() const {
|
||||
return _isShardNameSet;
|
||||
}
|
||||
|
||||
const string& BatchedRequestMetadata::getShardName() const {
|
||||
dassert(_isShardNameSet);
|
||||
return _shardName;
|
||||
}
|
||||
|
||||
void BatchedRequestMetadata::setShardVersion(const ChunkVersion& shardVersion) {
|
||||
unique_ptr<ChunkVersion> temp(new ChunkVersion);
|
||||
shardVersion.cloneTo(temp.get());
|
||||
_shardVersion.reset(temp.release());
|
||||
}
|
||||
|
||||
void BatchedRequestMetadata::unsetShardVersion() {
|
||||
_shardVersion.reset();
|
||||
void BatchedRequestMetadata::setShardVersion(const ChunkVersionAndOpTime& shardVersion) {
|
||||
_shardVersion = shardVersion;
|
||||
}
|
||||
|
||||
bool BatchedRequestMetadata::isShardVersionSet() const {
|
||||
return _shardVersion.get() != NULL;
|
||||
return _shardVersion.is_initialized();
|
||||
}
|
||||
|
||||
const ChunkVersion& BatchedRequestMetadata::getShardVersion() const {
|
||||
dassert(_shardVersion.get());
|
||||
return *_shardVersion;
|
||||
return _shardVersion.get().getVersion();
|
||||
}
|
||||
|
||||
void BatchedRequestMetadata::setSession(long long session) {
|
||||
|
@ -28,34 +28,33 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <string>
|
||||
|
||||
#include "mongo/base/disallow_copying.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/s/bson_serializable.h"
|
||||
#include "mongo/s/chunk_version.h"
|
||||
|
||||
namespace mongo {
|
||||
class BatchedRequestMetadata : public BSONSerializable {
|
||||
MONGO_DISALLOW_COPYING(BatchedRequestMetadata);
|
||||
|
||||
class BatchedRequestMetadata {
|
||||
public:
|
||||
static const BSONField<std::string> shardName;
|
||||
static const BSONField<ChunkVersion> shardVersion;
|
||||
static const BSONField<long long> session;
|
||||
|
||||
BatchedRequestMetadata();
|
||||
virtual ~BatchedRequestMetadata();
|
||||
~BatchedRequestMetadata();
|
||||
|
||||
//
|
||||
// bson serializable interface implementation
|
||||
//
|
||||
|
||||
virtual bool isValid(std::string* errMsg) const;
|
||||
virtual BSONObj toBSON() const;
|
||||
virtual bool parseBSON(const BSONObj& source, std::string* errMsg);
|
||||
virtual void clear();
|
||||
virtual std::string toString() const;
|
||||
bool isValid(std::string* errMsg) const;
|
||||
BSONObj toBSON() const;
|
||||
bool parseBSON(const BSONObj& source, std::string* errMsg);
|
||||
void clear();
|
||||
std::string toString() const;
|
||||
|
||||
void cloneTo(BatchedRequestMetadata* other) const;
|
||||
|
||||
@ -64,14 +63,12 @@ public:
|
||||
//
|
||||
|
||||
void setShardName(StringData shardName);
|
||||
void unsetShardName();
|
||||
bool isShardNameSet() const;
|
||||
const std::string& getShardName() const;
|
||||
|
||||
void setShardVersion(const ChunkVersion& shardVersion);
|
||||
void unsetShardVersion();
|
||||
void setShardVersion(const ChunkVersionAndOpTime& shardVersion);
|
||||
bool isShardVersionSet() const;
|
||||
const ChunkVersion& getShardVersion() const;
|
||||
const repl::OpTime& getOpTime() const;
|
||||
|
||||
void setSession(long long session);
|
||||
void unsetSession();
|
||||
@ -84,10 +81,11 @@ private:
|
||||
bool _isShardNameSet;
|
||||
|
||||
// (O) version for this collection on a given shard
|
||||
std::unique_ptr<ChunkVersion> _shardVersion;
|
||||
boost::optional<ChunkVersionAndOpTime> _shardVersion;
|
||||
|
||||
// (O) session number the inserts belong to
|
||||
long long _session;
|
||||
bool _isSessionSet;
|
||||
};
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -40,22 +40,16 @@ using std::string;
|
||||
namespace {
|
||||
|
||||
TEST(BatchedRequestMetadata, Basic) {
|
||||
// The BSON_ARRAY macro doesn't support Timestamps.
|
||||
BSONArrayBuilder arrBuilder;
|
||||
arrBuilder.append(Timestamp(1, 1));
|
||||
arrBuilder.append(OID::gen());
|
||||
BSONArray shardVersionArray = arrBuilder.arr();
|
||||
|
||||
BSONObj metadataObj(BSON(BatchedRequestMetadata::shardName("shard0000")
|
||||
<< BatchedRequestMetadata::shardVersion() << shardVersionArray
|
||||
<< BatchedRequestMetadata::session(100)));
|
||||
BSONObj metadataObj(BSON("shardName"
|
||||
<< "shard0000"
|
||||
<< "shardVersion" << BSON_ARRAY(Timestamp(1, 2) << OID::gen()) << "ts"
|
||||
<< Timestamp(3, 4) << "t" << 5 << "session" << 0LL));
|
||||
|
||||
string errMsg;
|
||||
BatchedRequestMetadata metadata;
|
||||
ASSERT_TRUE(metadata.parseBSON(metadataObj, &errMsg));
|
||||
|
||||
BSONObj genMetadataObj = metadata.toBSON();
|
||||
ASSERT_EQUALS(metadataObj, genMetadataObj);
|
||||
ASSERT_EQUALS(metadataObj, metadata.toBSON());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -50,32 +50,24 @@ TEST(BatchedUpdateRequest, Basic) {
|
||||
<< BatchedUpdateDocument::updateExpr(BSON("$set" << BSON("b" << 2)))
|
||||
<< BatchedUpdateDocument::multi(false) << BatchedUpdateDocument::upsert(false)));
|
||||
|
||||
BSONObj writeConcernObj = BSON("w" << 1);
|
||||
|
||||
// The BSON_ARRAY macro doesn't support Timestamps.
|
||||
BSONArrayBuilder arrBuilder;
|
||||
arrBuilder.append(Timestamp(1, 1));
|
||||
arrBuilder.append(OID::gen());
|
||||
BSONArray shardVersionArray = arrBuilder.arr();
|
||||
|
||||
BSONObj origUpdateRequestObj =
|
||||
BSON(BatchedUpdateRequest::collName("test")
|
||||
<< BatchedUpdateRequest::updates() << updateArray
|
||||
<< BatchedUpdateRequest::writeConcern(writeConcernObj)
|
||||
<< BatchedUpdateRequest::ordered(true) << BatchedUpdateRequest::metadata()
|
||||
<< BSON(BatchedRequestMetadata::shardName("shard0000")
|
||||
<< BatchedRequestMetadata::shardVersion() << shardVersionArray
|
||||
<< BatchedRequestMetadata::session(0)));
|
||||
BSONObj origUpdateRequestObj = BSON(
|
||||
BatchedUpdateRequest::collName("test")
|
||||
<< BatchedUpdateRequest::updates() << updateArray
|
||||
<< BatchedUpdateRequest::writeConcern(BSON("w" << 1)) << BatchedUpdateRequest::ordered(true)
|
||||
<< BatchedUpdateRequest::metadata() << BSON("shardName"
|
||||
<< "shard000"
|
||||
<< "shardVersion"
|
||||
<< BSON_ARRAY(Timestamp(1, 2) << OID::gen())
|
||||
<< "ts" << Timestamp(3, 4) << "t" << 5
|
||||
<< "session" << 0LL));
|
||||
|
||||
string errMsg;
|
||||
BatchedUpdateRequest request;
|
||||
bool ok = request.parseBSON("foo", origUpdateRequestObj, &errMsg);
|
||||
ASSERT_TRUE(ok);
|
||||
ASSERT_TRUE(request.parseBSON("foo", origUpdateRequestObj, &errMsg));
|
||||
|
||||
ASSERT_EQ("foo.test", request.getNS().ns());
|
||||
|
||||
BSONObj genUpdateRequestObj = request.toBSON();
|
||||
ASSERT_EQUALS(0, genUpdateRequestObj.woCompare(origUpdateRequestObj));
|
||||
ASSERT_EQUALS(origUpdateRequestObj, request.toBSON());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -127,7 +127,8 @@ Status WriteOp::targetWrites(OperationContext* txn,
|
||||
if (endpoints.size() == 1u) {
|
||||
targetedWrites->push_back(new TargetedWrite(*endpoint, ref));
|
||||
} else {
|
||||
ShardEndpoint broadcastEndpoint(endpoint->shardName, ChunkVersion::IGNORED());
|
||||
ShardEndpoint broadcastEndpoint(endpoint->shardName,
|
||||
ChunkVersionAndOpTime(ChunkVersion::IGNORED()));
|
||||
targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref));
|
||||
}
|
||||
|
||||
|
@ -218,11 +218,11 @@ TEST(WriteOpTests, TargetMultiAllShards) {
|
||||
ASSERT_EQUALS(targeted.size(), 3u);
|
||||
sortByEndpoint(&targeted);
|
||||
ASSERT_EQUALS(targeted[0]->endpoint.shardName, endpointA.shardName);
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion));
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion.getVersion()));
|
||||
ASSERT_EQUALS(targeted[1]->endpoint.shardName, endpointB.shardName);
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion));
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion.getVersion()));
|
||||
ASSERT_EQUALS(targeted[2]->endpoint.shardName, endpointC.shardName);
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion));
|
||||
ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion.getVersion()));
|
||||
|
||||
writeOp.noteWriteComplete(*targeted[0]);
|
||||
writeOp.noteWriteComplete(*targeted[1]);
|
||||
|
Loading…
Reference in New Issue
Block a user