Compare commits
1 Commits
r5.0.22
...
server-490
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
120b213146 |
@@ -33,6 +33,7 @@
|
||||
#include "mongo/db/commands/server_status.h"
|
||||
#include "mongo/transport/message_compressor_registry.h"
|
||||
#include "mongo/transport/service_entry_point.h"
|
||||
#include "mongo/transport/service_executor_synchronous.h"
|
||||
#include "mongo/util/net/hostname_canonicalization.h"
|
||||
#include "mongo/util/net/socket_utils.h"
|
||||
#include "mongo/util/net/ssl_manager.h"
|
||||
@@ -77,12 +78,13 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: need to track connections in server stats (see SERVER-49073)
|
||||
BSONObj generateSection(OperationContext* opCtx,
|
||||
const BSONElement& configElement) const override {
|
||||
BSONObjBuilder b;
|
||||
networkCounter.append(b);
|
||||
appendMessageCompressionStats(&b);
|
||||
auto executor = opCtx->getServiceContext()->getServiceExecutor();
|
||||
auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
|
||||
if (executor) {
|
||||
BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
|
||||
executor->appendStats(§ion);
|
||||
|
||||
@@ -734,16 +734,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
|
||||
// operation context anymore
|
||||
startupOpCtx.reset();
|
||||
|
||||
auto start = serviceContext->getServiceExecutor()->start();
|
||||
if (!start.isOK()) {
|
||||
LOGV2_ERROR(20570,
|
||||
"Error starting service executor: {error}",
|
||||
"Error starting service executor",
|
||||
"error"_attr = start);
|
||||
return EXIT_NET_ERROR;
|
||||
}
|
||||
|
||||
start = serviceContext->getServiceEntryPoint()->start();
|
||||
auto start = serviceContext->getServiceEntryPoint()->start();
|
||||
if (!start.isOK()) {
|
||||
LOGV2_ERROR(20571,
|
||||
"Error starting service entry point: {error}",
|
||||
@@ -1279,18 +1270,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
|
||||
"Service entry point did not shutdown within the time limit");
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown and wait for the service executor to exit
|
||||
if (auto svcExec = serviceContext->getServiceExecutor()) {
|
||||
LOGV2_OPTIONS(4784924, {LogComponent::kExecutor}, "Shutting down the service executor");
|
||||
Status status = svcExec->shutdown(Seconds(10));
|
||||
if (!status.isOK()) {
|
||||
LOGV2_OPTIONS(20564,
|
||||
{LogComponent::kNetwork},
|
||||
"Service executor did not shutdown within the time limit",
|
||||
"error"_attr = status);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
LOGV2(4784925, "Shutting down free monitoring");
|
||||
|
||||
@@ -199,10 +199,6 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
|
||||
return _serviceEntryPoint.get();
|
||||
}
|
||||
|
||||
transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
|
||||
return _serviceExecutor.get();
|
||||
}
|
||||
|
||||
void ServiceContext::setStorageEngine(std::unique_ptr<StorageEngine> engine) {
|
||||
invariant(engine);
|
||||
invariant(!_storageEngine);
|
||||
@@ -233,10 +229,6 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer
|
||||
_transportLayer = std::move(tl);
|
||||
}
|
||||
|
||||
void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
|
||||
_serviceExecutor = std::move(exec);
|
||||
}
|
||||
|
||||
void ServiceContext::ClientDeleter::operator()(Client* client) const {
|
||||
ServiceContext* const service = client->getServiceContext();
|
||||
{
|
||||
|
||||
@@ -487,14 +487,6 @@ public:
|
||||
*/
|
||||
ServiceEntryPoint* getServiceEntryPoint() const;
|
||||
|
||||
/**
|
||||
* Get the service executor for the service context.
|
||||
*
|
||||
* See ServiceStateMachine for how this is used. Some configurations may not have a service
|
||||
* executor registered and this will return a nullptr.
|
||||
*/
|
||||
transport::ServiceExecutor* getServiceExecutor() const;
|
||||
|
||||
/**
|
||||
* Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
|
||||
* added/started.
|
||||
@@ -579,11 +571,6 @@ public:
|
||||
*/
|
||||
void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
|
||||
|
||||
/**
|
||||
* Binds the service executor to the service context
|
||||
*/
|
||||
void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
|
||||
|
||||
/**
|
||||
* Creates a delayed execution baton with basic functionality
|
||||
*/
|
||||
|
||||
@@ -371,16 +371,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown and wait for the service executor to exit
|
||||
if (auto svcExec = serviceContext->getServiceExecutor()) {
|
||||
Status status = svcExec->shutdown(Seconds(5));
|
||||
if (!status.isOK()) {
|
||||
LOGV2_OPTIONS(22845,
|
||||
{LogComponent::kNetwork},
|
||||
"Service executor did not shutdown within the time limit",
|
||||
"error"_attr = status);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// Shutdown Full-Time Data Capture
|
||||
@@ -788,15 +778,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
|
||||
std::make_unique<SessionsCollectionSharded>(),
|
||||
RouterSessionCatalog::reapSessionsOlderThan));
|
||||
|
||||
status = serviceContext->getServiceExecutor()->start();
|
||||
if (!status.isOK()) {
|
||||
LOGV2_ERROR(22859,
|
||||
"Error starting service executor: {error}",
|
||||
"Error starting service executor",
|
||||
"error"_attr = redact(status));
|
||||
return EXIT_NET_ERROR;
|
||||
}
|
||||
|
||||
status = serviceContext->getServiceEntryPoint()->start();
|
||||
if (!status.isOK()) {
|
||||
LOGV2_ERROR(22860,
|
||||
|
||||
@@ -478,10 +478,6 @@ int bridgeMain(int argc, char** argv) {
|
||||
setGlobalServiceContext(ServiceContext::make());
|
||||
auto serviceContext = getGlobalServiceContext();
|
||||
serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
|
||||
serviceContext->setServiceExecutor(
|
||||
std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
|
||||
|
||||
fassert(50766, serviceContext->getServiceExecutor()->start());
|
||||
|
||||
transport::TransportLayerASIO::Options opts;
|
||||
opts.ipList.emplace_back("0.0.0.0");
|
||||
|
||||
@@ -96,6 +96,7 @@ tlEnv.Library(
|
||||
],
|
||||
LIBDEPS_PRIVATE=[
|
||||
"$BUILD_DIR/mongo/idl/server_parameter",
|
||||
"$BUILD_DIR/mongo/db/server_options_core",
|
||||
"$BUILD_DIR/mongo/util/concurrency/thread_pool",
|
||||
"$BUILD_DIR/mongo/util/processinfo",
|
||||
'$BUILD_DIR/third_party/shim_asio',
|
||||
@@ -178,7 +179,7 @@ tlEnv.CppUnitTest(
|
||||
'transport_layer_asio_test.cpp',
|
||||
'service_executor_test.cpp',
|
||||
'max_conns_override_test.cpp',
|
||||
'service_state_machine_test.cpp',
|
||||
# 'service_state_machine_test.cpp',
|
||||
],
|
||||
LIBDEPS=[
|
||||
'$BUILD_DIR/mongo/base',
|
||||
|
||||
@@ -122,12 +122,26 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
|
||||
}
|
||||
|
||||
Status ServiceEntryPointImpl::start() {
|
||||
if (_adminInternalPool)
|
||||
return _adminInternalPool->start();
|
||||
else
|
||||
return Status::OK();
|
||||
if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
|
||||
!status.isOK()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx); exec) {
|
||||
if (auto status = exec->start(); !status.isOK()) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
|
||||
// if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
|
||||
// return status;
|
||||
// }
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// TODO: explicitly start on the fixed executor
|
||||
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
|
||||
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
|
||||
const auto& remoteAddr = session->remoteAddr();
|
||||
@@ -140,7 +154,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
|
||||
|
||||
const bool quiet = serverGlobalParams.quiet.load();
|
||||
size_t connectionCount;
|
||||
auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
|
||||
auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
|
||||
|
||||
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
|
||||
auto usingMaxConnOverride = false;
|
||||
@@ -168,8 +182,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
|
||||
"connectionCount"_attr = connectionCount);
|
||||
}
|
||||
return;
|
||||
} else if (usingMaxConnOverride && _adminInternalPool) {
|
||||
ssm->setServiceExecutor(_adminInternalPool.get());
|
||||
} else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
|
||||
usingMaxConnOverride && exec) {
|
||||
ssm->setServiceExecutor(exec);
|
||||
}
|
||||
|
||||
if (!quiet) {
|
||||
@@ -256,6 +271,27 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
|
||||
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
|
||||
"workers"_attr = numOpenSessions());
|
||||
}
|
||||
|
||||
lk.unlock();
|
||||
// TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
|
||||
// if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout -
|
||||
// timeSpent);
|
||||
// !status.isOK()) {
|
||||
// LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
|
||||
// }
|
||||
|
||||
if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
|
||||
if (auto status = exec->shutdown(timeout - timeSpent); !status.isOK()) {
|
||||
LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
|
||||
}
|
||||
}
|
||||
|
||||
if (auto status =
|
||||
transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
|
||||
!status.isOK()) {
|
||||
LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,9 @@
|
||||
#include "mongo/stdx/condition_variable.h"
|
||||
#include "mongo/stdx/variant.h"
|
||||
#include "mongo/transport/service_entry_point.h"
|
||||
#include "mongo/transport/service_executor_fixed.h"
|
||||
#include "mongo/transport/service_executor_reserved.h"
|
||||
#include "mongo/transport/service_executor_synchronous.h"
|
||||
#include "mongo/transport/service_state_machine.h"
|
||||
#include "mongo/util/hierarchical_acquisition.h"
|
||||
#include "mongo/util/net/cidr.h"
|
||||
|
||||
@@ -80,6 +80,18 @@ Status ServiceExecutorFixed::start() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
const auto getServiceExecutorFixed =
|
||||
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
|
||||
|
||||
ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
|
||||
auto& ref = getServiceExecutorFixed(ctx);
|
||||
if (!ref) {
|
||||
ThreadPool::Options options{};
|
||||
ref = std::make_unique<ServiceExecutorFixed>(options);
|
||||
}
|
||||
return ref.get();
|
||||
}
|
||||
|
||||
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
|
||||
auto waitForShutdown = [&]() mutable -> Status {
|
||||
stdx::unique_lock<Latch> lk(_mutex);
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/stdx/condition_variable.h"
|
||||
@@ -54,6 +55,8 @@ public:
|
||||
explicit ServiceExecutorFixed(ThreadPool::Options options);
|
||||
virtual ~ServiceExecutorFixed();
|
||||
|
||||
static ServiceExecutorFixed* get(ServiceContext* ctx);
|
||||
|
||||
Status start() override;
|
||||
Status shutdown(Milliseconds timeout) override;
|
||||
Status schedule(Task task, ScheduleFlags flags) override;
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
|
||||
#include "mongo/transport/service_executor_reserved.h"
|
||||
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/stdx/thread.h"
|
||||
#include "mongo/transport/service_entry_point_utils.h"
|
||||
@@ -147,6 +148,20 @@ Status ServiceExecutorReserved::_startWorker() {
|
||||
});
|
||||
}
|
||||
|
||||
const auto getServiceExecutorReserved =
|
||||
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
|
||||
|
||||
ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
|
||||
auto& ref = getServiceExecutorReserved(ctx);
|
||||
if (!ref) {
|
||||
if (serverGlobalParams.reservedAdminThreads) {
|
||||
ref = std::make_unique<transport::ServiceExecutorReserved>(
|
||||
ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
|
||||
} else
|
||||
return nullptr;
|
||||
}
|
||||
return ref.get();
|
||||
}
|
||||
|
||||
Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
|
||||
LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
|
||||
@@ -173,8 +188,8 @@ Status ServiceExecutorReserved::schedule(Task task, ScheduleFlags flags) {
|
||||
if (!_localWorkQueue.empty()) {
|
||||
// Execute task directly (recurse) if allowed by the caller as it produced better
|
||||
// performance in testing. Try to limit the amount of recursion so we don't blow up the
|
||||
// stack, even though this shouldn't happen with this executor that uses blocking network
|
||||
// I/O.
|
||||
// stack, even though this shouldn't happen with this executor that uses blocking
|
||||
// network I/O.
|
||||
if ((flags & ScheduleFlags::kMayRecurse) &&
|
||||
(_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
|
||||
++_localRecursionDepth;
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include <deque>
|
||||
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/stdx/condition_variable.h"
|
||||
@@ -54,6 +55,8 @@ class ServiceExecutorReserved final : public ServiceExecutor {
|
||||
public:
|
||||
explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
|
||||
|
||||
static ServiceExecutorReserved* get(ServiceContext* ctx);
|
||||
|
||||
Status start() override;
|
||||
Status shutdown(Milliseconds timeout) override;
|
||||
Status schedule(Task task, ScheduleFlags flags) override;
|
||||
|
||||
@@ -78,6 +78,17 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
|
||||
"passthrough executor couldn't shutdown all worker threads within time limit.");
|
||||
}
|
||||
|
||||
const auto getServiceExecutorSynchronous =
|
||||
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
|
||||
|
||||
ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
|
||||
auto& ref = getServiceExecutorSynchronous(ctx);
|
||||
if (!ref) {
|
||||
ref = std::make_unique<ServiceExecutorSynchronous>();
|
||||
}
|
||||
return ref.get();
|
||||
}
|
||||
|
||||
Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) {
|
||||
if (!_stillRunning.load()) {
|
||||
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include <deque>
|
||||
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/stdx/condition_variable.h"
|
||||
@@ -47,7 +48,9 @@ namespace transport {
|
||||
*/
|
||||
class ServiceExecutorSynchronous final : public ServiceExecutor {
|
||||
public:
|
||||
explicit ServiceExecutorSynchronous(ServiceContext* ctx);
|
||||
explicit ServiceExecutorSynchronous(ServiceContext* ctx = getGlobalServiceContext());
|
||||
|
||||
static ServiceExecutorSynchronous* get(ServiceContext* ctx);
|
||||
|
||||
Status start() override;
|
||||
Status shutdown(Milliseconds timeout) override;
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
#include "mongo/rpc/op_msg.h"
|
||||
#include "mongo/transport/message_compressor_manager.h"
|
||||
#include "mongo/transport/service_entry_point.h"
|
||||
#include "mongo/transport/service_executor_synchronous.h"
|
||||
#include "mongo/transport/session.h"
|
||||
#include "mongo/transport/transport_layer.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
@@ -300,11 +301,11 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
|
||||
_sep{svcContext->getServiceEntryPoint()},
|
||||
_transportMode(transportMode),
|
||||
_serviceContext(svcContext),
|
||||
_serviceExecutor(_serviceContext->getServiceExecutor()),
|
||||
_sessionHandle(session),
|
||||
_threadName{str::stream() << "conn" << _session()->id()},
|
||||
_dbClient{svcContext->makeClient(_threadName, std::move(session))},
|
||||
_dbClientPtr{_dbClient.get()} {}
|
||||
_dbClientPtr{_dbClient.get()},
|
||||
_serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
|
||||
|
||||
const transport::SessionHandle& ServiceStateMachine::_session() const {
|
||||
return _sessionHandle;
|
||||
|
||||
@@ -230,12 +230,12 @@ private:
|
||||
transport::Mode _transportMode;
|
||||
|
||||
ServiceContext* const _serviceContext;
|
||||
transport::ServiceExecutor* _serviceExecutor;
|
||||
|
||||
transport::SessionHandle _sessionHandle;
|
||||
const std::string _threadName;
|
||||
ServiceContext::UniqueClient _dbClient;
|
||||
const Client* _dbClientPtr;
|
||||
transport::ServiceExecutor* _serviceExecutor;
|
||||
std::function<void()> _cleanupHook;
|
||||
|
||||
bool _inExhaust = false;
|
||||
|
||||
@@ -137,8 +137,6 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
|
||||
transport::TransportLayerASIO::Options opts(config);
|
||||
opts.transportMode = transport::Mode::kSynchronous;
|
||||
|
||||
ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
|
||||
|
||||
std::vector<std::unique_ptr<TransportLayer>> retVector;
|
||||
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
|
||||
return std::make_unique<TransportLayerManager>(std::move(retVector));
|
||||
|
||||
Reference in New Issue
Block a user