SERVER-120805: Make sure PinnedConnectionTaskExecutor is always destroyed before shutting down the underlying executor (#48998)

GitOrigin-RevId: 6875fee17a602e2dbadc1f72a328d76d53fac55f
This commit is contained in:
Will Buerger
2026-03-09 12:40:47 -04:00
committed by MongoDB Bot
parent ee54c7ce97
commit 133211a195
4 changed files with 160 additions and 7 deletions

View File

@@ -117,9 +117,11 @@ std::unique_ptr<executor::TaskExecutorCursor> mockTaskExecutorCursor(OperationCo
executor::TaskExecutorCursorOptions opts(/*pinConnection*/ gPinTaskExecCursorConns.load(),
/*batchSize*/ boost::none,
/*preFetchNextBatch*/ false);
// When pinning is enabled, TaskExecutorCursor requires both executor and underlying executor;
// tests use the same executor for both since there is no real connection to pin.
return std::make_unique<executor::TaskExecutorCursor>(
testExecutor,
nullptr /* underlyingExec */,
testExecutor /* underlyingExec: same as executor in tests */,
CursorResponse{NamespaceString::kEmpty, cursorId, batchVec},
req,
std::move(opts));

View File

@@ -754,9 +754,11 @@ protected:
executor::TaskExecutorCursorOptions opts(/*pinConnection*/ gPinTaskExecCursorConns.load(),
/*batchSize*/ boost::none,
/*preFetchNextBatch*/ false);
// When pinning is enabled, TaskExecutorCursor requires both executor and underlying
// executor; tests use the same executor for both since there is no real connection to pin.
return std::make_unique<executor::TaskExecutorCursor>(
testExecutor,
nullptr /* underlyingExec */,
testExecutor /* underlyingExec: same as executor in tests */,
CursorResponse{NamespaceString::kEmpty, cursorId, batchVec},
req,
std::move(opts));

View File

@@ -94,6 +94,18 @@ TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> e
_batchIter(_batch.end()) {
tassert(6253101, "rcr must have an opCtx to use construct cursor from response", rcr.opCtx);
if (_options.pinConnection) {
tassert(
12080500,
"TaskExecutorCursor in pinning mode must have an executor and an underlying executor",
_executor != nullptr && _underlyingExecutor != nullptr);
// It's possible we're registering a new token for an already registered underlying
// executor. That's fine since shutdown() and join() are idempotent, so having the same PCTE
// (or underlying executor) drained multiple times is safe.
_pcteToken = std::make_unique<PinnedExecutorRegistryToken>(
rcr.opCtx->getServiceContext(), _executor, _underlyingExecutor);
}
_lsid = rcr.opCtx->getLogicalSessionId();
_processResponse(rcr.opCtx, std::move(response));
}
@@ -101,6 +113,7 @@ TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> e
TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) noexcept
: _executor(std::move(other._executor)),
_underlyingExecutor(std::move(other._underlyingExecutor)),
_pcteToken(std::move(other._pcteToken)),
_rcr(other._rcr), // NOLINT
_options(std::move(other._options)),
_lsid(other._lsid), // NOLINT
@@ -168,8 +181,9 @@ TaskExecutorCursor::~TaskExecutorCursor() {
TaskExecutor::RemoteCommandCallbackFn callbackToRun = [](const auto&) {
};
if (_options.pinConnection) {
invariant(_underlyingExecutor,
"TaskExecutorCursor in pinning mode must have an underlying executor");
tassert(12080501,
"TaskExecutorCursor in pinning mode must have an underlying executor",
_underlyingExecutor != nullptr);
callbackToRun = [main = _executor, underlying = _underlyingExecutor](const auto&) {
underlying->schedule([main = std::move(main)](const auto&) {
if (MONGO_unlikely(

View File

@@ -31,13 +31,11 @@
#include "mongo/executor/task_executor_cursor.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/executor/pinned_connection_task_executor_registry.h"
#include "mongo/executor/task_executor_cursor_test_fixture.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/thread_assertion_monitor.h"
@@ -773,6 +771,143 @@ TEST_F(NonPinningDefaultTaskExecutorCursorTestFixture, MultipleCursorsCancellati
CancelTECWhileSharedPCTEInUse();
}
/**
 * Exercises the teardown order for a pinned cursor whose initial command is still outstanding:
 * shutdownPinnedExecutors() is invoked first, and only afterwards is the underlying executor shut
 * down and joined. The intent is that the PinnedConnectionTaskExecutor (PCTE) is gone before the
 * underlying executor stops, so the underlying executor is never shut down with in-flight work.
 */
TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture,
       PinnedCursorDrainedBeforeUnderlyingShutdownSucceeds) {
    auto* const svcCtx = opCtx->getServiceContext();

    // Single-cursor aggregate addressed to a host that never gets a scheduled reply.
    const auto aggregateCmd = BSON(
        "aggregate" << "test"
                    << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << false)));
    RemoteCommandRequest request(HostAndPort("localhost"),
                                 DatabaseName::createDatabaseName_forTest(boost::none, "test"),
                                 aggregateCmd,
                                 opCtx.get());

    // Pinned connection, no explicit batch size, prefetching of the next batch enabled.
    TaskExecutorCursorOptions cursorOpts(
        /*pinConnection=*/true, /*batchSize=*/boost::none, /*preFetchNextBatch=*/true);
    auto cursor =
        std::make_unique<TaskExecutorCursor>(getExecutorPtr(), request, std::move(cursorOpts));

    // Deliberately no response is scheduled here, so the initial command (and its callback)
    // remains pending while the shutdown sequence below runs.

    // Step 1: shut down the pinned executors registered against the underlying executor.
    executor::shutdownPinnedExecutors(svcCtx, getExecutorPtr());

    // Step 2: only now stop the underlying executor and wait for it to finish.
    getExecutorPtr()->shutdown();
    getExecutorPtr()->join();

    // Destroying the cursor after both shutdown steps must complete without throwing.
    ASSERT_NO_THROW(cursor.reset());
}
/**
 * Variant of PinnedCursorDrainedBeforeUnderlyingShutdownSucceeds in which the parent cursor is
 * destroyed while an additional cursor (built through the second TEC constructor) stays alive.
 * Checks that the additional cursor's PCTE is also drained before the underlying executor is shut
 * down.
 */
TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture,
       AdditionalPinnedCursorDrainedBeforeUnderlyingShutdownSucceeds) {
    const auto aggregateCmd = BSON(
        "aggregate" << "test"
                    << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
    std::vector<size_t> ids{1, 2};
    RemoteCommandRequest request(HostAndPort("localhost"),
                                 DatabaseName::createDatabaseName_forTest(boost::none, "test"),
                                 aggregateCmd,
                                 opCtx.get());

    // Outlives the parent cursor created in the nested scope below.
    std::unique_ptr<TaskExecutorCursor> survivingCursor;
    {
        auto parentCursor =
            makeTec(request, /*batchSize=*/boost::none, /*preFetchNextBatch=*/false);
        ASSERT_BSONOBJ_EQ(aggregateCmd,
                          scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, ids));

        // Drain the two documents of the parent's first batch.
        ASSERT_EQUALS(parentCursor->getNext(opCtx.get()).value()["x"].Int(), 1);
        ASSERT_EQUALS(parentCursor->getNext(opCtx.get()).value()["x"].Int(), 2);

        // Exactly one additional cursor is expected; take ownership of it before the parent
        // goes out of scope.
        auto released = parentCursor->releaseAdditionalCursors();
        ASSERT_EQUALS(released.size(), 1u);
        survivingCursor = std::move(released[0]);
    }

    // The additional cursor was built with default options and therefore prefetches: answer its
    // getMore(2) first, and then the killCursors(1) issued by the parent's destructor.
    ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
                                     << "test"),
                      scheduleSuccessfulCursorResponse("nextBatch", 2, 4, ids[1]));
    ASSERT_BSONOBJ_EQ(BSON("killCursors" << "test"
                                         << "cursors" << BSON_ARRAY(static_cast<int>(ids[0]))),
                      scheduleSuccessfulKillCursorResponse(ids[0]));

    // Start one more getMore on the surviving cursor from a helper thread and never answer it,
    // so that a request is in flight when shutdown begins.
    ServiceContext* svcCtx = opCtx->getServiceContext();
    TaskExecutorCursor* rawCursor = survivingCursor.get();
    stdx::thread consumer([svcCtx, rawCursor]() {
        auto threadClient = svcCtx->getService()->makeClient("AdditionalCursorGetNextThread");
        auto consumerOpCtx = threadClient->makeOperationContext();
        try {
            // Exhaust the buffered batch; the following getNext() schedules a getMore and
            // blocks until the PCTE is drained or the underlying executor shuts down.
            for (;;) {
                if (!rawCursor->getNext(consumerOpCtx.get())) {
                    break;
                }
            }
        } catch (...) {
            // Anticipated once shutdown starts (InterruptedAtShutdown, CallbackCanceled, etc.).
        }
    });

    // NOTE(review): if this assertion fails by throwing while `consumer` is still joinable, the
    // thread's destructor may terminate the process instead of reporting a clean test failure.
    // Confirm against stdx::thread / ASSERT semantics.
    ASSERT_TRUE(tryWaitUntilReadyRequests()) << "getMore should be in flight";

    // Pinned executors first, underlying executor second.
    executor::shutdownPinnedExecutors(svcCtx, getExecutorPtr());
    getExecutorPtr()->shutdown();
    getExecutorPtr()->join();

    // The helper thread must be finished before the cursor it points at is destroyed.
    consumer.join();
    ASSERT_NO_THROW(survivingCursor.reset());
}
/**
 * Checks that move-constructing a pinned cursor keeps its PCTE in the registry, so that it can
 * still be drained ahead of the underlying executor's shutdown. The initial command is left
 * unanswered for the whole test.
 */
TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture,
       MovePinnedCursorThenCorrectShutdownOrderSucceeds) {
    auto* const svcCtx = opCtx->getServiceContext();

    const auto aggregateCmd = BSON(
        "aggregate" << "test"
                    << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << false)));
    RemoteCommandRequest request(HostAndPort("localhost"),
                                 DatabaseName::createDatabaseName_forTest(boost::none, "test"),
                                 aggregateCmd,
                                 opCtx.get());

    TaskExecutorCursorOptions cursorOpts(
        /*pinConnection=*/true, /*batchSize=*/boost::none, /*preFetchNextBatch=*/true);
    auto sourceCursor =
        std::make_unique<TaskExecutorCursor>(getExecutorPtr(), request, std::move(cursorOpts));

    // No response is scheduled: the initial command stays in flight with a pending callback.

    // Move the cursor's state into a fresh object, then destroy the moved-from shell.
    auto targetCursor = std::make_unique<TaskExecutorCursor>(std::move(*sourceCursor));
    sourceCursor.reset();

    // Pinned executors first, underlying executor second.
    executor::shutdownPinnedExecutors(svcCtx, getExecutorPtr());
    getExecutorPtr()->shutdown();
    getExecutorPtr()->join();

    ASSERT_NO_THROW(targetCursor.reset());
}
/**
 * Counterpart of MovePinnedCursorThenCorrectShutdownOrderSucceeds in which the initial command is
 * answered and consumed before the move, so no callback is in flight. Confirms that moving the
 * cursor followed by the correct shutdown order still succeeds.
 */
TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture, MovePinnedCursorThenShutdownSucceeds) {
    auto* const svcCtx = opCtx->getServiceContext();

    const auto aggregateCmd = BSON(
        "aggregate" << "test"
                    << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << false)));
    RemoteCommandRequest request(HostAndPort("localhost"),
                                 DatabaseName::createDatabaseName_forTest(boost::none, "test"),
                                 aggregateCmd,
                                 opCtx.get());

    TaskExecutorCursorOptions cursorOpts(
        /*pinConnection=*/true, /*batchSize=*/boost::none, /*preFetchNextBatch=*/true);
    auto sourceCursor =
        std::make_unique<TaskExecutorCursor>(getExecutorPtr(), request, std::move(cursorOpts));

    // Answer the initial aggregate and read one document so that the initial command is done.
    ASSERT_BSONOBJ_EQ(aggregateCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 0));
    ASSERT_TRUE(sourceCursor->getNext(opCtx.get()));

    // Move the cursor's state into a fresh object, then destroy the moved-from shell.
    auto targetCursor = std::make_unique<TaskExecutorCursor>(std::move(*sourceCursor));
    sourceCursor.reset();

    // Pinned executors first, underlying executor second.
    executor::shutdownPinnedExecutors(svcCtx, getExecutorPtr());
    getExecutorPtr()->shutdown();
    getExecutorPtr()->join();

    ASSERT_NO_THROW(targetCursor.reset());
}
} // namespace
} // namespace executor
} // namespace mongo