Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38a70214a5 | ||
|
|
c2c4cb72aa |
@@ -1,6 +1,7 @@
|
||||
// this tests 1% of all points
|
||||
//
|
||||
// @tags: [
|
||||
// # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
// incompatible_with_gcov,
|
||||
// ]
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
// All V2 2dsphere indices are sparse in the geo fields.
|
||||
//
|
||||
// @tags: [
|
||||
// # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
// incompatible_with_gcov,
|
||||
// ]
|
||||
|
||||
(function() {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
/**
|
||||
* Test the $sampleRate match expression.
|
||||
* @tags: [
|
||||
* # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
* incompatible_with_gcov,
|
||||
* ]
|
||||
*/
|
||||
(function() {
|
||||
|
||||
@@ -24,14 +24,6 @@
|
||||
* jstests/libs/txns/txn_passthrough_runner_selftest.js
|
||||
*/
|
||||
|
||||
import {
|
||||
hasError,
|
||||
hasWriteConcernError,
|
||||
isSuccess,
|
||||
Result,
|
||||
RetryTracker
|
||||
} from "jstests/libs/override_methods/retry_utils.js";
|
||||
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
@@ -269,13 +261,18 @@ function isFailedToSatisfyPrimaryReadPreferenceError(res) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Tracks if the current command is being run as part of a transaction retry.
|
||||
let inTransactionRetry = false;
|
||||
|
||||
function isTransactionRetry() {
|
||||
return inTransactionRetry;
|
||||
function hasError(res) {
|
||||
return res.ok !== 1 || res.writeErrors;
|
||||
}
|
||||
|
||||
function hasWriteConcernError(res) {
|
||||
return res.hasOwnProperty("writeConcernError");
|
||||
}
|
||||
|
||||
// Tracks if the current command is being run in a network retry. This is specifically for
|
||||
// retries that this file initiates, not ones that retryable writes initiates.
|
||||
let inCommandNetworkErrorRetry = false;
|
||||
|
||||
// "Command ID" is an identifier for a given command being overridden. This is to track what log
|
||||
// messages come from what commands. This override is highly recursive and this is helpful for
|
||||
// debugging that recursion and following what commands initiated other commands.
|
||||
@@ -330,8 +327,7 @@ function logMsgFull(msgHeader, msgFooter) {
|
||||
// Validate the command before running it, to prevent tests with non-retryable commands
|
||||
// from being run.
|
||||
function validateCmdNetworkErrorCompatibility(cmdName, cmdObj) {
|
||||
assert(!isCmdInTransaction(cmdObj));
|
||||
assert(!isTransactionRetry());
|
||||
assert(!inCommandNetworkErrorRetry);
|
||||
assert(!isNested());
|
||||
|
||||
const isRetryableWriteCmd = RetryableWritesUtil.isRetryableWriteCmdName(cmdName);
|
||||
@@ -529,24 +525,18 @@ function commitTransaction(conn, lsid, txnNumber) {
|
||||
logMsgFull('commitTransaction',
|
||||
`Committing transaction ${txnNumber} on session ${tojsononeline(lsid)}`);
|
||||
|
||||
const cmdObj = {
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed.
|
||||
assert.commandWorked(conn.adminCommand({
|
||||
commitTransaction: 1,
|
||||
autocommit: false,
|
||||
lsid: lsid,
|
||||
txnNumber: txnNumber,
|
||||
};
|
||||
// Append this override-generated commit to the transaction state.
|
||||
// The transaction is being ended, but the commit may need to be retried
|
||||
// if it fails.
|
||||
continueTransaction("admin", "commitTransaction", cmdObj);
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed.
|
||||
const res = assert.commandWorked(conn.adminCommand(cmdObj));
|
||||
}));
|
||||
|
||||
// We've successfully committed the transaction, so we can forget the ops we've successfully
|
||||
// run.
|
||||
clearOpsList();
|
||||
return res;
|
||||
}
|
||||
|
||||
function abortTransaction(conn, lsid, txnNumber) {
|
||||
@@ -603,7 +593,7 @@ function calculateStmtIdInc(cmdName, cmdObj) {
|
||||
}
|
||||
}
|
||||
|
||||
function continueTransaction(dbName, cmdName, cmdObj) {
|
||||
function continueTransaction(conn, dbName, cmdName, cmdObj) {
|
||||
cmdObj.txnNumber = txnOptions.txnNumber;
|
||||
cmdObj.stmtId = txnOptions.stmtId;
|
||||
cmdObj.autocommit = false;
|
||||
@@ -616,7 +606,7 @@ function continueTransaction(dbName, cmdName, cmdObj) {
|
||||
assert(!cmdObj.hasOwnProperty('writeConcern'), cmdObj);
|
||||
|
||||
// If this is the first time we are running this command, push it to the ops array.
|
||||
if (!isNested()) {
|
||||
if (!isNested() && !inCommandNetworkErrorRetry) {
|
||||
// Make a copy so the command does not get changed by the test.
|
||||
const objCopy = TransactionsUtil.deepCopyObject({}, cmdObj);
|
||||
|
||||
@@ -658,16 +648,13 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
||||
const driverSession = conn.getDB(dbName).getSession();
|
||||
const commandSupportsTransaction = TransactionsUtil.commandSupportsTxn(dbName, cmdName, cmdObj);
|
||||
const isSystemDotProfile = isNamespaceSystemDotProfile(cmdObj);
|
||||
const isNonTxnGetMore = isCommandNonTxnGetMore(cmdName, cmdObj);
|
||||
|
||||
const includeInTransaction = commandSupportsTransaction && !isSystemDotProfile &&
|
||||
driverSession.getSessionId() !== null && !isNonTxnGetMore;
|
||||
|
||||
if (includeInTransaction) {
|
||||
if (commandSupportsTransaction && !isSystemDotProfile &&
|
||||
driverSession.getSessionId() !== null && !isCommandNonTxnGetMore(cmdName, cmdObj)) {
|
||||
if (isNested()) {
|
||||
// Nested commands should never start a new transaction.
|
||||
} else if (ops.length === 0) {
|
||||
// We should never start a transaction on a getMore.
|
||||
// We should never end a transaction on a getMore.
|
||||
assert.neq(cmdName, "getMore", cmdObj);
|
||||
startNewTransaction(conn, cmdObj);
|
||||
} else if (cmdName === "getMore") {
|
||||
@@ -679,7 +666,8 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
||||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
startNewTransaction(conn, cmdObj);
|
||||
}
|
||||
continueTransaction(dbName, cmdName, cmdObj);
|
||||
continueTransaction(conn, dbName, cmdName, cmdObj);
|
||||
|
||||
} else {
|
||||
if (ops.length > 0 && !isNested() && !isSystemDotProfile) {
|
||||
// Operations on system.profile must be allowed to execute in parallel with open
|
||||
@@ -693,7 +681,101 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
||||
}
|
||||
}
|
||||
appendReadAndWriteConcern(conn, dbName, cmdName, cmdObj);
|
||||
return includeInTransaction;
|
||||
}
|
||||
|
||||
// Retries the entire transaction without committing it. Returns immediately on an error with
|
||||
// the response from the failed command. This may recursively retry the entire transaction in
|
||||
// which case parent retries are completed early.
|
||||
function retryEntireTransaction(conn, lsid) {
|
||||
// Re-run every command in the ops array.
|
||||
assert.gt(ops.length, 0);
|
||||
|
||||
// Keep track of what txnNumber this retry is attempting.
|
||||
const retriedTxnNumber = startNewTransaction(conn, {"ignored object": 1});
|
||||
|
||||
logMsgFull('Retrying entire transaction',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)}`);
|
||||
let res;
|
||||
for (let op of ops) {
|
||||
logMsgFull('Retrying op',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)},` +
|
||||
` db: ${op.dbName}, op: ${tojsononeline(op.cmdObj)}`);
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`,
|
||||
// individual statement retries will be suppressed by tracking nesting level.
|
||||
res = conn.getDB(op.dbName).runCommand(op.cmdObj);
|
||||
|
||||
if (hasError(res) || hasWriteConcernError(res)) {
|
||||
return res;
|
||||
}
|
||||
// Sanity check that we checked for an error correctly.
|
||||
assert.commandWorked(res);
|
||||
|
||||
// If we recursively retried the entire transaction, we do not want to continue this
|
||||
// retry. We just pass up the response from the retry that completed.
|
||||
if (txnOptions.txnNumber !== retriedTxnNumber) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// We do not commit the transaction and let it continue in the next operation.
|
||||
return res;
|
||||
}
|
||||
|
||||
// Creates the given collection, retrying if needed. Throws on failure.
|
||||
function createCollectionExplicitly(conn, dbName, collName, lsid) {
|
||||
logMsgFull(
|
||||
'create',
|
||||
`Explicitly creating collection ${dbName}.${collName} and then retrying transaction`);
|
||||
|
||||
// Always majority commit the create because this is not expected to roll back once
|
||||
// successful.
|
||||
const createCmdObj = {
|
||||
create: collName,
|
||||
lsid: lsid,
|
||||
writeConcern: {w: 'majority'},
|
||||
};
|
||||
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed. If an error returned by `create` were tolerable, it would already have been
|
||||
// retried by the time it surfaced here.
|
||||
assert.commandWorked(conn.getDB(dbName).runCommand(createCmdObj));
|
||||
}
|
||||
|
||||
// Processes the response to the command if we are configured for txn override. Performs retries
|
||||
// if necessary for implicit collection creation or transient transaction errors.
|
||||
// Returns the last command response received by a command or retry.
|
||||
function retryWithTxnOverride(res, conn, dbName, cmdName, cmdObj, lsid, logError) {
|
||||
assert(configuredForTxnOverride());
|
||||
|
||||
const failedOnCRUDStatement =
|
||||
hasError(res) && !isCommitOrAbort(cmdName) && isCmdInTransaction(cmdObj);
|
||||
if (failedOnCRUDStatement) {
|
||||
assert.gt(ops.length, 0);
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
|
||||
// Transaction statements cannot be retried, but retryable codes are expected to succeed
|
||||
// on full transaction retry.
|
||||
if (configuredForNetworkRetry() && RetryableWritesUtil.isRetryableCode(res.code)) {
|
||||
logError("Retrying on retryable error for transaction statement");
|
||||
return retryEntireTransaction(conn, lsid);
|
||||
}
|
||||
}
|
||||
|
||||
// Transient transaction errors should retry the entire transaction. A
|
||||
// TransientTransactionError on "abortTransaction" is considered a success.
|
||||
if (TransactionsUtil.isTransientTransactionError(res) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError response");
|
||||
res = retryEntireTransaction(conn, lsid);
|
||||
|
||||
// If we got a TransientTransactionError on 'commitTransaction' retrying the transaction
|
||||
// will not retry it, so we retry it here.
|
||||
if (!hasError(res) && cmdName === "commitTransaction") {
|
||||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns true if any error code in a response's "raw" field is retryable.
|
||||
@@ -857,6 +939,32 @@ function shouldRetryForBackgroundReconfigOverride(res, cmdName, logError) {
|
||||
return res;
|
||||
}
|
||||
|
||||
// Processes exceptions if configured for txn override. Retries the entire transaction on
|
||||
// transient transaction errors or network errors if configured for network errors as well.
|
||||
// If a retry fails, returns the response, or returns null for further exception processing.
|
||||
function retryWithTxnOverrideException(e, conn, cmdName, cmdObj, lsid, logError) {
|
||||
assert(configuredForTxnOverride());
|
||||
|
||||
if (TransactionsUtil.isTransientTransactionError(e) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError exception for command");
|
||||
const res = retryEntireTransaction(conn, lsid);
|
||||
|
||||
// If we got a TransientTransactionError on 'commitTransaction' retrying the transaction
|
||||
// will not retry it, so we retry it here.
|
||||
if (!hasError(res) && cmdName === "commitTransaction") {
|
||||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
if (configuredForNetworkRetry() && isNetworkError(e) &&
|
||||
!canRetryNetworkErrorForCommand(cmdName, cmdObj)) {
|
||||
logError("Retrying on network exception for transaction statement");
|
||||
return retryEntireTransaction(conn, lsid);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Processes exceptions if configured for network error retry. Returns whether to subtract one
|
||||
// from the number of command retries this override counts. Throws if we should not retry.
|
||||
function shouldRetryWithNetworkExceptionOverride(
|
||||
@@ -895,36 +1003,48 @@ function shouldRetryWithNetworkExceptionOverride(
|
||||
return true;
|
||||
}
|
||||
|
||||
const kMaxNumNetworkErrorRetries = 3;
|
||||
const kMaxNumRetries = 3;
|
||||
|
||||
// This function is the heart of the override with the main error retry loop.
|
||||
function networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
function runCommandOverrideBody(conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs) {
|
||||
const startTime = Date.now();
|
||||
|
||||
const isTxnStatement = isCmdInTransaction(cmdObj);
|
||||
|
||||
if (configuredForNetworkRetry() && !isTxnStatement) {
|
||||
if (configuredForNetworkRetry() && !isNested() && !isTxnStatement) {
|
||||
// If this is a top level command, make sure that the command supports network error
|
||||
// retries. Don't validate transaction statements because their encompassing transaction
|
||||
// can be retried at a higher level, even if each statement isn't retryable on its own.
|
||||
validateCmdNetworkErrorCompatibility(cmdName, cmdObj);
|
||||
}
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid);
|
||||
}
|
||||
|
||||
const canRetryNetworkError = canRetryNetworkErrorForCommand(cmdName, cmdObj);
|
||||
const canRetryReadError = canRetryReadErrorDuringBackgroundReconfig(cmdName);
|
||||
let numNetworkErrorRetriesRemaining = canRetryNetworkError ? kMaxNumNetworkErrorRetries : 0;
|
||||
let numNetworkErrorRetries = canRetryNetworkError ? kMaxNumRetries : 0;
|
||||
do {
|
||||
try {
|
||||
// Actually run the provided command.
|
||||
let res = clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
if (configuredForTxnOverride()) {
|
||||
logMsgFull("Override got response",
|
||||
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
|
||||
|
||||
// There's no error, no retries need to be attempted.
|
||||
if (isSuccess(res)) {
|
||||
return res;
|
||||
if (!hasError(res) &&
|
||||
TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
nonTxnAggCursorSet[res.cursor.id] = true;
|
||||
}
|
||||
}
|
||||
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
res = retryWithTxnOverride(res, conn, dbName, cmdName, cmdObj, lsid, logError);
|
||||
}
|
||||
|
||||
if (canRetryNetworkError) {
|
||||
const networkRetryRes =
|
||||
shouldRetryWithNetworkErrorOverride(res, cmdName, startTime, logError);
|
||||
@@ -950,263 +1070,50 @@ function networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFun
|
||||
} catch (e) {
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, e);
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
const txnRetryOnException =
|
||||
retryWithTxnOverrideException(e, conn, cmdName, cmdObj, lsid, logError);
|
||||
if (txnRetryOnException) {
|
||||
return txnRetryOnException;
|
||||
}
|
||||
}
|
||||
|
||||
if (canRetryNetworkError) {
|
||||
const decrementRetryCount = shouldRetryWithNetworkExceptionOverride(
|
||||
e, cmdName, cmdObj, startTime, numNetworkErrorRetriesRemaining, logError);
|
||||
e, cmdName, cmdObj, startTime, numNetworkErrorRetries, logError);
|
||||
if (decrementRetryCount) {
|
||||
--numNetworkErrorRetriesRemaining;
|
||||
--numNetworkErrorRetries;
|
||||
logMsgFull("Decrementing command network error retry count",
|
||||
`New count: ${numNetworkErrorRetriesRemaining}`);
|
||||
`New count: ${numNetworkErrorRetries}`);
|
||||
}
|
||||
|
||||
logErrorFull("Retrying on network error for command", cmdName, cmdObj, e);
|
||||
inCommandNetworkErrorRetry = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
} while (numNetworkErrorRetriesRemaining >= 0);
|
||||
} while (numNetworkErrorRetries >= 0);
|
||||
throw new Error("MONGO UNREACHABLE");
|
||||
}
|
||||
|
||||
function networkRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
if (!configuredForNetworkRetry() && !configuredForBackgroundReconfigs()) {
|
||||
return clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
}
|
||||
return networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFuncArgs);
|
||||
}
|
||||
|
||||
function shouldRetryTxn(cmdName, cmdObj, result) {
|
||||
try {
|
||||
return shouldRetryTxnOnStatus(cmdName, cmdObj, result.unwrap());
|
||||
} catch (e) {
|
||||
return shouldRetryTxnOnException(cmdName, cmdObj, e);
|
||||
}
|
||||
}
|
||||
|
||||
function shouldRetryTxnOnStatus(cmdName, cmdObj, res) {
|
||||
if (isSuccess(res)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (TransactionsUtil.isConflictingOperationInProgress(res)) {
|
||||
// Other overrides, or session.js, may interfere, and retry an op which starts
|
||||
// a transaction if it reported a failure e.g., on a network error, but succeeded
|
||||
// server-side. Retry the transaction _again_, with a new txnNumber.
|
||||
return true;
|
||||
}
|
||||
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
|
||||
|
||||
// Transient transaction errors should retry the entire transaction. A
|
||||
// TransientTransactionError on "abortTransaction" is considered a success.
|
||||
if (TransactionsUtil.isTransientTransactionError(res) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError response");
|
||||
return true;
|
||||
}
|
||||
|
||||
const failedOnCRUDStatement = !isCommitOrAbort(cmdName);
|
||||
if (failedOnCRUDStatement) {
|
||||
// If configured for BOTH txn override, and network error override, a network error
|
||||
// will NOT retry a single op, instead it must retry the entire transaction.
|
||||
if (configuredForNetworkRetry() && RetryableWritesUtil.isRetryableCode(res.code)) {
|
||||
logError("Retrying on retryable error for transaction statement");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function shouldRetryTxnOnException(cmdName, cmdObj, exception) {
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, exception);
|
||||
if (TransactionsUtil.isTransientTransactionError(exception) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError exception for command");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (configuredForNetworkRetry() && isNetworkError(exception) &&
|
||||
!canRetryNetworkErrorForCommand(cmdName, cmdObj)) {
|
||||
// If configured for BOTH txn override, and network error override, a network error
|
||||
// will NOT retry a single op, instead it must retry the entire transaction.
|
||||
logError("Retrying on network exception for transaction statement");
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Maximum number of times a transaction can be retried within a given op.
|
||||
// Note that a transaction may fail and be retried up to this many times on
|
||||
// each call to runCommand, for a total number of retries on the order of
|
||||
// maxOpsInTransaction * kMaxNumTxnErrorRetries.
|
||||
const kMaxNumTxnErrorRetries = 5;
|
||||
const kMaxTxnRetryTimeout = 5 * 60 * 1000;
|
||||
|
||||
function txnRetryOverrideBody(conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs) {
|
||||
assert(isCmdInTransaction(cmdObj));
|
||||
const retryTracker = new RetryTracker(kMaxNumTxnErrorRetries, kMaxTxnRetryTimeout);
|
||||
|
||||
const logResult = (res) => {
|
||||
res.apply(value =>
|
||||
logMsgFull("Override got response",
|
||||
`res: ${tojsononeline(value)}, cmd: ${tojsononeline(cmdObj)}`));
|
||||
};
|
||||
|
||||
// ==== Initial Attempt ====
|
||||
let res = Result.wrap(() => clientFunction.apply(conn, makeFuncArgs(cmdObj)));
|
||||
logResult(res);
|
||||
if (res.ok()) {
|
||||
return res.status;
|
||||
}
|
||||
|
||||
if (!shouldRetryTxn(cmdName, cmdObj, res)) {
|
||||
return res.unwrap();
|
||||
}
|
||||
|
||||
if (!isCommitOrAbort(cmdName)) {
|
||||
// Abort the transaction before trying again in a new transaction.
|
||||
try {
|
||||
// Abort returns in successful cases, or throws - if it fails,
|
||||
// we will still retry the transaction anyway.
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
} catch {
|
||||
}
|
||||
}
|
||||
|
||||
// On failure of a single request within a transaction, retry the entire transaction.
|
||||
// This involves re-playing all preceding requests in the transaction.
|
||||
// Note that commits/aborts may be individually retried by networkRunCommandOverride,
|
||||
// but any other failed op will retry the whole transaction here.
|
||||
|
||||
// ==== Retry Loop ====
|
||||
for (let retry of retryTracker) {
|
||||
// Track the new transaction state.
|
||||
const retriedTxnNumber = startNewTransaction(conn, {"ignored object": 1});
|
||||
|
||||
logMsgFull('Retrying entire transaction',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${
|
||||
tojsononeline(lsid)}, remainingAttempts: ${retry.remaining}`);
|
||||
|
||||
for (let op of ops) {
|
||||
logMsgFull('Retrying op',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)},` +
|
||||
` db: ${op.dbName}, op: ${tojsononeline(op.cmdObj)}`);
|
||||
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`,
|
||||
// this will re-enter `txnRunCommandOverride` but such ops bypass transaction
|
||||
// retry logic, to avoid recursive retries.
|
||||
cmdObj = {...op.cmdObj, txnNumber: retriedTxnNumber};
|
||||
cmdName = Object.keys(cmdObj)[0];
|
||||
appendReadAndWriteConcern(conn, op.dbName, cmdName, cmdObj);
|
||||
res = Result.wrap(() => conn.getDB(op.dbName).runCommand(cmdObj));
|
||||
logResult(res);
|
||||
|
||||
if (!res.ok()) {
|
||||
// Failed while replaying operations for the transaction.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (res.ok()) {
|
||||
// Replayed the entire transaction successfully.
|
||||
break;
|
||||
}
|
||||
|
||||
if (!shouldRetryTxn(cmdName, cmdObj, res)) {
|
||||
break;
|
||||
}
|
||||
if (!isCommitOrAbort(cmdName)) {
|
||||
// Abort the transaction before trying again in a new transaction.
|
||||
// Abort returns in successful cases, or throws, re-starting the retry.
|
||||
// In any case, the only way to progress is to retry again with a new
|
||||
// transaction number.
|
||||
try {
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return res.unwrap();
|
||||
}
|
||||
|
||||
// Top level runCommand override function.
|
||||
function txnRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
currentCommandID.push(newestCommandID);
|
||||
newestCommandID++;
|
||||
function runCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
currentCommandID.push(newestCommandID++);
|
||||
nestingLevel++;
|
||||
|
||||
const lsid = cmdObj.lsid;
|
||||
const passthrough = () => {
|
||||
try {
|
||||
// This request isn't eligible for transaction level retries,
|
||||
// so pass it down to the next override directly.
|
||||
const res = clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
|
||||
logMsgFull(
|
||||
"Txn override passthrough got response",
|
||||
`res: ${tojsononeline(res)}, cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
// Record non-transaction agg cursor IDs so subsequent getMores for this cursor
|
||||
// can also avoid being forced into an ongoing transaction.
|
||||
if (isSuccess(res) && TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
nonTxnAggCursorSet[res.cursor.id] = true;
|
||||
}
|
||||
return res;
|
||||
} catch (e) {
|
||||
logErrorFull("Txn override passthrough got exception", cmdName, cmdObj, e);
|
||||
throw e;
|
||||
}
|
||||
};
|
||||
try {
|
||||
if (!configuredForTxnOverride()) {
|
||||
// Not currently enabled - but tests can change this at runtime, so it must be
|
||||
// checked for each operation.
|
||||
logMsgFull(
|
||||
"Txn override not enabled",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
// Record this operation, starting a new transaction if not currently in one.
|
||||
// If this command is not eligible for inclusion in the current transaction,
|
||||
// it will commit the current transaction here.
|
||||
if (!setupTransactionCommand(conn, dbName, cmdName, cmdObj, cmdObj.lsid)) {
|
||||
// This is a command which does not support executing in a transaction,
|
||||
// or a getMore for a cursor created by such a command.
|
||||
logMsgFull(
|
||||
"Operation cannot be wrapped in a transaction",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
// A nested call means we have re-entered runCommand from _within_ an ongoing call to
|
||||
// txnRunCommandOverride. This is either for retries, or "injecting" a commit to close
|
||||
// a transaction (reached max ops, need to run a non-transaction command).
|
||||
if (isTransactionRetry()) {
|
||||
logMsgFull(
|
||||
"Operation is a retry attempt from txn override",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
let res;
|
||||
try {
|
||||
assert(!inTransactionRetry);
|
||||
inTransactionRetry = true;
|
||||
|
||||
// Enter the override body, where retries will be handled at the transaction level.
|
||||
res = txnRetryOverrideBody(
|
||||
conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs);
|
||||
} finally {
|
||||
inTransactionRetry = false;
|
||||
}
|
||||
const res = runCommandOverrideBody(
|
||||
conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs);
|
||||
|
||||
// Many tests run queries that are expected to fail. In this case, when we wrap CRUD ops
|
||||
// in transactions, the transaction including the failed query will not be able to
|
||||
// commit. This override expects transactions to be able to commit. Rather than
|
||||
// denylisting all tests containing queries that are expected to fail, we clear the ops
|
||||
// list when we return an error to the test so we do not retry the failed query.
|
||||
if (hasError(res) && (ops.length > 0)) {
|
||||
if (configuredForTxnOverride() && !isNested() && hasError(res) && (ops.length > 0)) {
|
||||
logMsgFull("Clearing ops on failed command",
|
||||
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
|
||||
clearOpsList();
|
||||
@@ -1218,6 +1125,7 @@ function txnRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, ma
|
||||
// Reset recursion and retry state tracking.
|
||||
nestingLevel--;
|
||||
currentCommandID.pop();
|
||||
inCommandNetworkErrorRetry = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1265,6 +1173,5 @@ if (configuredForTxnOverride()) {
|
||||
};
|
||||
}
|
||||
|
||||
OverrideHelpers.overrideRunCommand(networkRunCommandOverride);
|
||||
OverrideHelpers.overrideRunCommand(txnRunCommandOverride);
|
||||
OverrideHelpers.overrideRunCommand(runCommandOverride);
|
||||
})();
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
/**
|
||||
* Miscellaneous utilities which are commonly useful when dealing
|
||||
* with success/failure, and retries.
|
||||
*/
|
||||
|
||||
export function hasError(res) {
|
||||
return res.ok !== 1 || res.writeErrors || (res.hasOwnProperty("nErrors") && res.nErrors != 0);
|
||||
}
|
||||
|
||||
export function hasWriteConcernError(res) {
|
||||
return res.hasOwnProperty("writeConcernError");
|
||||
}
|
||||
|
||||
export function isSuccess(res) {
|
||||
return !(hasError(res) || hasWriteConcernError(res));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for a status or an exception.
|
||||
*
|
||||
* Typical usage:
|
||||
*
|
||||
* let res = Result.wrap(() => mightReturnOrThrow());
|
||||
* if (res.ok()) {...}
|
||||
* return res.unwrap(); // Returns or throws.
|
||||
*/
|
||||
export class Result {
|
||||
constructor(status, exception) {
|
||||
// Status or exception must be present.
|
||||
assert((status != null) != (exception != null));
|
||||
this.status = status;
|
||||
this.exception = exception;
|
||||
}
|
||||
static status(status) {
|
||||
let res = new Result(status, null);
|
||||
return res;
|
||||
}
|
||||
|
||||
static exception(exception) {
|
||||
let res = new Result(null, exception);
|
||||
return res;
|
||||
}
|
||||
|
||||
static wrap(callable) {
|
||||
try {
|
||||
return Result.status(callable());
|
||||
} catch (e) {
|
||||
return Result.exception(e);
|
||||
}
|
||||
}
|
||||
|
||||
ok() {
|
||||
return this.status != null && isSuccess(this.status);
|
||||
}
|
||||
|
||||
unwrap() {
|
||||
if (this.exception != null) {
|
||||
throw this.exception;
|
||||
}
|
||||
return this.status;
|
||||
}
|
||||
apply(callback) {
|
||||
if (this.exception != null) {
|
||||
return callback(this.exception);
|
||||
}
|
||||
return callback(this.status);
|
||||
}
|
||||
dispatch(onStatus, onException) {
|
||||
if (this.exception != null) {
|
||||
return onException(this.exception);
|
||||
}
|
||||
return onStatus(this.status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility to simplify looping until X iterations or Y milliseconds elapses;
|
||||
* a common pattern when retrying operations.
|
||||
*
|
||||
* let helper = new RetryTracker(3, 3000);
|
||||
* for (let _ of helper) {
|
||||
* ... // E.g., try a command. Will loop until break or exceeding either limit.
|
||||
* }
|
||||
* The timer starts from creation of the helper.
|
||||
*/
|
||||
export class RetryTracker {
|
||||
constructor(retryCount, timeout) {
|
||||
this.retryCount = retryCount;
|
||||
this.timeout = timeout;
|
||||
this.startTime = Date.now();
|
||||
}
|
||||
|
||||
* [Symbol.iterator]() {
|
||||
if (this.retryCount <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < this.retryCount; ++i) {
|
||||
let elapsed = Date.now() - this.startTime;
|
||||
if (elapsed > this.timeout) {
|
||||
break;
|
||||
}
|
||||
yield {remaining: this.retryCount - i - 1, remainingTime: this.timeout - elapsed};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -31,8 +31,7 @@ var TransactionsUtil = (function() {
|
||||
// still execute concurrently with other transactions. Pipelines with $changeStream or $out
|
||||
// cannot run within a transaction.
|
||||
function commandIsNonTxnAggregation(cmdName, cmdObj) {
|
||||
return !("explain" in cmdObj) &&
|
||||
OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
|
||||
return OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
|
||||
OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj);
|
||||
}
|
||||
|
||||
@@ -108,11 +107,6 @@ var TransactionsUtil = (function() {
|
||||
res.errorLabels.includes('TransientTransactionError');
|
||||
}
|
||||
|
||||
function isConflictingOperationInProgress(res) {
|
||||
return res != null && res.hasOwnProperty('codeName') &&
|
||||
res.codeName === "ConflictingOperationInProgress";
|
||||
}
|
||||
|
||||
// Runs a function 'func()' in a transaction on database 'db'. Invokes function
|
||||
// 'beforeTransactionFunc()' before the transaction (can be used to get references to
|
||||
// collections etc.). Ensures that the transaction is successfully committed, by retrying the
|
||||
@@ -147,7 +141,6 @@ var TransactionsUtil = (function() {
|
||||
commandTypeCanSupportTxn,
|
||||
deepCopyObject,
|
||||
isTransientTransactionError,
|
||||
isConflictingOperationInProgress,
|
||||
runInTransaction,
|
||||
};
|
||||
})();
|
||||
|
||||
113
jstests/noPassthrough/libs/proxy_protocol_helpers.js
Normal file
113
jstests/noPassthrough/libs/proxy_protocol_helpers.js
Normal file
@@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Helpers for testing the proxy protocol.
|
||||
*/
|
||||
|
||||
"use strict";
|
||||
|
||||
load("jstests/libs/fail_point_util.js");
|
||||
load("jstests/libs/parallelTester.js"); // For Thread.
|
||||
load("jstests/sharding/libs/proxy_protocol.js");
|
||||
|
||||
const connectAndHello = (port, isRouter) => {
|
||||
jsTestLog(`Attempting to connect to port ${port}`);
|
||||
const connStart = Date.now();
|
||||
const conn = new Mongo(`mongodb://127.0.0.1:${port}${isRouter ? '/?loadBalanced=true' : ''}`);
|
||||
assert.neq(null, conn, `Client was unable to connect to port ${port}`);
|
||||
assert.lt(Date.now() - connStart, 10 * 1000, 'Client was unable to connect within 10 seconds');
|
||||
assert.commandWorked(conn.getDB('admin').runCommand({hello: 1}));
|
||||
};
|
||||
|
||||
const timeoutEmptyConnection = (ingressPort, egressPort, isRouter) => {
|
||||
// Use the connection to set a lower proxy header timeout and validate that empty connections
|
||||
// timeout.
|
||||
const conn =
|
||||
new Mongo(`mongodb://127.0.0.1:${ingressPort}${isRouter ? '/?loadBalanced=true' : ''}`);
|
||||
const proxyTimeoutFailPoint = configureFailPoint(conn, "asioTransportLayer1sProxyTimeout");
|
||||
|
||||
// runProgram blocks until the program is complete. nc should be finished when the server times
|
||||
// out the connection that doesn't send data after 1 second, otherwise the test will hang.
|
||||
assert.eq(0, runProgram("bash", "-c", `cat </dev/tcp/127.0.0.1/${egressPort}`));
|
||||
|
||||
proxyTimeoutFailPoint.off();
|
||||
};
|
||||
|
||||
const emptyMessageTest = (ingressPort, egressPort, node, isRouter) => {
|
||||
jsTestLog("Connect to proxy port without sending data");
|
||||
const pid = _startMongoProgram("bash", "-c", `exec cat < /dev/tcp/127.0.0.1/${egressPort}`);
|
||||
|
||||
// Connecting to the proxy port still succeeds within a reasonable time limit.
|
||||
connectAndHello(ingressPort, isRouter);
|
||||
|
||||
// Connecting to the default port still succeeds within a reasonable time limit.
|
||||
connectAndHello(node.port, isRouter);
|
||||
|
||||
// A connection with no data will timeout.
|
||||
timeoutEmptyConnection(ingressPort, egressPort, isRouter);
|
||||
|
||||
assert(checkProgram(pid).alive);
|
||||
stopMongoProgramByPid(pid);
|
||||
};
|
||||
|
||||
const fuzzingTest = (ingressPort, egressPort, node, isRouter) => {
|
||||
const numConnections = 200;
|
||||
|
||||
for (let i = 0; i < numConnections; i++) {
|
||||
jsTestLog("Sending random data to proxy port");
|
||||
const pid = _startMongoProgram(
|
||||
'bash',
|
||||
'-c',
|
||||
`head -c ${Math.floor(Math.random() * 5000)} /dev/urandom >/dev/tcp/127.0.0.1/${
|
||||
egressPort}`);
|
||||
|
||||
// Connecting to the to the proxy port still succeeds within a reasonable time
|
||||
// limit.
|
||||
connectAndHello(ingressPort, isRouter);
|
||||
|
||||
// Connecting to the default port still succeeds within a reasonable time limit.
|
||||
connectAndHello(node.port, isRouter);
|
||||
|
||||
assert.soon(() => !checkProgram(pid).alive,
|
||||
"Server should have closed connection with invalid proxy protocol header");
|
||||
}
|
||||
};
|
||||
|
||||
const loadTest = (ingressPort, egressPort, node, isRouter) => {
|
||||
const numConnections = 200;
|
||||
let threads = [];
|
||||
|
||||
for (let i = 0; i < numConnections; i++) {
|
||||
threads.push(new Thread((regularPort, ingressPort, egressPort, connectFn, isRouter) => {
|
||||
// Throw in some connections without data to make sure we handle those correctly.
|
||||
const pid =
|
||||
_startMongoProgram("bash", "-c", `exec cat < /dev/tcp/127.0.0.1/${egressPort}`);
|
||||
|
||||
// Connecting to the proxy port still succeeds within a reasonable time
|
||||
// limit.
|
||||
connectFn(ingressPort, isRouter);
|
||||
|
||||
// Connecting to the default port still succeeds within a reasonable time limit.
|
||||
connectFn(regularPort, isRouter);
|
||||
|
||||
assert(checkProgram(pid).alive);
|
||||
stopMongoProgramByPid(pid);
|
||||
}, node.port, ingressPort, egressPort, connectAndHello, isRouter));
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (let i = 0; i < numConnections; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
};
|
||||
|
||||
const testProxyProtocolShardedCluster = (ingressPort, egressPort, version, testFn) => {
|
||||
const proxy_server = new ProxyProtocolServer(ingressPort, egressPort, version);
|
||||
proxy_server.start();
|
||||
|
||||
const st = new ShardingTest(
|
||||
{shards: 1, mongos: 1, mongosOptions: {setParameter: {"loadBalancerPort": egressPort}}});
|
||||
|
||||
testFn(ingressPort, egressPort, st.s, true);
|
||||
|
||||
proxy_server.stop();
|
||||
st.stop();
|
||||
};
|
||||
26
jstests/noPassthrough/mongos_proxy_protocol.js
Normal file
26
jstests/noPassthrough/mongos_proxy_protocol.js
Normal file
@@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Verify mongos supports proxy protocol connections.
|
||||
* @tags: [
|
||||
* # TODO (SERVER-97257): Re-enable this test or add an explanation why it is incompatible.
|
||||
* embedded_router_incompatible,
|
||||
* grpc_incompatible,
|
||||
* ]
|
||||
*/
|
||||
|
||||
load("jstests/noPassthrough/libs/proxy_protocol_helpers.js");
|
||||
|
||||
if (_isWindows()) {
|
||||
quit();
|
||||
}
|
||||
|
||||
const ingressPort = allocatePort();
|
||||
const egressPort = allocatePort();
|
||||
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 1, emptyMessageTest);
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 2, emptyMessageTest);
|
||||
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 1, fuzzingTest);
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 2, fuzzingTest);
|
||||
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 1, loadTest);
|
||||
testProxyProtocolShardedCluster(ingressPort, egressPort, 2, loadTest);
|
||||
@@ -51,6 +51,7 @@ MONGO_FAIL_POINT_DEFINE(asioTransportLayerShortOpportunisticReadWrite);
|
||||
MONGO_FAIL_POINT_DEFINE(asioTransportLayerSessionPauseBeforeSetSocketOption);
|
||||
MONGO_FAIL_POINT_DEFINE(asioTransportLayerBlockBeforeOpportunisticRead);
|
||||
MONGO_FAIL_POINT_DEFINE(asioTransportLayerBlockBeforeAddSession);
|
||||
MONGO_FAIL_POINT_DEFINE(asioTransportLayer1sProxyTimeout);
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -423,15 +424,27 @@ auto CommonAsioSession::getSocket() -> GenericSocket& {
|
||||
ExecutorFuture<void> CommonAsioSession::parseProxyProtocolHeader(const ReactorHandle& reactor) {
|
||||
invariant(_isIngressSession);
|
||||
invariant(reactor);
|
||||
const Backoff kExponentialBackoff(Milliseconds(2), Milliseconds::max());
|
||||
const Seconds proxyHeaderTimeout =
|
||||
MONGO_unlikely(asioTransportLayer1sProxyTimeout.shouldFail()) ? Seconds(1) : Seconds(120);
|
||||
const Date_t deadline = reactor->now() + proxyHeaderTimeout;
|
||||
|
||||
auto buffer = std::make_shared<std::array<char, kProxyProtocolHeaderSizeUpperBound>>();
|
||||
return AsyncTry([this, buffer] {
|
||||
const auto bytesRead = peekASIOStream(
|
||||
_socket, asio::buffer(buffer->data(), kProxyProtocolHeaderSizeUpperBound));
|
||||
return transport::parseProxyProtocolHeader(StringData(buffer->data(), bytesRead));
|
||||
})
|
||||
.until([](StatusWith<boost::optional<ParserResults>> sw) {
|
||||
.until([deadline, proxyHeaderTimeout, reactor](
|
||||
StatusWith<boost::optional<ParserResults>> sw) {
|
||||
uassert(10382800,
|
||||
fmt::format("Did not receive proxy protocol header within the time limit: {}",
|
||||
proxyHeaderTimeout.toString()),
|
||||
reactor->now() < deadline);
|
||||
|
||||
return !sw.isOK() || sw.getValue();
|
||||
})
|
||||
.withBackoffBetweenIterations(kExponentialBackoff)
|
||||
.on(reactor, CancellationToken::uncancelable())
|
||||
.then([this, buffer](const boost::optional<ParserResults>& results) mutable {
|
||||
invariant(results);
|
||||
|
||||
@@ -153,6 +153,15 @@ StatusWith<unsigned> pollASIOSocket(asio::generic::stream_protocol::socket& sock
|
||||
*/
|
||||
template <typename Stream, typename MutableBufferSequence>
|
||||
size_t peekASIOStream(Stream& stream, const MutableBufferSequence& buffers) {
|
||||
// Check that the socket has bytes available to read so that receive does not block.
|
||||
asio::socket_base::bytes_readable command;
|
||||
stream.io_control(command);
|
||||
std::size_t bytes_readable = command.get();
|
||||
|
||||
if (bytes_readable == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::error_code ec;
|
||||
size_t bytesRead;
|
||||
do {
|
||||
|
||||
@@ -56,6 +56,15 @@ void writeToSocketAndPollForResponse(Stream& writeSocket, Stream& readSocket, St
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Stream>
|
||||
void peekEmptySocket(Stream& readSocket) {
|
||||
const auto bufferSize = 10;
|
||||
auto inBuffer = std::make_unique<char[]>(bufferSize);
|
||||
const auto bytesRead =
|
||||
peekASIOStream(readSocket, asio::mutable_buffer(inBuffer.get(), bufferSize));
|
||||
ASSERT_EQ(bytesRead, 0);
|
||||
}
|
||||
|
||||
template <typename Stream>
|
||||
void peekAllSubstrings(Stream& writeSocket, Stream& readSocket, StringData data) {
|
||||
writeToSocketAndPollForResponse(writeSocket, readSocket, data);
|
||||
@@ -85,31 +94,59 @@ void peekPastBuffer(Stream& writeSocket, Stream& readSocket, StringData data) {
|
||||
}
|
||||
|
||||
#ifdef ASIO_HAS_LOCAL_SOCKETS
|
||||
auto prepareUnixSocketPair(asio::io_context& io_context) {
|
||||
auto prepareUnixSocketPair(asio::io_context& io_context, bool blocking) {
|
||||
asio::local::stream_protocol::socket writeSocket(io_context);
|
||||
asio::local::stream_protocol::socket readSocket(io_context);
|
||||
asio::local::connect_pair(writeSocket, readSocket);
|
||||
readSocket.non_blocking(true);
|
||||
readSocket.non_blocking(blocking);
|
||||
|
||||
return std::pair(std::move(writeSocket), std::move(readSocket));
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekAvailableBytes) {
|
||||
TEST(ASIOUtils, PeekEmptySocketBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context);
|
||||
auto [_, readSocket] = prepareUnixSocketPair(io_context, false);
|
||||
|
||||
peekEmptySocket(readSocket);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekEmptySocketNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [_, readSocket] = prepareUnixSocketPair(io_context, true);
|
||||
|
||||
peekEmptySocket(readSocket);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekAvailableBytesBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, false);
|
||||
|
||||
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytes) {
|
||||
TEST(ASIOUtils, PeekAvailableBytesNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context);
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, true);
|
||||
|
||||
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytesBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, false);
|
||||
|
||||
peekPastBuffer(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytesNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, true);
|
||||
|
||||
peekPastBuffer(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
#endif // ASIO_HAS_LOCAL_SOCKETS
|
||||
|
||||
auto prepareTCPSocketPair(asio::io_context& io_context) {
|
||||
auto prepareTCPSocketPair(asio::io_context& io_context, bool blocking) {
|
||||
// Make a local loopback connection on an arbitrary ephemeral port.
|
||||
asio::ip::tcp::endpoint ep(asio::ip::make_address("127.0.0.1"), 0);
|
||||
asio::ip::tcp::acceptor acceptor(io_context, ep.protocol());
|
||||
@@ -127,21 +164,49 @@ auto prepareTCPSocketPair(asio::io_context& io_context) {
|
||||
writeSocket.non_blocking(false);
|
||||
// Set no_delay so that our output doesn't get buffered in a kernel buffer.
|
||||
writeSocket.set_option(asio::ip::tcp::no_delay(true));
|
||||
readSocket.non_blocking(true);
|
||||
readSocket.non_blocking(blocking);
|
||||
|
||||
return std::pair(std::move(writeSocket), std::move(readSocket));
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekAvailableBytesTCP) {
|
||||
TEST(ASIOUtils, PeekEmptySocketTCPBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context);
|
||||
auto [_, readSocket] = prepareTCPSocketPair(io_context, false);
|
||||
|
||||
peekEmptySocket(readSocket);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekEmptySocketTCPNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [_, readSocket] = prepareTCPSocketPair(io_context, true);
|
||||
|
||||
peekEmptySocket(readSocket);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekAvailableBytesTCPBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, false);
|
||||
|
||||
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytesTCP) {
|
||||
TEST(ASIOUtils, PeekAvailableBytesTCPNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context);
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, true);
|
||||
|
||||
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytesTCPBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, false);
|
||||
|
||||
peekPastBuffer(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
TEST(ASIOUtils, PeekPastAvailableBytesTCPNonBlocking) {
|
||||
asio::io_context io_context;
|
||||
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, true);
|
||||
|
||||
peekPastBuffer(writeSocket, readSocket, "example"_sd);
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/transport/baton.h"
|
||||
#include "mongo/transport/transport_layer.h"
|
||||
#include "mongo/util/future_util.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace transport {
|
||||
@@ -58,5 +59,23 @@ const Status TransportLayer::TicketSessionClosedStatus = Status(
|
||||
|
||||
ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {}
|
||||
|
||||
ExecutorFuture<void> Reactor::sleepFor(Milliseconds duration, const CancellationToken& token) {
|
||||
auto when = now() + duration;
|
||||
|
||||
if (token.isCanceled()) {
|
||||
return ExecutorFuture<void>(
|
||||
shared_from_this(), Status(ErrorCodes::CallbackCanceled, "Cancelled reactor sleep"));
|
||||
}
|
||||
|
||||
if (when <= now()) {
|
||||
return ExecutorFuture<void>(shared_from_this());
|
||||
}
|
||||
|
||||
std::unique_ptr<ReactorTimer> timer = makeTimer();
|
||||
return future_util::withCancellation(timer->waitUntil(when), token)
|
||||
.thenRunOn(shared_from_this())
|
||||
.onCompletion([t = std::move(timer)](const Status& s) { return s; });
|
||||
}
|
||||
|
||||
} // namespace transport
|
||||
} // namespace mongo
|
||||
|
||||
@@ -193,7 +193,7 @@ private:
|
||||
const size_t _id;
|
||||
};
|
||||
|
||||
class Reactor : public OutOfLineExecutor {
|
||||
class Reactor : public OutOfLineExecutor, public std::enable_shared_from_this<Reactor> {
|
||||
public:
|
||||
Reactor(const Reactor&) = delete;
|
||||
Reactor& operator=(const Reactor&) = delete;
|
||||
@@ -218,6 +218,11 @@ public:
|
||||
* executed in a thread calling run() or runFor().
|
||||
*/
|
||||
virtual std::unique_ptr<ReactorTimer> makeTimer() = 0;
|
||||
|
||||
// sleepFor is implemented so that the reactor is compatible with the AsyncTry exponential
|
||||
// backoff API.
|
||||
ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token);
|
||||
|
||||
virtual Date_t now() = 0;
|
||||
|
||||
virtual void appendStats(BSONObjBuilder& bob) const = 0;
|
||||
|
||||
Reference in New Issue
Block a user