diff --git a/.eslintrc.yml b/.eslintrc.yml index 1e3fe5e3a44..0d0057e5129 100644 --- a/.eslintrc.yml +++ b/.eslintrc.yml @@ -147,6 +147,8 @@ globals: _resultSetsEqualUnordered: true getStringWidth: true _compareStringsWithCollation: true + eventResumeTokenType: true + highWaterMarkResumeTokenType: true # likely could be replaced with `path` _copyFileRange: true @@ -200,6 +202,7 @@ globals: Timestamp: true MD5: true Geo: true + decodeResumeToken: true bsonWoCompare: true bsonUnorderedFieldsCompare: true bsonBinaryEqual: true diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js index 0116bba9849..ccb423866e7 100644 --- a/jstests/change_streams/resume_from_high_water_mark_token.js +++ b/jstests/change_streams/resume_from_high_water_mark_token.js @@ -105,6 +105,7 @@ const cmdResCollWithCollation = assert.commandWorked(runExactCommand(db, { })); csCursor = new DBCommandCursor(db, cmdResCollWithCollation); const hwmFromCollWithCollation = csCursor.getResumeToken(); +assert.eq(decodeResumeToken(hwmFromCollWithCollation).tokenType, highWaterMarkResumeTokenType); assert.neq(undefined, hwmFromCollWithCollation); csCursor.close(); @@ -164,9 +165,11 @@ assert.soon(() => { assert.eq(csCursor.objsLeftInBatch(), 0); hwmToken = csCursor.getResumeToken(); assert.neq(undefined, hwmToken); + return relatedEvent && bsonWoCompare(hwmToken, relatedEvent._id) > 0; }); csCursor.close(); +assert.eq(decodeResumeToken(hwmToken).tokenType, highWaterMarkResumeTokenType); // Now write some further documents to the collection before attempting to resume. for (let i = 0; i < 5; ++i) { @@ -261,4 +264,5 @@ assert.soon(() => { assert.soon(() => { return csCursor.hasNext() && csCursor.next().operationType === "invalidate"; }); + csCursor.close(); diff --git a/jstests/change_streams/split_large_event.js b/jstests/change_streams/split_large_event.js index ea3bd8e2703..24990a647bd 100644 --- a/jstests/change_streams/split_large_event.js +++ b/jstests/change_streams/split_large_event.js @@ -83,6 +83,16 @@ let csCursor = testColl.watch([]); // Record a resume token marking the start point of the test. const testStartToken = csCursor.getResumeToken(); +const decodedToken = decodeResumeToken(testStartToken); +assert.eq(decodedToken.tokenType, highWaterMarkResumeTokenType); +assert.eq(decodedToken.version, 2); +assert.eq(decodedToken.txnOpIndex, 0); +assert.eq(decodedToken.tokenType, 0); +assert.eq(decodedToken.fromInvalidate, false); +assert.gt(decodedToken.clusterTime, new Timestamp(0, 0)); +assert.eq(decodedToken.uuid, undefined); +assert.eq(decodedToken.fragmentNum, undefined); + // Perform ~16MB updates which generate ~16MB change events and ~16MB post-images. assert.commandWorked(testColl.update({_id: "aaa"}, {$set: {a: "y".repeat(kLargeStringSize)}})); assert.commandWorked(testColl.update({_id: "bbb"}, {$set: {a: "y".repeat(kLargeStringSize)}})); @@ -139,6 +149,23 @@ function validateReconstructedEvent(event, expectedId) { assert.eq(kLargeStringSize, event.updateDescription.updatedFields.a.length); } +// Helper function to validate a collection of resume tokens. +function validateResumeTokens(resumeTokens, numFragments) { + resumeTokens.forEach((resumeToken, idx) => { + const decodedToken = decodeResumeToken(resumeToken); + const eventIdentifier = {"operationType": "update", "documentKey": {"_id": "aaa"}}; + assert.eq(decodedToken.eventIdentifier, eventIdentifier); + assert.eq(decodedToken.tokenType, eventResumeTokenType); + assert.eq(decodedToken.version, 2); + assert.eq(decodedToken.txnOpIndex, 0); + assert.eq(decodedToken.tokenType, 128); + assert.eq(decodedToken.fromInvalidate, false); + assert.gt(decodedToken.clusterTime, new Timestamp(0, 0)); + assert.eq(decodedToken.fragmentNum, idx); + assert.neq(decodedToken.uuid, undefined); + }); +} + // We declare 'resumeTokens' array outside of the for-scope to collect and share resume tokens // across several test-cases. let resumeTokens = []; @@ -187,6 +214,7 @@ for (const postImageMode of ["required", "updateLookup"]) { var reconstructedEvent; [reconstructedEvent, resumeTokens] = reconstructSplitEvent(csCursor, 3); validateReconstructedEvent(reconstructedEvent, "aaa"); + validateResumeTokens(resumeTokens, 3); const [reconstructedEvent2, _] = reconstructSplitEvent(csCursor, 3); validateReconstructedEvent(reconstructedEvent2, "bbb"); @@ -219,6 +247,7 @@ for (const postImageMode of ["required", "updateLookup"]) { resumeAfter: testStartToken }); assert.docEq(resumeTokens, reconstructSplitEvent(csCursor, 3)[1]); + validateResumeTokens(resumeTokens, 3); } { diff --git a/jstests/core/query/js/js_global_scope.js b/jstests/core/query/js/js_global_scope.js index 9a09c058d74..6155ea5c92f 100644 --- a/jstests/core/query/js/js_global_scope.js +++ b/jstests/core/query/js/js_global_scope.js @@ -65,6 +65,7 @@ const expectedGlobalVars = [ "RangeError", "ReferenceError", "RegExp", + "ResumeTokenDataUtility", "Set", "String", "Symbol", @@ -87,6 +88,7 @@ const expectedGlobalVars = [ "bsonUnorderedFieldsCompare", "bsonWoCompare", "buildInfo", + "decodeResumeToken", "decodeURI", "decodeURIComponent", "doassert", @@ -94,11 +96,13 @@ const expectedGlobalVars = [ "encodeURIComponent", "escape", "eval", + "eventResumeTokenType", "gc", "getJSHeapLimitMB", "globalThis", "hex_md5", "isFinite", + "highWaterMarkResumeTokenType", "isNaN", "isNumber", "isObject", diff --git a/jstests/noPassthrough/query/change_streams/change_stream_sharded_startafter_invalidate.js b/jstests/noPassthrough/query/change_streams/change_stream_sharded_startafter_invalidate.js index 9d1469f59a2..ed3faad1556 100644 --- a/jstests/noPassthrough/query/change_streams/change_stream_sharded_startafter_invalidate.js +++ b/jstests/noPassthrough/query/change_streams/change_stream_sharded_startafter_invalidate.js @@ -59,6 +59,7 @@ assert(mongosColl.drop()); }); const invalidateResumeToken = changeStream.getResumeToken(); + assert(decodeResumeToken(invalidateResumeToken).fromInvalidate); // Recreate and then immediately drop the collection again to make sure that change stream when // opened with the invalidate resume token sees this invalidate event. @@ -74,4 +75,4 @@ assert(mongosColl.drop()); }); })(); -st.stop(); \ No newline at end of file +st.stop(); diff --git a/jstests/noPassthrough/query/queryStats/query_stats_agg_key_change_stream.js b/jstests/noPassthrough/query/queryStats/query_stats_agg_key_change_stream.js index cc563699d6f..c69eb032de6 100644 --- a/jstests/noPassthrough/query/queryStats/query_stats_agg_key_change_stream.js +++ b/jstests/noPassthrough/query/queryStats/query_stats_agg_key_change_stream.js @@ -99,6 +99,9 @@ function validateResumeTokenQueryShape(conn, coll) { assert.soon(() => changeStream.hasNext()); changeStream.next(); const invalidateResumeToken = changeStream.getResumeToken(); + const decodedToken = decodeResumeToken(invalidateResumeToken); + assert.eq(decodedToken.tokenType, eventResumeTokenType); + assert.eq(decodedToken.fromInvalidate, false); // Resume the change stream using 'startAfter' field. coll.watch([], {startAfter: invalidateResumeToken}); diff --git a/jstests/serverless/change_streams/isolate_high_water_mark.js b/jstests/serverless/change_streams/isolate_high_water_mark.js index 66fa1d929e5..75b71030f83 100644 --- a/jstests/serverless/change_streams/isolate_high_water_mark.js +++ b/jstests/serverless/change_streams/isolate_high_water_mark.js @@ -49,6 +49,7 @@ assert.soon(() => { assert.neq(undefined, hwmToken); return bsonWoCompare(hwmToken, monitoredEvent._id) > 0; }); +assert.eq(decodeResumeToken(hwmToken).tokenType, highWaterMarkResumeTokenType); // Open a change stream on tenant 2 so we can observe a write that happens and verify that write // advanced the global oplog timestamp. @@ -65,6 +66,7 @@ assert.soon(() => { // greater than the last resume token we got. assert.eq(csCursor.hasNext(), false); const hwmToken2 = csCursor.getResumeToken(); +assert.eq(decodeResumeToken(hwmToken2).tokenType, highWaterMarkResumeTokenType); assert.neq(undefined, hwmToken2); assert.eq(bsonWoCompare(hwmToken, hwmToken2), 0); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index fb03f215ed3..333ed20cb41 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -89,23 +89,29 @@ bool ResumeTokenData::operator==(const ResumeTokenData& other) const { fragmentNum == other.fragmentNum; } +BSONObj ResumeTokenData::toBSON() const { + // TODO SERVER-96418: Make ResumeTokenData an IDL type so that this method is auto-generated. + BSONObjBuilder builder; + builder.append("clusterTime", clusterTime); + builder.append("tokenData", tokenType); + builder.append("version", version); + builder.append("txnOpIndex", static_cast(txnOpIndex)); + if (version > 0) { + builder.append("tokenType", tokenType); + builder.append("fromInvalidate", static_cast(fromInvalidate)); + } + if (uuid) { + builder.append("uuid", uuid->toBSON()); + } + if (fragmentNum) { + builder.append("fragmentNum", static_cast(*fragmentNum)); + } + eventIdentifier.addToBsonObj(&builder, "eventIdentifier"); + return builder.obj(); +} + std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { - out << "{clusterTime: " << tokenData.clusterTime.toString(); - out << ", version: " << tokenData.version; - if (tokenData.version > 0) { - out << ", tokenType: " << tokenData.tokenType; - } - out << ", txnOpIndex: " << tokenData.txnOpIndex; - if (tokenData.version > 0) { - out << ", fromInvalidate: " << static_cast(tokenData.fromInvalidate); - } - out << ", uuid: " << optional_io::Extension{tokenData.uuid}; - out << ", eventIdentifier: " << tokenData.eventIdentifier; - if (tokenData.version >= 2) { - out << ", fragmentNum: " << optional_io::Extension{tokenData.fragmentNum}; - } - out << "}"; - return out; + return out << tokenData.toBSON(); } ResumeToken::ResumeToken(const Document& resumeDoc) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 960a76ec06f..dc8c651cf73 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -119,6 +119,8 @@ struct ResumeTokenData { // Index of the current fragment, for oversized events that have been split. boost::optional fragmentNum; + BSONObj toBSON() const; + private: // This private constructor should only ever be used internally or by the ResumeToken class. ResumeTokenData() = default; diff --git a/src/mongo/scripting/BUILD.bazel b/src/mongo/scripting/BUILD.bazel index 3f59f494253..634c6fcbe78 100644 --- a/src/mongo/scripting/BUILD.bazel +++ b/src/mongo/scripting/BUILD.bazel @@ -111,6 +111,7 @@ mongo_cc_library( "//src/mongo/scripting/mozjs:oid.cpp", "//src/mongo/scripting/mozjs:proxyscope.cpp", "//src/mongo/scripting/mozjs:regexp.cpp", + "//src/mongo/scripting/mozjs:resumetoken.cpp", "//src/mongo/scripting/mozjs:scripting_util_gen", "//src/mongo/scripting/mozjs:session.cpp", "//src/mongo/scripting/mozjs:status.cpp", @@ -162,6 +163,7 @@ mongo_cc_library( "//src/mongo/scripting/mozjs:oid.h", "//src/mongo/scripting/mozjs:proxyscope.h", "//src/mongo/scripting/mozjs:regexp.h", + "//src/mongo/scripting/mozjs:resumetoken.h", "//src/mongo/scripting/mozjs:session.h", "//src/mongo/scripting/mozjs:status.h", "//src/mongo/scripting/mozjs:timestamp.h", diff --git a/src/mongo/scripting/mozjs/bson.cpp b/src/mongo/scripting/mozjs/bson.cpp index 311b353def2..5213772eb9c 100644 --- a/src/mongo/scripting/mozjs/bson.cpp +++ b/src/mongo/scripting/mozjs/bson.cpp @@ -48,6 +48,7 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/bsontypes.h" +#include "mongo/db/pipeline/resume_token.h" #include "mongo/scripting/mozjs/bson.h" #include "mongo/scripting/mozjs/idwrapper.h" #include "mongo/scripting/mozjs/implscope.h" diff --git a/src/mongo/scripting/mozjs/implscope.cpp b/src/mongo/scripting/mozjs/implscope.cpp index facd62c1fd4..b7b3d4cd110 100644 --- a/src/mongo/scripting/mozjs/implscope.cpp +++ b/src/mongo/scripting/mozjs/implscope.cpp @@ -513,6 +513,7 @@ MozJSImplScope::MozJSImplScope(MozJSScriptEngine* engine, boost::optional j _objectProto(_context), _oidProto(_context), _regExpProto(_context), + _resumeTokenDataProto(_context), _sessionProto(_context), _statusProto(_context), _timestampProto(_context), @@ -1134,6 +1135,7 @@ void MozJSImplScope::installBSONTypes() { _numberDecimalProto.install(_global); _oidProto.install(_global); _regExpProto.install(_global); + _resumeTokenDataProto.install(_global); _timestampProto.install(_global); _uriProto.install(_global); _statusProto.install(_global); diff --git a/src/mongo/scripting/mozjs/implscope.h b/src/mongo/scripting/mozjs/implscope.h index 943fbb9f9dd..d4ea980c311 100644 --- a/src/mongo/scripting/mozjs/implscope.h +++ b/src/mongo/scripting/mozjs/implscope.h @@ -89,6 +89,7 @@ #include "mongo/scripting/mozjs/object.h" #include "mongo/scripting/mozjs/oid.h" #include "mongo/scripting/mozjs/regexp.h" +#include "mongo/scripting/mozjs/resumetoken.h" #include "mongo/scripting/mozjs/session.h" #include "mongo/scripting/mozjs/status.h" #include "mongo/scripting/mozjs/timestamp.h" @@ -328,6 +329,12 @@ public: return _regExpProto; } + template + typename std::enable_if::value, WrapType&>::type + getProto() { + return _resumeTokenDataProto; + } + template typename std::enable_if::value, WrapType&>::type getProto() { return _sessionProto; @@ -543,6 +550,7 @@ private: WrapType _objectProto; WrapType _oidProto; WrapType _regExpProto; + WrapType _resumeTokenDataProto; WrapType _sessionProto; WrapType _statusProto; WrapType _timestampProto; diff --git a/src/mongo/scripting/mozjs/resumetoken.cpp b/src/mongo/scripting/mozjs/resumetoken.cpp new file mode 100644 index 00000000000..a003cc202b6 --- /dev/null +++ b/src/mongo/scripting/mozjs/resumetoken.cpp @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2024-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include +#include +#include + +#include + +#include "mongo/db/pipeline/resume_token.h" +#include "mongo/scripting/mozjs/bson.h" +#include "mongo/scripting/mozjs/wrapconstrainedmethod.h" // IWYU pragma: keep + +namespace mongo { +namespace mozjs { + +const JSFunctionSpec ResumeTokenDataUtility::freeFunctions[2] = { + MONGO_ATTACH_JS_FUNCTION(decodeResumeToken), + JS_FS_END, +}; + +const char* const ResumeTokenDataUtility::className = "ResumeTokenDataUtility"; + +constexpr const char* kHighWaterMarkResumeTokenType = "highWaterMarkResumeTokenType"; +constexpr const char* kEventResumeTokenType = "eventResumeTokenType"; + +void ResumeTokenDataUtility::Functions::decodeResumeToken::call(JSContext* cx, JS::CallArgs args) { + uassert(ErrorCodes::BadValue, "decodeResumeToken takes 1 argument.", args.length() == 1); + + BSONObj encodedResumeTokenBson = ValueWriter(cx, args.get(0)).toBSON(); + const auto resumeTokenDataBson = ResumeToken::parse(encodedResumeTokenBson).getData().toBSON(); + + JS::RootedObject thisValue(cx); + auto scope = getScope(cx); + scope->getProto().newObject(&thisValue); + BSONInfo::make(cx, &thisValue, std::move(resumeTokenDataBson), nullptr, true); + + args.rval().setObjectOrNull(thisValue); +} + +void ResumeTokenDataUtility::postInstall(JSContext* cx, + JS::HandleObject global, + JS::HandleObject proto) { + JS::RootedValue undef(cx); + undef.setUndefined(); + + // The following code initialises two constants which will be visible in the global JS scope. + // The constants correspond to 'ResumeTokenData::TokenType' enum type values and + // they are used to improve readability of test assertions involving the 'tokenType' field. + + // highWaterMarkResumeTokenType + if (!JS_DefineProperty( + cx, + global, + kHighWaterMarkResumeTokenType, + JS::RootedValue(cx, JS::Int32Value(ResumeTokenData::kHighWaterMarkToken)), + JSPROP_READONLY | JSPROP_PERMANENT)) { + uasserted(ErrorCodes::JSInterpreterFailure, "Failed to JS_DefineProperty"); + } + + // eventResumeTokenType + if (!JS_DefineProperty(cx, + global, + kEventResumeTokenType, + JS::RootedValue(cx, JS::Int32Value(ResumeTokenData::kEventToken)), + JSPROP_READONLY | JSPROP_PERMANENT)) { + uasserted(ErrorCodes::JSInterpreterFailure, "Failed to JS_DefineProperty"); + } +} + +} // namespace mozjs +} // namespace mongo diff --git a/src/mongo/scripting/mozjs/resumetoken.h b/src/mongo/scripting/mozjs/resumetoken.h new file mode 100644 index 00000000000..44e296ffce2 --- /dev/null +++ b/src/mongo/scripting/mozjs/resumetoken.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2024-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include + +#include "mongo/scripting/mozjs/base.h" +#include "mongo/scripting/mozjs/wraptype.h" + +namespace mongo { +namespace mozjs { + +/** + * Utility class offering helper methods for managing + * resume tokens in JavaScript tests. + */ +struct ResumeTokenDataUtility : public BaseInfo { + + struct Functions { + MONGO_DECLARE_JS_FUNCTION(decodeResumeToken); + }; + + static const JSFunctionSpec freeFunctions[2]; + + static const char* const className; + + static void postInstall(JSContext* cx, JS::HandleObject global, JS::HandleObject proto); +}; + +} // namespace mozjs +} // namespace mongo