Files
mongo/jstests/multiVersion/genericChangeStreams/change_streams_event_resumability.js
Jan fdf6cc5c3f SERVER-97071 Ensure stability of resumeTokens across versions (#29153)
GitOrigin-RevId: a24caa728b2cbf230aaa89b414e69774261a80bb
2024-12-02 13:58:14 +00:00

210 lines
8.4 KiB
JavaScript

/**
* Tests the compatibility of resume tokens across server version upgrade / downgrade.
* @tags: [uses_change_streams, requires_replication, requires_fcv_80]
*/
import "jstests/multiVersion/libs/multi_cluster.js";
import {
assertCreateCollection,
assertDropCollection
} from "jstests/libs/collection_drop_recreate.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
// Start a sharded cluster in which mongos, the config server and the shard replica sets are all
// launched with the "last-lts" binary version, so that every resume token collected below is
// produced by the older binaries.
const st = new ShardingTest({
shards: 2,
config: 1,
other: {
mongosOptions: {binVersion: "last-lts"},
configOptions: {
binVersion: "last-lts",
},
rsOptions: {
binVersion: "last-lts",
},
// NOTE(review): presumably the number of nodes in each shard's replica set - confirm against
// ShardingTest.
rs: {
nodes: 2,
}
}
});
// Names of the namespaces created (and later dropped) by the test.
const collName = "test";
const viewName = "testView";
const timeseriesName = "ts";
// 'testDB' is re-fetched from mongos after each binary upgrade / downgrade further below, hence
// 'let' rather than 'const'.
let testDB = st.s.getDB(jsTestName());
let testColl = testDB[collName];
// Record high-watermark time marking the start point of the test. It is taken before any event is
// generated and is later used as 'startAtOperationTime' of the stream that collects the tokens.
const testStartOperationTime = testDB.hello().$clusterTime.clusterTime;
// An array which will list the expected sequence of change events generated by the test. Each entry
// holds only the fields that are compared against the actual event.
const expectedEvents = [];
//
// Below, we generate one of each type of change event so that we can later test resuming from the
// token representing such event.
//
// Every operation is immediately followed by a push of the event(s) it is expected to produce, so
// the order of 'expectedEvents' mirrors the order of the operations.
testColl = assertCreateCollection(testDB, testColl.getName());
expectedEvents.push({operationType: "create"});
// Three index specs are created in one call; one "createIndexes" event is expected per spec.
assert.commandWorked(testColl.createIndexes([{shard: 1}, {shard: 1, _id: 1}, {value: 1}]));
expectedEvents.push({operationType: "createIndexes"},
{operationType: "createIndexes"},
{operationType: "createIndexes"});
// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
// documents and one that contains all {shard: 2} documents.
st.shardColl(testColl, {shard: 1} /* shard key */, {shard: 2} /* split at */);
expectedEvents.push({operationType: "shardCollection"});
assert.commandWorked(testColl.insertMany([
{_id: "a", shard: 1, value: ""},
{_id: "b", shard: 2, value: ""},
{_id: "c", shard: 2, value: ""}
]));
// NOTE(review): the 'documentKey' field order here ({shard, _id}) differs from the one used for the
// update / replace / delete events below ({_id, shard}); this presumably relies on assert.docEq
// ignoring field order - confirm.
expectedEvents.push({operationType: "insert", documentKey: {shard: 1, _id: "a"}},
{operationType: "insert", documentKey: {shard: 2, _id: "b"}},
{operationType: "insert", documentKey: {shard: 2, _id: "c"}});
// Update, replace and delete one document from each of the two chunks.
assert.commandWorked(testColl.update({_id: "a", shard: 1}, {$set: {value: "x"}}));
expectedEvents.push({operationType: "update", documentKey: {_id: "a", shard: 1}});
assert.commandWorked(testColl.update({_id: "b", shard: 2}, {$set: {value: "x"}}));
expectedEvents.push({operationType: "update", documentKey: {_id: "b", shard: 2}});
assert.commandWorked(testColl.replaceOne({_id: "a", shard: 1}, {_id: "a", shard: 1, value: "y"}));
expectedEvents.push({operationType: "replace", documentKey: {_id: "a", shard: 1}});
assert.commandWorked(testColl.replaceOne({_id: "b", shard: 2}, {_id: "b", shard: 2, value: "y"}));
expectedEvents.push({operationType: "replace", documentKey: {_id: "b", shard: 2}});
assert.commandWorked(testColl.remove({_id: "a"}));
expectedEvents.push({operationType: "delete", documentKey: {_id: "a", shard: 1}});
assert.commandWorked(testColl.remove({_id: "b"}));
expectedEvents.push({operationType: "delete", documentKey: {_id: "b", shard: 2}});
// Refine the shard key to {shard: 1, _id: 1}; an index with this key pattern was created above.
assert.commandWorked(
st.s.adminCommand({refineCollectionShardKey: testColl.getFullName(), key: {shard: 1, _id: 1}}));
expectedEvents.push({operationType: "refineCollectionShardKey"});
assert.commandWorked(st.s.adminCommand(
{reshardCollection: testColl.getFullName(), key: {_id: 1}, numInitialChunks: 2}));
expectedEvents.push({operationType: "reshardCollection"});
// A single dropIndex call is expected to produce two "dropIndexes" events.
// NOTE(review): presumably one event per shard owning chunks of the collection - confirm. The two
// events may carry identical resume tokens, which is handled when the tokens are collected.
assert.commandWorked(testColl.dropIndex({value: 1}));
expectedEvents.push({operationType: "dropIndexes"}, {operationType: "dropIndexes"});
// Create view on a collection.
assert.commandWorked(
testDB.runCommand({create: viewName, viewOn: collName, pipeline: [{$match: {foo: "bar"}}]}));
expectedEvents.push({operationType: "create"});
assert.commandWorked(testDB.runCommand({drop: viewName}));
expectedEvents.push({operationType: "drop"});
// Create timeseries collection.
assert.commandWorked(testDB.runCommand({create: timeseriesName, timeseries: {timeField: "t"}}));
expectedEvents.push({operationType: "create"});
assert.commandWorked(testDB.runCommand({drop: timeseriesName}));
expectedEvents.push({operationType: "drop"});
// Rename the test collection and then drop it under its new name.
const newTestCollectionName = "test_";
assert.commandWorked(testColl.renameCollection(newTestCollectionName));
expectedEvents.push({operationType: "rename"});
assertDropCollection(testDB, newTestCollectionName);
expectedEvents.push({operationType: "drop"});
assert.commandWorked(testDB.dropDatabase());
// A whole-DB stream will be invalidated by the dropDatabase event. We include a second dropDatabase
// event because one such event is generated on each shard, and will be reported if we resume after
// the invalidate. This second dropDatabase acts as a sentinel here, signifying that we have reached
// the end of the test stream.
expectedEvents.push({operationType: "dropDatabase"},
{operationType: "invalidate"},
{operationType: "dropDatabase"});
// Resume tokens collected while the whole cluster runs 'last-lts'. Index 0 holds the high-watermark
// token of the freshly opened stream; index k + 1 holds the token of the event matching
// 'expectedEvents[k]'. Hence the token at index i immediately precedes 'expectedEvents[i]'.
const resumeTokensLastLTS = [];
{
    const stream = testDB.watch([], {
        showExpandedEvents: true,
        startAtOperationTime: testStartOperationTime,
        batchSize: 0,
    });

    // With 'batchSize: 0' no event has been returned yet, so the stream's current token must be a
    // high-watermark token.
    const highWaterMark = stream.getResumeToken();
    assert.eq(decodeResumeToken(highWaterMark).tokenType,
              highWaterMarkResumeTokenType,
              "expected a high-watermark token");
    resumeTokensLastLTS.push(highWaterMark);

    // Fetch all events and compare to the expected ones. Note that we need to exclude the second
    // dropDatabase sentinel event here.
    const dropIndexesTokens = [];
    for (const {operationType: expectedType} of expectedEvents.slice(0, -1)) {
        assert.soon(() => stream.hasNext());
        const actual = stream.next();
        assert.eq(expectedType, actual.operationType);
        if (actual.operationType === "dropIndexes") {
            dropIndexesTokens.push(actual._id);
        }
        resumeTokensLastLTS.push(actual._id);
    }
    stream.close();

    assert.eq(2, dropIndexesTokens.length, "unexpected number of 'dropIndexes' events");

    // Leave only one of two "dropIndexes" events when they have identical resume tokens, because
    // the second event will be skipped when resuming from the first event's token in such a case.
    // TODO SERVER-90023: Remove this workaround when no longer needed.
    const [firstDropToken, secondDropToken] = dropIndexesTokens;
    if (bsonWoCompare(firstDropToken, secondDropToken) === 0) {
        const firstDropAt =
            expectedEvents.findIndex(({operationType}) => operationType === "dropIndexes");
        expectedEvents.splice(firstDropAt, 1);
        // The token array is shifted by one because of the leading high-watermark token.
        resumeTokensLastLTS.splice(firstDropAt + 1, 1);
    }
}
/**
 * Asserts that 'event' agrees with 'expectedEvent' on every field that 'expectedEvent' lists.
 * Fields that appear only in 'event' are not inspected.
 *
 * @param {Object} event - the change event that was actually received.
 * @param {Object} expectedEvent - the subset of fields (and their values) the event must carry.
 * @param {string} errorMsg - prefix for the assertion message; the offending field name is
 *     appended to it.
 */
function assertEventMatches(event, expectedEvent, errorMsg) {
    for (const fieldName in expectedEvent) {
        const mismatchMsg = `${errorMsg}: value mismatch for field '${fieldName}'`;
        assert.docEq(expectedEvent[fieldName], event[fieldName], mismatchMsg);
    }
}
/**
 * Asserts that resuming from each of the collected resume tokens produces the expected event.
 *
 * 'resumeTokensLastLTS[i]' is the token that immediately precedes 'expectedEvents[i]': index 0 is
 * the high-watermark token taken before any event was read, and index i > 0 is the token of the
 * event matching 'expectedEvents[i - 1]'. For every token a whole-DB stream is opened with
 * 'startAfter', and the first event it returns is compared against the expected one.
 *
 * Reads the file-level 'testDB', 'expectedEvents' and 'resumeTokensLastLTS'; takes no arguments
 * and returns nothing. Throws if an expected event does not arrive or does not match.
 */
function assertTokenResumability() {
    // Start at index 0 so that the high-watermark token is exercised as well.
    for (let i = 0; i < expectedEvents.length; ++i) {
        const errorMsg =
            `could not retrieve the expected event matching ${tojson(expectedEvents[i])}`;
        const csCursor =
            testDB.watch([], {showExpandedEvents: true, startAfter: resumeTokensLastLTS[i]});
        try {
            // Pass the message along so that a timeout names the event that was being awaited.
            assert.soon(() => csCursor.hasNext(), errorMsg);
            assertEventMatches(csCursor.next(), expectedEvents[i], errorMsg);
        } finally {
            // Release the cursor even when one of the assertions above throws.
            csCursor.close();
        }
    }
}
// Upgrade the cluster to 'latest' to test resumability. The FCV is raised only after the binaries
// have been upgraded.
st.upgradeCluster("latest", {waitUntilStable: true});
assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV, confirm: true}));
// Re-fetch the database handle from 'st.s' now that the cluster runs the new binaries.
testDB = st.s.getDB(jsTestName());
// Verify that we can resume from the tokens collected on 'last-lts' on the new binary version.
// Resuming after the token of one event is expected to return the event that follows it. The
// final dropDatabase event is never resumed from, because it is simply a sentinel that signifies
// the end of the test.
assertTokenResumability();
// Downgrade back to the original version. The FCV is lowered first, while the 'latest' binaries
// are still running, and only then are the binaries replaced.
assert.commandWorked(
st.s.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV, confirm: true}));
st.downgradeCluster("last-lts", {waitUntilStable: true});
testDB = st.s.getDB(jsTestName());
// The same tokens must remain resumable after the round trip back to 'last-lts'.
assertTokenResumability();
st.stop();