Files
mongo/jstests/serverless/tenant_migration_concurrent_bulk_writes.js
2023-08-31 19:10:18 +00:00

701 lines
24 KiB
JavaScript

/**
* Tests that bulk writes during a tenant migration correctly report write errors and
* retry writes that returned TenantMigrationCommitted.
*
* @tags: [
* incompatible_with_macos,
* incompatible_with_windows_tls,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
* requires_fcv_71,
* ]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {extractUUIDFromObject} from "jstests/libs/uuid_util.js";
import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js";
import {
makeTenantDB,
makeX509OptionsForTest,
runMigrationAsync
} from "jstests/replsets/libs/tenant_migration_util.js";
import {createRstArgs} from "jstests/replsets/rslib.js";
// Value used for the internalInsertMaxBatchSize server parameter on both replica sets. It is
// lowered so that batching can be observed without inserting hundreds of documents.
const kMaxBatchSize = 2;
// Collection targeted by every bulk write in this test.
const kCollName = "testColl";
// Tenant-defined part of the database name; combined with the tenant id via makeTenantDB().
const kTenantDefinedDbName = "0";
// Number of writes issued in each bulk operation.
const kNumWriteOps = 6;
// Number of insert batches allowed to complete before the migration blocks writes (used as the
// skip count of the hangDuringBatchInsert failpoint).
const kNumWriteBatchesWithoutMigrationConflict = 2;
// Number of updates allowed through before hanging (skip count of hangDuringBatchUpdate).
const kNumUpdatesWithoutMigrationConflict = 2;
// Upper bound, in milliseconds, for the random sleep between releasing the writes and letting
// the migration leave its blocking state.
const kMaxSleepTimeMS = 1000;
/**
 * Starts a single-node serverless donor replica set, a single-node serverless recipient replica
 * set (both using the X.509 options from makeX509OptionsForTest()), and a TenantMigrationTest
 * wrapping them.
 *
 * Both sets lower internalInsertMaxBatchSize to kMaxBatchSize so batching is observable with only
 * a handful of documents. The donor also turns on the
 * tenantMigrationDonorAllowsNonTimestampedReads failpoint so the test can read from it after a
 * migration completes.
 *
 * @returns {{tenantMigrationTest: TenantMigrationTest, donorRst: ReplSetTest,
 *            recipientRst: ReplSetTest, teardown: function()}} the fixtures, plus a teardown
 *     that stops the TenantMigrationTest and then the donor and recipient sets.
 */
function setup() {
    const x509Options = makeX509OptionsForTest();

    // Constructs, starts, and initiates a one-node serverless replica set. `baseNodeOptions` is
    // extended in place with the given setParameter document.
    const startSingleNodeRst = (rstName, baseNodeOptions, setParameter) => {
        const rst = new ReplSetTest({
            nodes: 1,
            name: rstName,
            serverless: true,
            nodeOptions: Object.assign(baseNodeOptions, {setParameter}),
        });
        rst.startSet();
        rst.initiate();
        return rst;
    };

    const donorRst = startSingleNodeRst('donor', x509Options.donor, {
        internalInsertMaxBatchSize: kMaxBatchSize,
        // Allow non-timestamped reads on donor after migration completes for testing.
        'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
    });
    const recipientRst = startSingleNodeRst('recipient', x509Options.recipient, {
        internalInsertMaxBatchSize: kMaxBatchSize,
    });

    const tenantMigrationTest =
        new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});

    return {
        tenantMigrationTest,
        donorRst,
        recipientRst,
        teardown() {
            tenantMigrationTest.stop();
            donorRst.stopSet();
            recipientRst.stopSet();
        },
    };
}
/**
 * Runs an ordered bulk insert of `numDocs` documents ({x: 0}, {x: 1}, ...) into
 * `dbName.collName` over a new connection to `primaryHost`. The test cases in this file start it
 * on a separate Thread and inspect the returned data.
 *
 * @param {string} primaryHost - host string passed to `new Mongo()`.
 * @param {string} dbName - database to insert into.
 * @param {string} collName - collection to insert into.
 * @param {number} numDocs - number of documents to insert.
 * @returns {{res: Object, ops: Array}} `res` is the bulk's raw response when the result (or the
 *     thrown error) exposes getRawResponse(), otherwise the thrown value itself; `ops` is the
 *     bulk's list of queued operations.
 */
function bulkInsertDocsOrdered(primaryHost, dbName, collName, numDocs) {
    const primary = new Mongo(primaryHost);
    const primaryDB = primary.getDB(dbName);
    const bulk = primaryDB[collName].initializeOrderedBulkOp();
    for (let i = 0; i < numDocs; ++i) {
        bulk.insert({x: i});
    }
    let res;
    try {
        res = bulk.execute();
    } catch (e) {
        res = e;
    }
    // Not every thrown value has getRawResponse(). Return such values as-is (like the
    // multi-update helpers do) instead of masking them with a TypeError.
    return {res: res.getRawResponse ? res.getRawResponse() : res, ops: bulk.getOperations()};
}
/**
 * Runs an unordered bulk insert of `numDocs` documents ({x: 0}, {x: 1}, ...) into
 * `dbName.collName` over a new connection to `primaryHost`. The test cases in this file start it
 * on a separate Thread and inspect the returned data.
 *
 * @param {string} primaryHost - host string passed to `new Mongo()`.
 * @param {string} dbName - database to insert into.
 * @param {string} collName - collection to insert into.
 * @param {number} numDocs - number of documents to insert.
 * @returns {{res: Object, ops: Array}} `res` is the bulk's raw response when the result (or the
 *     thrown error) exposes getRawResponse(), otherwise the thrown value itself; `ops` is the
 *     bulk's list of queued operations.
 */
function bulkInsertDocsUnordered(primaryHost, dbName, collName, numDocs) {
    const primary = new Mongo(primaryHost);
    const primaryDB = primary.getDB(dbName);
    const bulk = primaryDB[collName].initializeUnorderedBulkOp();
    for (let i = 0; i < numDocs; ++i) {
        bulk.insert({x: i});
    }
    let res;
    try {
        res = bulk.execute();
    } catch (e) {
        res = e;
    }
    // Not every thrown value has getRawResponse(). Return such values as-is (like the
    // multi-update helpers do) instead of masking them with a TypeError.
    return {res: res.getRawResponse ? res.getRawResponse() : res, ops: bulk.getOperations()};
}
/**
 * Seeds `dbName.collName` with `numDocs` documents ({x: 0}, {x: 1}, ...) using an ordered bulk
 * insert. It then runs an ordered bulk that updates each of them with {$set: {ordered_update: true}}.
 * Opens its own connection to `primaryHost`; the test cases in this file start it on a Thread.
 *
 * Errors from the seeding insert propagate to the caller. Errors from the update bulk are
 * captured and returned.
 *
 * @param {string} primaryHost - host string passed to `new Mongo()`.
 * @param {string} dbName - database to write to.
 * @param {string} collName - collection to write to.
 * @param {number} numDocs - number of documents to seed and then update.
 * @returns {{res: Object, ops: Array}} `res` is the update bulk's raw response when the result
 *     (or the thrown error) exposes getRawResponse(), otherwise the thrown value itself; `ops` is
 *     the update bulk's list of queued operations.
 */
function bulkMultiUpdateDocsOrdered(primaryHost, dbName, collName, numDocs) {
    const db = new Mongo(primaryHost).getDB(dbName);
    // Calls `visit` once per seeded value of `x`, in increasing order.
    const eachX = (visit) => {
        for (let x = 0; x < numDocs; x++) {
            visit(x);
        }
    };

    // Seed the documents the update bulk targets.
    const seedBulk = db[collName].initializeOrderedBulkOp();
    eachX((x) => seedBulk.insert({x}));
    seedBulk.execute();

    const updateBulk = db[collName].initializeOrderedBulkOp();
    eachX((x) => updateBulk.find({x}).update({$set: {ordered_update: true}}));

    let outcome;
    try {
        outcome = updateBulk.execute();
    } catch (thrown) {
        outcome = thrown;
    }

    let raw = outcome;
    if (outcome.getRawResponse) {
        raw = outcome.getRawResponse();
    }
    return {res: raw, ops: updateBulk.getOperations()};
}
/**
 * Seeds `dbName.collName` with `numDocs` documents ({x: 0}, {x: 1}, ...) using an ordered bulk
 * insert. It then runs an unordered bulk that updates each of them with
 * {$set: {unordered_update: true}}. Opens its own connection to `primaryHost`; the test cases in
 * this file start it on a Thread.
 *
 * Errors from the seeding insert propagate to the caller. Errors from the update bulk are
 * captured and returned.
 *
 * @param {string} primaryHost - host string passed to `new Mongo()`.
 * @param {string} dbName - database to write to.
 * @param {string} collName - collection to write to.
 * @param {number} numDocs - number of documents to seed and then update.
 * @returns {{res: Object, ops: Array}} `res` is the update bulk's raw response when the result
 *     (or the thrown error) exposes getRawResponse(), otherwise the thrown value itself; `ops` is
 *     the update bulk's list of queued operations.
 */
function bulkMultiUpdateDocsUnordered(primaryHost, dbName, collName, numDocs) {
    const db = new Mongo(primaryHost).getDB(dbName);
    // Calls `visit` once per seeded value of `x`, in increasing order.
    const eachX = (visit) => {
        for (let x = 0; x < numDocs; x++) {
            visit(x);
        }
    };

    // Seed the documents the update bulk targets (seeding is always ordered).
    const seedBulk = db[collName].initializeOrderedBulkOp();
    eachX((x) => seedBulk.insert({x}));
    seedBulk.execute();

    const updateBulk = db[collName].initializeUnorderedBulkOp();
    eachX((x) => updateBulk.find({x}).update({$set: {unordered_update: true}}));

    let outcome;
    try {
        outcome = updateBulk.execute();
    } catch (thrown) {
        outcome = thrown;
    }

    let raw = outcome;
    if (outcome.getRawResponse) {
        raw = outcome.getRawResponse();
    }
    return {res: raw, ops: updateBulk.getOperations()};
}
// Unordered bulk insert that hangs after kNumWriteBatchesWithoutMigrationConflict batches while a
// tenant migration runs to commit. Every remaining write is expected to fail with
// TenantMigrationCommitted.
(() => {
    jsTestLog("Testing unordered bulk insert against a tenant migration that commits.");
    const {tenantMigrationTest, donorRst, teardown} = setup();

    const tenantId = ObjectId().str;
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(UUID()),
        tenantId,
    };

    const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
    const primary = donorRst.getPrimary();
    const primaryDB = primary.getDB(dbName);
    assert.commandWorked(primaryDB.runCommand({create: kCollName}));

    // Let kNumWriteBatchesWithoutMigrationConflict insert batches through, then hang the insert.
    const writeFp = configureFailPoint(
        primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
    const bulkWriteThread =
        new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
    bulkWriteThread.start();
    writeFp.wait();

    // Run the whole migration to commit while the bulk insert is hung, then release it.
    TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
    writeFp.off();
    bulkWriteThread.join();

    const bulkWriteRes = bulkWriteThread.returnData();
    const writeErrors = bulkWriteRes.res.writeErrors;
    // Every insert reported as successful must be visible on the donor.
    assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
    // Every write after the first (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict) is
    // expected to fail, with consecutive indexes.
    assert.eq(writeErrors.length,
              (kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
    let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
    writeErrors.forEach((err, arrIndex) => {
        assert.eq(err.code, ErrorCodes.TenantMigrationCommitted);
        assert.eq(err.index, expectedErrorIndex++);
        // Only the first write error is expected to carry an error message.
        if (arrIndex === 0) {
            assert(err.errmsg);
        } else {
            assert(!err.errmsg);
        }
    });

    teardown();
})();
// Unordered bulk insert that is released while a tenant migration is held in its blocking state.
// The migration then commits, and every remaining write is expected to fail with
// TenantMigrationCommitted.
(() => {
    jsTestLog(
        "Testing unordered bulk insert against a tenant migration that blocks a few inserts and commits.");
    const {tenantMigrationTest, donorRst, recipientRst, teardown} = setup();

    const tenantId = ObjectId().str;
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(UUID()),
        recipientConnString: recipientRst.getURL(),
        tenantId,
    };
    const donorRstArgs = createRstArgs(donorRst);

    const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
    const primary = donorRst.getPrimary();
    const primaryDB = primary.getDB(dbName);
    assert.commandWorked(primaryDB.runCommand({create: kCollName}));

    // Hang the bulk insert after kNumWriteBatchesWithoutMigrationConflict batches.
    const writeFp = configureFailPoint(
        primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
    const bulkWriteThread =
        new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
    // Hold the migration before it leaves its blocking state.
    const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
    const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);

    bulkWriteThread.start();
    writeFp.wait();
    migrationThread.start();
    blockFp.wait();

    // Release the inserts while the migration is held, wait a random time (up to
    // kMaxSleepTimeMS), then let the migration proceed.
    writeFp.off();
    sleep(Math.random() * kMaxSleepTimeMS);
    blockFp.off();

    bulkWriteThread.join();
    migrationThread.join();
    TenantMigrationTest.assertCommitted(migrationThread.returnData());
    tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);

    const bulkWriteRes = bulkWriteThread.returnData();
    const writeErrors = bulkWriteRes.res.writeErrors;
    assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
    assert.eq(writeErrors.length,
              (kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
    let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
    writeErrors.forEach((err, arrIndex) => {
        assert.eq(err.code, ErrorCodes.TenantMigrationCommitted);
        assert.eq(err.index, expectedErrorIndex++);
        // Only the first write error carries the re-route message; the rest have an empty one.
        if (arrIndex === 0) {
            assert.eq(err.errmsg,
                      "Write or read must be re-routed to the new owner of this tenant");
        } else {
            assert.eq(err.errmsg, "");
        }
    });

    teardown();
})();
// Unordered bulk insert that is released while a tenant migration is held in its blocking state.
// The migration is forced to abort, and every remaining write is expected to fail with
// TenantMigrationAborted.
(() => {
    jsTestLog("Testing unordered bulk insert against a tenant migration that aborts.");
    const {tenantMigrationTest, donorRst, teardown} = setup();

    const tenantId = ObjectId().str;
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(UUID()),
        recipientConnString: tenantMigrationTest.getRecipientConnString(),
        tenantId,
    };
    const donorRstArgs = createRstArgs(donorRst);

    const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
    const primary = donorRst.getPrimary();
    const primaryDB = primary.getDB(dbName);
    assert.commandWorked(primaryDB.runCommand({create: kCollName}));

    // Hang the bulk insert after kNumWriteBatchesWithoutMigrationConflict batches.
    const writeFp = configureFailPoint(
        primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
    const bulkWriteThread =
        new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
    // Force the migration to abort instead of committing.
    const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState");
    // The failpoint below ensures that a write throws TenantMigrationConflict in the op
    // observer. Without it, the migration could have already aborted by the time the write
    // reaches the op observer.
    const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
    const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);

    bulkWriteThread.start();
    writeFp.wait();
    migrationThread.start();
    blockFp.wait();

    // Release the inserts while the migration is held, wait a random time (up to
    // kMaxSleepTimeMS), then let the migration proceed.
    writeFp.off();
    sleep(Math.random() * kMaxSleepTimeMS);
    blockFp.off();

    bulkWriteThread.join();
    migrationThread.join();
    abortFp.off();
    TenantMigrationTest.assertAborted(migrationThread.returnData());
    tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);

    const bulkWriteRes = bulkWriteThread.returnData();
    const writeErrors = bulkWriteRes.res.writeErrors;
    assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
    assert.eq(writeErrors.length,
              (kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
    let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
    writeErrors.forEach((err, arrIndex) => {
        assert.eq(err.code, ErrorCodes.TenantMigrationAborted);
        assert.eq(err.index, expectedErrorIndex++);
        // Only the first write error is expected to carry an error message.
        if (arrIndex === 0) {
            assert(err.errmsg);
        } else {
            assert(!err.errmsg);
        }
    });

    teardown();
})();
// Ordered bulk insert that hangs after kNumWriteBatchesWithoutMigrationConflict batches while a
// tenant migration runs to commit. The test expects exactly one TenantMigrationCommitted write
// error, at the first write after the hang.
(() => {
jsTestLog("Testing ordered bulk inserts against a tenant migration that commits.");
const {tenantMigrationTest, donorRst, teardown} = setup();
const tenantId = ObjectId().str;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
tenantId,
};
const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
const primary = donorRst.getPrimary();
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
// Let kNumWriteBatchesWithoutMigrationConflict insert batches through, then hang the insert.
const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
const bulkWriteThread =
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
bulkWriteThread.start();
writeFp.wait();
// Run the whole migration to commit while the bulk insert is hung, then release it.
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
writeFp.off();
bulkWriteThread.join();
const bulkWriteRes = bulkWriteThread.returnData();
const writeErrors = bulkWriteRes.res.writeErrors;
// Every insert reported as successful must be visible on the donor.
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
// The single write error should correspond to the first write after the migration
// started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
teardown();
})();
// Ordered bulk insert that is released while a tenant migration is held in its blocking state.
// The migration then commits. The test expects exactly one TenantMigrationCommitted write error,
// at the first write after the hang.
(() => {
jsTestLog(
"Testing ordered bulk insert against a tenant migration that blocks a few inserts and commits.");
const {tenantMigrationTest, donorRst, teardown} = setup();
const tenantId = ObjectId().str;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
recipientConnString: tenantMigrationTest.getRecipientConnString(),
tenantId,
};
const donorRstArgs = createRstArgs(donorRst);
const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
const primary = donorRst.getPrimary();
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
// Hang the bulk insert after kNumWriteBatchesWithoutMigrationConflict batches.
const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
const bulkWriteThread =
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
// Hold the migration before it leaves its blocking state.
const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
migrationThread.start();
blockFp.wait();
// Release the inserts while the migration is held, wait a random time (up to kMaxSleepTimeMS),
// then let the migration proceed.
writeFp.off();
sleep(Math.random() * kMaxSleepTimeMS);
blockFp.off();
bulkWriteThread.join();
migrationThread.join();
TenantMigrationTest.assertCommitted(migrationThread.returnData());
tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);
const bulkWriteRes = bulkWriteThread.returnData();
const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
// The single write error should correspond to the first write after the migration
// started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
teardown();
})();
// Ordered bulk insert that is released while a tenant migration is held in its blocking state.
// The migration is forced to abort. The test expects exactly one TenantMigrationAborted write
// error, at the first write after the hang.
(() => {
jsTestLog("Testing ordered bulk write against a tenant migration that aborts.");
const {tenantMigrationTest, donorRst, teardown} = setup();
const tenantId = ObjectId().str;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
recipientConnString: tenantMigrationTest.getRecipientConnString(),
tenantId,
};
const donorRstArgs = createRstArgs(donorRst);
const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
const primary = donorRst.getPrimary();
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
// Hang the bulk insert after kNumWriteBatchesWithoutMigrationConflict batches.
const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
const bulkWriteThread =
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
// Force the migration to abort instead of committing.
const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState");
// The failpoint below ensures that a write throws TenantMigrationConflict in the op
// observer. Without it, the migration could have already aborted by the time the write
// reaches the op observer.
const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
migrationThread.start();
blockFp.wait();
// Release the inserts while the migration is held, wait a random time (up to kMaxSleepTimeMS),
// then let the migration proceed.
writeFp.off();
sleep(Math.random() * kMaxSleepTimeMS);
blockFp.off();
bulkWriteThread.join();
migrationThread.join();
abortFp.off();
TenantMigrationTest.assertAborted(migrationThread.returnData());
tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);
const bulkWriteRes = bulkWriteThread.returnData();
const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
// The single write error should correspond to the first write after the migration
// started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted);
teardown();
})();
// Unordered bulk multi-update that is released while a tenant migration is held in its blocking
// state. The migration then commits. The whole bulk is expected to fail with Interrupted, with
// a message saying it could not be automatically retried.
(() => {
    jsTestLog("Testing unordered bulk multi update that blocks.");
    const {tenantMigrationTest, donorRst, recipientRst, teardown} = setup();

    const tenantId = ObjectId().str;
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(UUID()),
        recipientConnString: recipientRst.getURL(),
        tenantId,
    };
    const donorRstArgs = createRstArgs(donorRst);

    const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
    const primary = donorRst.getPrimary();
    const primaryDB = primary.getDB(dbName);
    assert.commandWorked(primaryDB.runCommand({create: kCollName}));

    // Let kNumUpdatesWithoutMigrationConflict updates through, then hang the update.
    const writeFp = configureFailPoint(
        primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
    const bulkWriteThread =
        new Thread(bulkMultiUpdateDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
    // Hold the migration before it leaves its blocking state.
    const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
    const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);

    bulkWriteThread.start();
    writeFp.wait();
    migrationThread.start();
    blockFp.wait();

    // Release the updates while the migration is held, wait a random time (up to
    // kMaxSleepTimeMS), then let the migration proceed.
    writeFp.off();
    sleep(Math.random() * kMaxSleepTimeMS);
    blockFp.off();

    bulkWriteThread.join();
    migrationThread.join();
    TenantMigrationTest.assertCommitted(migrationThread.returnData());
    tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);

    const bulkWriteRes = bulkWriteThread.returnData();
    assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
    assert.eq(
        bulkWriteRes.res.errmsg,
        "Operation interrupted by an internal data migration and could not be automatically retried",
        tojson(bulkWriteRes));

    teardown();
})();
// Ordered bulk multi-update that is released while a tenant migration is held in its blocking
// state. The migration then commits. The whole bulk is expected to fail with Interrupted, with
// a message saying it could not be automatically retried.
(() => {
    jsTestLog("Testing ordered bulk multi update that blocks.");
    const {tenantMigrationTest, donorRst, recipientRst, teardown} = setup();

    const tenantId = ObjectId().str;
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(UUID()),
        recipientConnString: recipientRst.getURL(),
        tenantId,
    };
    const donorRstArgs = createRstArgs(donorRst);

    const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
    const primary = donorRst.getPrimary();
    const primaryDB = primary.getDB(dbName);
    assert.commandWorked(primaryDB.runCommand({create: kCollName}));

    // Let kNumUpdatesWithoutMigrationConflict updates through, then hang the update.
    const writeFp = configureFailPoint(
        primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
    const bulkWriteThread =
        new Thread(bulkMultiUpdateDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
    // Hold the migration before it leaves its blocking state.
    const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
    const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArgs);

    bulkWriteThread.start();
    writeFp.wait();
    migrationThread.start();
    blockFp.wait();

    // Release the updates while the migration is held, wait a random time (up to
    // kMaxSleepTimeMS), then let the migration proceed.
    writeFp.off();
    sleep(Math.random() * kMaxSleepTimeMS);
    blockFp.off();

    bulkWriteThread.join();
    migrationThread.join();
    TenantMigrationTest.assertCommitted(migrationThread.returnData());
    tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);

    const bulkWriteRes = bulkWriteThread.returnData();
    assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
    assert.eq(
        bulkWriteRes.res.errmsg,
        "Operation interrupted by an internal data migration and could not be automatically retried",
        tojson(bulkWriteRes));

    teardown();
})();
// Unordered bulk multi-update that hangs after kNumUpdatesWithoutMigrationConflict updates while
// a tenant migration runs to commit. The whole bulk is expected to fail with Interrupted, with a
// message saying it could not be automatically retried.
(() => {
jsTestLog("Testing unordered multi updates against a tenant migration that has completed.");
const {tenantMigrationTest, donorRst, teardown} = setup();
const tenantId = ObjectId().str;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
tenantId,
};
const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
const primary = donorRst.getPrimary();
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
// Let kNumUpdatesWithoutMigrationConflict updates through, then hang the update.
const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
const bulkWriteThread =
new Thread(bulkMultiUpdateDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
bulkWriteThread.start();
writeFp.wait();
// Run the whole migration to commit while the update is hung, then release it.
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
writeFp.off();
bulkWriteThread.join();
const bulkWriteRes = bulkWriteThread.returnData();
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
assert.eq(
bulkWriteRes.res.errmsg,
"Operation interrupted by an internal data migration and could not be automatically retried",
tojson(bulkWriteRes));
teardown();
})();
// Ordered bulk multi-update that hangs after kNumUpdatesWithoutMigrationConflict updates while a
// tenant migration runs to commit. The whole bulk is expected to fail with Interrupted, with a
// message saying it could not be automatically retried.
(() => {
jsTestLog("Testing ordered multi updates against a tenant migration that has completed.");
const {tenantMigrationTest, donorRst, teardown} = setup();
const tenantId = ObjectId().str;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
tenantId,
};
const dbName = makeTenantDB(tenantId, kTenantDefinedDbName);
const primary = donorRst.getPrimary();
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
// Let kNumUpdatesWithoutMigrationConflict updates through, then hang the update.
const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
const bulkWriteThread =
new Thread(bulkMultiUpdateDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
bulkWriteThread.start();
writeFp.wait();
// Run the whole migration to commit while the update is hung, then release it.
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
writeFp.off();
bulkWriteThread.join();
const bulkWriteRes = bulkWriteThread.returnData();
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
assert.eq(
bulkWriteRes.res.errmsg,
"Operation interrupted by an internal data migration and could not be automatically retried",
tojson(bulkWriteRes));
teardown();
})();