|
|
|
|
@@ -1,6 +1,12 @@
|
|
|
|
|
// Confirms that change streams only see committed operations for prepared transactions.
|
|
|
|
|
// @tags: [uses_transactions,uses_change_streams,requires_majority_read_concern,
|
|
|
|
|
// exclude_from_large_txns_due_to_change_streams]
|
|
|
|
|
/*
|
|
|
|
|
* Confirms that change streams only see committed operations for prepared transactions.
|
|
|
|
|
* @tags: [
|
|
|
|
|
* uses_transactions,
|
|
|
|
|
* uses_change_streams,
|
|
|
|
|
* requires_majority_read_concern,
|
|
|
|
|
* uses_prepare_transaction,
|
|
|
|
|
* ]
|
|
|
|
|
*/
|
|
|
|
|
(function() {
|
|
|
|
|
"use strict";
|
|
|
|
|
|
|
|
|
|
@@ -9,6 +15,12 @@
|
|
|
|
|
const dbName = "test";
|
|
|
|
|
const collName = "change_stream_transaction";
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This test sets an internal parameter in order to force transactions with more than 4
|
|
|
|
|
* operations to span multiple oplog entries, making it easier to test that scenario.
|
|
|
|
|
*/
|
|
|
|
|
const maxOpsInOplogEntry = 4;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Asserts that the expected operation type and documentKey are found on the change stream
|
|
|
|
|
* cursor. Returns the change stream document.
|
|
|
|
|
@@ -126,18 +138,14 @@
|
|
|
|
|
prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1);
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
|
|
|
|
|
// TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
|
|
|
|
|
// will block on a prepared transaction. We should also be able to move the check for
|
|
|
|
|
// document existence prior to the transaction commit with this change.
|
|
|
|
|
// Perform a write at writeConcern w: local.
|
|
|
|
|
assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: 1}}));
|
|
|
|
|
assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}}));
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList);
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Commit first transaction and confirm expected changes.
|
|
|
|
|
//
|
|
|
|
|
assert.commandWorked(PrepareHelpers.commitTransaction(session1, prepareTimestampTxn1));
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList);
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList);
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
@@ -155,19 +163,75 @@
|
|
|
|
|
PrepareHelpers.prepareTransaction(session2);
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
|
|
|
|
|
// TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
|
|
|
|
|
// will block on a prepared transaction. We should also be able to move the check for
|
|
|
|
|
// document existence prior to the transaction abort with this change.
|
|
|
|
|
// Perform a write at writeConcern w: local.
|
|
|
|
|
assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: 1}}));
|
|
|
|
|
assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}}));
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList);
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Abort second transaction.
|
|
|
|
|
//
|
|
|
|
|
assert.commandWorked(session2.abortTransaction_forTesting());
|
|
|
|
|
assertWriteVisibleWithCapture(
|
|
|
|
|
changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList);
|
|
|
|
|
session2.abortTransaction_forTesting();
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Start transaction 4.
|
|
|
|
|
//
|
|
|
|
|
const session4 = db.getMongo().startSession();
|
|
|
|
|
const sessionDb4 = session4.getDatabase(dbName);
|
|
|
|
|
const sessionColl4 = sessionDb4[collName];
|
|
|
|
|
session4.startTransaction({readConcern: {level: "majority"}});
|
|
|
|
|
|
|
|
|
|
// Perform enough writes to fill up one applyOps.
|
|
|
|
|
const txn4Inserts = Array.from({length: maxOpsInOplogEntry},
|
|
|
|
|
(_, index) => ({_id: {name: "txn4-doc", index: index}}));
|
|
|
|
|
txn4Inserts.forEach(function(doc) {
|
|
|
|
|
sessionColl4.insert(doc);
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Perform enough writes to an unwatched collection to fill up a second applyOps. We
|
|
|
|
|
// specifically want to test the case where a multi-applyOps transaction has no relevant
|
|
|
|
|
// updates in its final applyOps.
|
|
|
|
|
txn4Inserts.forEach(function(doc) {
|
|
|
|
|
assert.commandWorked(sessionDb4[unwatchedColl.getName()].insert(doc));
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Start transaction 5.
|
|
|
|
|
//
|
|
|
|
|
const session5 = db.getMongo().startSession();
|
|
|
|
|
const sessionDb5 = session5.getDatabase(dbName);
|
|
|
|
|
const sessionColl5 = sessionDb5[collName];
|
|
|
|
|
session5.startTransaction({readConcern: {level: "majority"}});
|
|
|
|
|
|
|
|
|
|
// Perform enough writes to span 3 applyOps entries.
|
|
|
|
|
const txn5Inserts = Array.from({length: 3 * maxOpsInOplogEntry},
|
|
|
|
|
(_, index) => ({_id: {name: "txn5-doc", index: index}}));
|
|
|
|
|
txn5Inserts.forEach(function(doc) {
|
|
|
|
|
assert.commandWorked(sessionColl5.insert(doc));
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Prepare and commit transaction 5.
|
|
|
|
|
//
|
|
|
|
|
const prepareTimestampTxn5 = PrepareHelpers.prepareTransaction(session5);
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
assert.commandWorked(PrepareHelpers.commitTransaction(session5, prepareTimestampTxn5));
|
|
|
|
|
txn5Inserts.forEach(function(doc) {
|
|
|
|
|
assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Commit transaction 4 without preparing.
|
|
|
|
|
//
|
|
|
|
|
session4.commitTransaction();
|
|
|
|
|
txn4Inserts.forEach(function(doc) {
|
|
|
|
|
assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList);
|
|
|
|
|
});
|
|
|
|
|
assertNoChanges(changeStreamCursor);
|
|
|
|
|
|
|
|
|
|
changeStreamCursor.close();
|
|
|
|
|
|
|
|
|
|
// Test that change stream resume returns the expected set of documents at each point
|
|
|
|
|
@@ -198,7 +262,19 @@
|
|
|
|
|
assert.commandWorked(db.dropDatabase());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const rst = new ReplSetTest({nodes: 1});
|
|
|
|
|
let replSetTestDescription = {nodes: 1};
|
|
|
|
|
if (!jsTest.options().setParameters.hasOwnProperty(
|
|
|
|
|
"maxNumberOfTransactionOperationsInSingleOplogEntry")) {
|
|
|
|
|
// Configure the replica set to use our value for maxOpsInOplogEntry.
|
|
|
|
|
replSetTestDescription.nodeOptions = {
|
|
|
|
|
setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: maxOpsInOplogEntry}
|
|
|
|
|
};
|
|
|
|
|
} else {
|
|
|
|
|
// The test is executing in a build variant that already defines its own override value for
|
|
|
|
|
// maxNumberOfTransactionOperationsInSingleOplogEntry. Even though the build variant's
|
|
|
|
|
// choice for this override won't test the same edge cases, the test should still succeed.
|
|
|
|
|
}
|
|
|
|
|
const rst = new ReplSetTest(replSetTestDescription);
|
|
|
|
|
rst.startSet();
|
|
|
|
|
rst.initiate();
|
|
|
|
|
|
|
|
|
|
|