Compare commits
17 Commits
WT-10598-s
...
r5.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd2abb1b3c | ||
|
|
10f64703a6 | ||
|
|
914d57bf82 | ||
|
|
6ff120d1ad | ||
|
|
624df79cbb | ||
|
|
be4b2f2a05 | ||
|
|
0a8923221d | ||
|
|
bd27885ec3 | ||
|
|
a93d373852 | ||
|
|
c6cf3ce83c | ||
|
|
59e5dbbc35 | ||
|
|
e3a6fe7911 | ||
|
|
49ef39ef5e | ||
|
|
a60910e95b | ||
|
|
e2aebdba1b | ||
|
|
77fa4bdc64 | ||
|
|
b2eb28a45a |
@@ -61,8 +61,8 @@ def generate_version_expansions():
|
||||
raise ValueError("Unable to parse version from stdin and no version.json provided")
|
||||
|
||||
if version_parts[0]:
|
||||
expansions["suffix"] = "latest"
|
||||
expansions["src_suffix"] = "latest"
|
||||
expansions["suffix"] = "v5.3-latest"
|
||||
expansions["src_suffix"] = "v5.3-latest"
|
||||
expansions["is_release"] = "false"
|
||||
else:
|
||||
expansions["suffix"] = version_line
|
||||
|
||||
@@ -61,8 +61,8 @@ def generate_version_expansions():
|
||||
raise ValueError("Unable to parse version from stdin and no version.json provided")
|
||||
|
||||
if version_parts[0]:
|
||||
expansions["suffix"] = "latest"
|
||||
expansions["src_suffix"] = "latest"
|
||||
expansions["suffix"] = "v5.3-latest"
|
||||
expansions["src_suffix"] = "v5.3-latest"
|
||||
expansions["is_release"] = "false"
|
||||
else:
|
||||
expansions["suffix"] = version_line
|
||||
|
||||
@@ -371,7 +371,6 @@ variables:
|
||||
- enterprise-windows-debug-unoptimized
|
||||
- enterprise-windows-inmem
|
||||
- enterprise-windows-required
|
||||
- enterprise-windows-wtdevelop
|
||||
- ubuntu1804-debug-asan
|
||||
- ubuntu1804-debug-ubsan
|
||||
- ubuntu1804-debug-aubsan-lite-required
|
||||
@@ -480,7 +479,6 @@ functions:
|
||||
directory: ${git_project_directory|src}
|
||||
revisions: # for each module include revision as <module_name> : ${<module_name>_rev}
|
||||
enterprise: ${enterprise_rev}
|
||||
wtdevelop: ${wtdevelop_rev}
|
||||
|
||||
# Get the mongo repo, no modules. Useful for inspecting the commit history with the
|
||||
# `git` Python tool.
|
||||
@@ -649,7 +647,7 @@ functions:
|
||||
"get buildnumber": &get_buildnumber
|
||||
command: keyval.inc
|
||||
params:
|
||||
key: "${build_variant}_master"
|
||||
key: "${build_variant}_v5.3"
|
||||
destination: "builder_num"
|
||||
|
||||
"run diskstats": &run_diskstats
|
||||
@@ -7621,12 +7619,7 @@ modules:
|
||||
- name: enterprise
|
||||
repo: git@github.com:10gen/mongo-enterprise-modules.git
|
||||
prefix: src/mongo/db/modules
|
||||
branch: master
|
||||
|
||||
- name: wtdevelop
|
||||
repo: git@github.com:wiredtiger/wiredtiger.git
|
||||
prefix: src/third_party
|
||||
branch: develop
|
||||
branch: v5.3
|
||||
|
||||
#######################################
|
||||
# Buildvariants #
|
||||
@@ -7701,19 +7694,6 @@ buildvariants:
|
||||
- rhel80-large
|
||||
- name: generate_buildid_to_debug_symbols_mapping
|
||||
|
||||
- <<: *linux-64-debug-required-template
|
||||
name: linux-64-debug-wtdevelop
|
||||
display_name: "~ Linux DEBUG WiredTiger develop"
|
||||
cron: "0 */4 * * *" # Every 4 hours starting at midnight
|
||||
modules:
|
||||
- wtdevelop
|
||||
expansions:
|
||||
use_wt_develop: true
|
||||
resmoke_jobs_factor: 0.5 # Avoid starting too many mongod's
|
||||
compile_flags: --dbg=on --opt=on -j$(grep -c ^processor /proc/cpuinfo) --variables-files=etc/scons/mongodbtoolchain_v3_gcc.vars --enable-free-mon=on --enable-http-client=on
|
||||
scons_cache_mode: nolinked
|
||||
test_flags: --excludeWithAnyTags=requires_http_client
|
||||
|
||||
- name: linux-64-duroff
|
||||
display_name: Linux (No Journal)
|
||||
cron: "0 12 * * *" # Every day starting at 12:00
|
||||
@@ -9155,17 +9135,6 @@ buildvariants:
|
||||
- windows-vsCurrent-large
|
||||
- name: .benchmarks !benchmarks_orphaned
|
||||
|
||||
- <<: *enterprise-windows-nopush-template
|
||||
name: enterprise-windows-wtdevelop
|
||||
display_name: "~ Enterprise Windows WiredTiger develop"
|
||||
cron: "0 */4 * * *" # Every 4 hours starting at midnight
|
||||
modules:
|
||||
- enterprise
|
||||
- wtdevelop
|
||||
expansions:
|
||||
<<: *enterprise-windows-nopush-expansions-template
|
||||
use_wt_develop: true
|
||||
|
||||
- name: enterprise-windows-ninja
|
||||
display_name: "Ninja Build: Enterprise Windows"
|
||||
cron: "0 0 * * *" # Every day starting at midnight
|
||||
|
||||
@@ -104,7 +104,7 @@ modules:
|
||||
- name: enterprise
|
||||
repo: git@github.com:10gen/mongo-enterprise-modules.git
|
||||
prefix: src/mongo/db/modules
|
||||
branch: master
|
||||
branch: v5.3
|
||||
- name: mongo-tools
|
||||
repo: git@github.com:mongodb/mongo-tools.git
|
||||
prefix: mongo-tools/src/github.com/mongodb
|
||||
|
||||
@@ -121,7 +121,7 @@ modules:
|
||||
- name: enterprise
|
||||
repo: git@github.com:10gen/mongo-enterprise-modules.git
|
||||
prefix: src/mongo/db/modules
|
||||
branch: master
|
||||
branch: v5.3
|
||||
- name: mongo-tools
|
||||
repo: git@github.com:mongodb/mongo-tools.git
|
||||
prefix: mongo-tools/src/github.com/mongodb
|
||||
|
||||
@@ -5,7 +5,15 @@
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
var st = new ShardingTest({mongos: 1, shards: 3});
|
||||
const chunkSizeMB = 1;
|
||||
|
||||
let st = new ShardingTest({
|
||||
shards: 3,
|
||||
other: {
|
||||
// Set global max chunk size to 1MB
|
||||
chunkSize: chunkSizeMB
|
||||
}
|
||||
});
|
||||
|
||||
function runBalancer(rounds) {
|
||||
st.startBalancer();
|
||||
@@ -28,7 +36,7 @@ assert.commandFailedWithCode(st.s0.adminCommand({balancerCollectionStatus: 'db'}
|
||||
|
||||
// only sharded databases are allowed
|
||||
assert.commandFailedWithCode(st.s0.adminCommand({balancerCollectionStatus: 'db.col'}),
|
||||
ErrorCodes.NamespaceNotFound);
|
||||
ErrorCodes.NamespaceNotSharded);
|
||||
|
||||
// setup the collection for the test
|
||||
assert.commandWorked(st.s0.adminCommand({enableSharding: 'db'}));
|
||||
@@ -39,13 +47,13 @@ assert.commandWorked(st.s0.getDB('db').runCommand({create: "col2"}));
|
||||
assert.commandFailedWithCode(st.s0.adminCommand({balancerCollectionStatus: 'db.col2'}),
|
||||
ErrorCodes.NamespaceNotSharded);
|
||||
|
||||
var result = assert.commandWorked(st.s0.adminCommand({balancerCollectionStatus: 'db.col'}));
|
||||
let result = assert.commandWorked(st.s0.adminCommand({balancerCollectionStatus: 'db.col'}));
|
||||
|
||||
// new collections must be balanced
|
||||
assert.eq(result.balancerCompliant, true);
|
||||
|
||||
// get shardIds
|
||||
var shards = st.s0.getDB('config').shards.find().toArray();
|
||||
const shards = st.s0.getDB('config').shards.find().toArray();
|
||||
|
||||
// manually split and place the 3 chunks on the same shard
|
||||
assert.commandWorked(st.s0.adminCommand({split: 'db.col', middle: {key: 10}}));
|
||||
@@ -98,5 +106,11 @@ result = assert.commandWorked(st.s0.adminCommand({balancerCollectionStatus: 'db.
|
||||
// All chunks are balanced and in the correct zone
|
||||
assert.eq(result.balancerCompliant, true);
|
||||
|
||||
const configDB = st.configRS.getPrimary().getDB('config');
|
||||
const fcvDoc = configDB.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
|
||||
if (MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, '5.3') >= 0) {
|
||||
// Ensure that the expected chunk size is part of the response.
|
||||
assert.eq(result.chunkSize, chunkSizeMB);
|
||||
}
|
||||
st.stop();
|
||||
})();
|
||||
})();
|
||||
|
||||
@@ -957,40 +957,67 @@ void Balancer::abortCollectionDefragmentation(OperationContext* opCtx, const Nam
|
||||
_defragmentationPolicy->abortCollectionDefragmentation(opCtx, nss);
|
||||
}
|
||||
|
||||
Balancer::BalancerStatus Balancer::getBalancerStatusForNs(OperationContext* opCtx,
|
||||
const NamespaceString& ns) {
|
||||
BalancerCollectionStatusResponse Balancer::getBalancerStatusForNs(OperationContext* opCtx,
|
||||
const NamespaceString& ns) {
|
||||
CollectionType coll;
|
||||
try {
|
||||
auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {});
|
||||
bool isDefragmenting = coll.getDefragmentCollection();
|
||||
if (isDefragmenting) {
|
||||
return {false,
|
||||
kBalancerPolicyStatusDefragmentingChunks.toString(),
|
||||
_defragmentationPolicy->reportProgressOn(coll.getUuid())};
|
||||
coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {});
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
uasserted(ErrorCodes::NamespaceNotSharded, "Collection unsharded or undefined");
|
||||
}
|
||||
|
||||
const auto maxChunkSizeMB = [&]() -> int64_t {
|
||||
int64_t value = 0;
|
||||
if (const auto& collOverride = coll.getMaxChunkSizeBytes(); collOverride.is_initialized()) {
|
||||
value = *collOverride;
|
||||
} else {
|
||||
auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
|
||||
uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
|
||||
value = balancerConfig->getMaxChunkSizeBytes();
|
||||
}
|
||||
} catch (DBException&) {
|
||||
// Catch exceptions to keep consistency with errors thrown before defragmentation
|
||||
return value / (1024 * 1024);
|
||||
}();
|
||||
BalancerCollectionStatusResponse response(maxChunkSizeMB, true /*balancerCompliant*/);
|
||||
auto setViolationOnResponse = [&response](const StringData& reason,
|
||||
const boost::optional<BSONObj>& details =
|
||||
boost::none) {
|
||||
response.setBalancerCompliant(false);
|
||||
response.setFirstComplianceViolation(reason);
|
||||
response.setDetails(details);
|
||||
};
|
||||
|
||||
bool isDefragmenting = coll.getDefragmentCollection();
|
||||
if (isDefragmenting) {
|
||||
setViolationOnResponse(kBalancerPolicyStatusDefragmentingChunks,
|
||||
_defragmentationPolicy->reportProgressOn(coll.getUuid()));
|
||||
return response;
|
||||
}
|
||||
|
||||
auto splitChunks = uassertStatusOK(_chunkSelectionPolicy->selectChunksToSplit(opCtx, ns));
|
||||
if (!splitChunks.empty()) {
|
||||
return {false, kBalancerPolicyStatusZoneViolation.toString()};
|
||||
setViolationOnResponse(kBalancerPolicyStatusZoneViolation);
|
||||
return response;
|
||||
}
|
||||
|
||||
auto chunksToMove = uassertStatusOK(_chunkSelectionPolicy->selectChunksToMove(opCtx, ns));
|
||||
if (chunksToMove.empty()) {
|
||||
return {true, boost::none, boost::none};
|
||||
return response;
|
||||
}
|
||||
const auto& migrationInfo = chunksToMove.front();
|
||||
|
||||
const auto& migrationInfo = chunksToMove.front();
|
||||
switch (migrationInfo.reason) {
|
||||
case MigrateInfo::drain:
|
||||
return {false, kBalancerPolicyStatusDraining.toString(), boost::none};
|
||||
setViolationOnResponse(kBalancerPolicyStatusDraining);
|
||||
break;
|
||||
case MigrateInfo::zoneViolation:
|
||||
return {false, kBalancerPolicyStatusZoneViolation.toString(), boost::none};
|
||||
setViolationOnResponse(kBalancerPolicyStatusZoneViolation);
|
||||
break;
|
||||
case MigrateInfo::chunksImbalance:
|
||||
return {false, kBalancerPolicyStatusChunksImbalance.toString(), boost::none};
|
||||
setViolationOnResponse(kBalancerPolicyStatusChunksImbalance);
|
||||
break;
|
||||
}
|
||||
|
||||
return {true, boost::none, boost::none};
|
||||
return response;
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
#include "mongo/db/s/balancer/balancer_chunk_selection_policy.h"
|
||||
#include "mongo/db/s/balancer/balancer_random.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/s/request_types/balancer_collection_status_gen.h"
|
||||
#include "mongo/stdx/condition_variable.h"
|
||||
#include "mongo/stdx/thread.h"
|
||||
|
||||
@@ -183,16 +184,12 @@ public:
|
||||
*/
|
||||
void abortCollectionDefragmentation(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
struct BalancerStatus {
|
||||
bool balancerCompliant;
|
||||
boost::optional<std::string> firstComplianceViolation;
|
||||
boost::optional<BSONObj> details;
|
||||
};
|
||||
/**
|
||||
* Returns whether a given collection is draining due to a removed shard, has chunks on an invalid
|
||||
* zone or the number of chunks is imbalanced across the cluster
|
||||
*/
|
||||
BalancerStatus getBalancerStatusForNs(OperationContext* opCtx, const NamespaceString& nss);
|
||||
BalancerCollectionStatusResponse getBalancerStatusForNs(OperationContext* opCtx,
|
||||
const NamespaceString& nss);
|
||||
|
||||
private:
|
||||
/**
|
||||
|
||||
@@ -169,15 +169,13 @@ BalancerCommandsSchedulerImpl::~BalancerCommandsSchedulerImpl() {
|
||||
|
||||
void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx,
|
||||
const MigrationsRecoveryDefaultValues& defaultValues) {
|
||||
auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
|
||||
LOGV2(5847200,
|
||||
"Balancer command scheduler start requested",
|
||||
"numRequestsToRecover"_attr = requestsToRecover.size());
|
||||
LOGV2(5847200, "Balancer command scheduler start requested");
|
||||
stdx::lock_guard<Latch> lg(_mutex);
|
||||
invariant(!_workerThreadHandle.joinable());
|
||||
if (!_executor) {
|
||||
_executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
|
||||
}
|
||||
auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
|
||||
_numRequestsToRecover = requestsToRecover.size();
|
||||
_state = _numRequestsToRecover == 0 ? SchedulerState::Running : SchedulerState::Recovering;
|
||||
|
||||
|
||||
@@ -71,15 +71,7 @@ public:
|
||||
uassert(ErrorCodes::InvalidNamespace,
|
||||
str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
|
||||
nss.isValid());
|
||||
const auto& balancerStatus = Balancer::get(opCtx)->getBalancerStatusForNs(opCtx, nss);
|
||||
Response response(balancerStatus.balancerCompliant);
|
||||
response.setDetails(balancerStatus.details);
|
||||
response.setFirstComplianceViolation(
|
||||
balancerStatus.firstComplianceViolation.is_initialized()
|
||||
? boost::optional<StringData>(
|
||||
StringData(*balancerStatus.firstComplianceViolation))
|
||||
: boost::optional<StringData>(boost::none));
|
||||
return response;
|
||||
return Balancer::get(opCtx)->getBalancerStatusForNs(opCtx, nss);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@@ -109,9 +109,11 @@ bool checkMetadataForSuccessfulSplitChunk(OperationContext* opCtx,
|
||||
ChunkType nextChunk;
|
||||
for (auto it = splitPoints.begin(); it != splitPoints.end(); ++it) {
|
||||
// Check that all new chunks fit the new chunk boundaries
|
||||
const auto& currentChunkMinKey = it == splitPoints.begin() ? chunkRange.getMin() : *it;
|
||||
const auto& currentChunkMinKey =
|
||||
it == splitPoints.begin() ? chunkRange.getMin() : *std::prev(it);
|
||||
const auto& currentChunkMaxKey = *it;
|
||||
if (!metadataAfterSplit->getNextChunk(currentChunkMinKey, &nextChunk) ||
|
||||
nextChunk.getMax().woCompare(*it)) {
|
||||
nextChunk.getMax().woCompare(currentChunkMaxKey)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,9 @@ structs:
|
||||
description: "Response of the config server command"
|
||||
strict: false
|
||||
fields:
|
||||
chunkSize:
|
||||
type: safeInt64
|
||||
description: "Configured chunk size in MiB for this collection"
|
||||
balancerCompliant:
|
||||
type: bool
|
||||
description: "true if there are no actions needed, if false, then firstComplianceViolation will contain the violation with the highest priority which will be addressed next"
|
||||
|
||||
9
src/third_party/wiredtiger/cmake/toolchains/mongodbtoolchain_v4_clang.cmake
vendored
Normal file
9
src/third_party/wiredtiger/cmake/toolchains/mongodbtoolchain_v4_clang.cmake
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
cmake_minimum_required(VERSION 3.10.0)
|
||||
|
||||
if(NOT TOOLCHAIN_ROOT)
|
||||
set(TOOLCHAIN_ROOT "/opt/mongodbtoolchain/v4")
|
||||
endif()
|
||||
|
||||
set(CMAKE_C_COMPILER "${TOOLCHAIN_ROOT}/bin/clang")
|
||||
set(CMAKE_CXX_COMPILER "${TOOLCHAIN_ROOT}/bin/clang++")
|
||||
set(CMAKE_ASM_COMPILER "${TOOLCHAIN_ROOT}/bin/clang")
|
||||
9
src/third_party/wiredtiger/cmake/toolchains/mongodbtoolchain_v4_gcc.cmake
vendored
Normal file
9
src/third_party/wiredtiger/cmake/toolchains/mongodbtoolchain_v4_gcc.cmake
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
cmake_minimum_required(VERSION 3.10.0)
|
||||
|
||||
if(NOT TOOLCHAIN_ROOT)
|
||||
set(TOOLCHAIN_ROOT "/opt/mongodbtoolchain/v4")
|
||||
endif()
|
||||
|
||||
set(CMAKE_C_COMPILER "${TOOLCHAIN_ROOT}/bin/gcc")
|
||||
set(CMAKE_CXX_COMPILER "${TOOLCHAIN_ROOT}/bin/g++")
|
||||
set(CMAKE_ASM_COMPILER "${TOOLCHAIN_ROOT}/bin/gcc")
|
||||
@@ -134,12 +134,14 @@ S3Connection::GetObject(const std::string &objectKey, const std::string &path) c
|
||||
|
||||
/*
|
||||
* ObjectExists --
|
||||
* Checks whether an object with the given key exists in the S3 bucket.
|
||||
* Checks whether an object with the given key exists in the S3 bucket and also retrieves
|
||||
* the size of the object.
|
||||
*/
|
||||
int
|
||||
S3Connection::ObjectExists(const std::string &objectKey, bool &exists) const
|
||||
S3Connection::ObjectExists(const std::string &objectKey, bool &exists, size_t &objectSize) const
|
||||
{
|
||||
exists = false;
|
||||
objectSize = 0;
|
||||
|
||||
Aws::S3Crt::Model::HeadObjectRequest request;
|
||||
request.SetBucket(_bucketName);
|
||||
@@ -152,6 +154,7 @@ S3Connection::ObjectExists(const std::string &objectKey, bool &exists) const
|
||||
*/
|
||||
if (outcome.IsSuccess()) {
|
||||
exists = true;
|
||||
objectSize = outcome.GetResult().GetContentLength();
|
||||
return (0);
|
||||
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND)
|
||||
return (0);
|
||||
|
||||
@@ -20,7 +20,7 @@ class S3Connection {
|
||||
uint32_t batchSize = 1000, bool listSingle = false) const;
|
||||
int PutObject(const std::string &objectKey, const std::string &fileName) const;
|
||||
int DeleteObject(const std::string &objectKey) const;
|
||||
int ObjectExists(const std::string &objectKey, bool &exists) const;
|
||||
int ObjectExists(const std::string &objectKey, bool &exists, size_t &objectSize) const;
|
||||
int GetObject(const std::string &objectKey, const std::string &path) const;
|
||||
|
||||
~S3Connection() = default;
|
||||
|
||||
@@ -2,14 +2,9 @@
|
||||
#include "s3_log_system.h"
|
||||
#include <cstdarg>
|
||||
|
||||
S3LogSystem::S3LogSystem(WT_EXTENSION_API *wtApi, uint32_t wtVerbosityLevel)
|
||||
: _wtApi(wtApi), _wtVerbosityLevel(wtVerbosityLevel)
|
||||
S3LogSystem::S3LogSystem(WT_EXTENSION_API *wtApi, uint32_t wtVerbosityLevel) : _wtApi(wtApi)
|
||||
{
|
||||
// If the verbosity level is out of range it will default to AWS SDK Error level.
|
||||
if (verbosityMapping.find(wtVerbosityLevel) != verbosityMapping.end())
|
||||
_awsLogLevel = verbosityMapping.at(wtVerbosityLevel);
|
||||
else
|
||||
_awsLogLevel = Aws::Utils::Logging::LogLevel::Error;
|
||||
SetWtVerbosityLevel(wtVerbosityLevel);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -52,10 +47,39 @@ S3LogSystem::LogAwsMessage(const char *tag, const std::string &message) const
|
||||
}
|
||||
|
||||
void
|
||||
S3LogSystem::LogVerboseMessage(int32_t verbosityLevel, const std::string &message)
|
||||
S3LogSystem::LogVerboseMessage(int32_t verbosityLevel, const std::string &message) const
|
||||
{
|
||||
if (verbosityLevel <= _wtVerbosityLevel)
|
||||
_wtApi->err_printf(_wtApi, NULL, "%s", message.c_str());
|
||||
if (verbosityLevel <= _wtVerbosityLevel) {
|
||||
/* Use err_printf for error and warning messages and use msg_printf for notice, info and
|
||||
* debug messages. */
|
||||
if (verbosityLevel < -1)
|
||||
_wtApi->err_printf(_wtApi, NULL, "%s", message.c_str());
|
||||
else
|
||||
_wtApi->msg_printf(_wtApi, NULL, "%s", message.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
S3LogSystem::LogVerboseErrorMessage(const std::string &message) const
|
||||
{
|
||||
LogVerboseMessage(WT_VERBOSE_ERROR, message);
|
||||
}
|
||||
|
||||
void
|
||||
S3LogSystem::LogVerboseDebugMessage(const std::string &message) const
|
||||
{
|
||||
LogVerboseMessage(WT_VERBOSE_DEBUG, message);
|
||||
}
|
||||
|
||||
void
|
||||
S3LogSystem::SetWtVerbosityLevel(int32_t wtVerbosityLevel)
|
||||
{
|
||||
_wtVerbosityLevel = wtVerbosityLevel;
|
||||
/* If the verbosity level is out of range it will default to AWS SDK Error level. */
|
||||
if (verbosityMapping.find(_wtVerbosityLevel) != verbosityMapping.end())
|
||||
_awsLogLevel = verbosityMapping.at(_wtVerbosityLevel);
|
||||
else
|
||||
_awsLogLevel = Aws::Utils::Logging::LogLevel::Error;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -26,11 +26,14 @@ class S3LogSystem : public Aws::Utils::Logging::LogSystemInterface {
|
||||
Aws::Utils::Logging::LogLevel logLevel, const char *tag, const char *format, ...) override;
|
||||
void LogStream(Aws::Utils::Logging::LogLevel logLevel, const char *tag,
|
||||
const Aws::OStringStream &messageStream) override;
|
||||
void LogVerboseErrorMessage(const std::string &message) const;
|
||||
void LogVerboseDebugMessage(const std::string &message) const;
|
||||
void SetWtVerbosityLevel(int32_t wtVerbosityLevel);
|
||||
void Flush() override;
|
||||
|
||||
private:
|
||||
void LogAwsMessage(const char *tag, const std::string &message) const;
|
||||
void LogVerboseMessage(int32_t verbosityLevel, const std::string &message);
|
||||
void LogVerboseMessage(int32_t verbosityLevel, const std::string &message) const;
|
||||
std::atomic<Aws::Utils::Logging::LogLevel> _awsLogLevel;
|
||||
WT_EXTENSION_API *_wtApi;
|
||||
int32_t _wtVerbosityLevel;
|
||||
|
||||
@@ -62,6 +62,7 @@ struct S3_STATISTICS {
|
||||
struct S3_STORAGE {
|
||||
WT_STORAGE_SOURCE storageSource; /* Must come first */
|
||||
WT_EXTENSION_API *wtApi; /* Extension API */
|
||||
std::shared_ptr<S3LogSystem> log;
|
||||
|
||||
std::mutex fsListMutex; /* Protect the file system list */
|
||||
std::list<S3_FILE_SYSTEM *> fsList; /* List of initiated file systems */
|
||||
@@ -85,7 +86,6 @@ struct S3_FILE_SYSTEM {
|
||||
*/
|
||||
WT_FILE_SYSTEM *wtFileSystem;
|
||||
S3Connection *connection;
|
||||
S3LogSystem *log;
|
||||
std::string cacheDir; /* Directory for cached objects */
|
||||
std::string homeDir; /* Owned by the connection */
|
||||
};
|
||||
@@ -102,14 +102,14 @@ struct S3_FILE_HANDLE {
|
||||
};
|
||||
|
||||
/* Configuration variables for connecting to S3CrtClient. */
|
||||
const Aws::String region = Aws::Region::AP_SOUTHEAST_2;
|
||||
const double throughputTargetGbps = 5;
|
||||
const uint64_t partSize = 8 * 1024 * 1024; /* 8 MB. */
|
||||
|
||||
/* Setting SDK options. */
|
||||
Aws::SDKOptions options;
|
||||
|
||||
static int S3GetDirectory(const std::string &, const std::string &, bool, std::string &);
|
||||
static int S3GetDirectory(
|
||||
const S3_STORAGE &, const std::string &, const std::string &, bool, std::string &);
|
||||
static bool S3CacheExists(WT_FILE_SYSTEM *, const std::string &);
|
||||
static std::string S3Path(const std::string &, const std::string &);
|
||||
static std::string S3HomePath(WT_FILE_SYSTEM *, const char *);
|
||||
@@ -126,13 +126,16 @@ static int S3FileRead(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, void *);
|
||||
static int S3ObjectList(
|
||||
WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, char ***, uint32_t *);
|
||||
static int S3ObjectListAdd(
|
||||
S3_STORAGE *, char ***, const std::vector<std::string> &, const uint32_t);
|
||||
const S3_STORAGE &, char ***, const std::vector<std::string> &, const uint32_t);
|
||||
static int S3ObjectListSingle(
|
||||
WT_FILE_SYSTEM *, WT_SESSION *, const char *, const char *, char ***, uint32_t *);
|
||||
static int S3ObjectListFree(WT_FILE_SYSTEM *, WT_SESSION *, char **, uint32_t);
|
||||
static void S3ShowStatistics(const S3_STATISTICS &);
|
||||
static void S3ShowStatistics(const S3_STORAGE &);
|
||||
|
||||
static int S3FileClose(WT_FILE_HANDLE *, WT_SESSION *);
|
||||
static int S3FileSize(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *);
|
||||
static int S3Size(WT_FILE_SYSTEM *, WT_SESSION *, const char *, wt_off_t *);
|
||||
|
||||
/*
|
||||
* S3Path --
|
||||
* Construct a pathname from the directory and the object name.
|
||||
@@ -160,7 +163,9 @@ S3Path(const std::string &dir, const std::string &name)
|
||||
static int
|
||||
S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool *exist)
|
||||
{
|
||||
size_t objectSize;
|
||||
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
|
||||
S3_STORAGE *s3 = FS2S3(fileSystem);
|
||||
int ret = 0;
|
||||
|
||||
/* Check if file exists in the cache. */
|
||||
@@ -169,9 +174,10 @@ S3Exist(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, bool
|
||||
return (ret);
|
||||
|
||||
/* It's not in the cache, try the S3 bucket. */
|
||||
FS2S3(fileSystem)->statistics.objectExistsCount++;
|
||||
if ((ret = fs->connection->ObjectExists(name, *exist)) != 0)
|
||||
std::cerr << "S3Exist: ObjectExists request to S3 failed." << std::endl;
|
||||
s3->statistics.objectExistsCount++;
|
||||
if ((ret = fs->connection->ObjectExists(name, *exist, objectSize)) != 0)
|
||||
s3->log->LogVerboseErrorMessage("S3Exist: ObjectExists request to S3 failed.");
|
||||
|
||||
return (ret);
|
||||
}
|
||||
|
||||
@@ -202,7 +208,8 @@ LocalFileExists(const std::string &path)
|
||||
* Return a copy of a directory name after verifying that it is a directory.
|
||||
*/
|
||||
static int
|
||||
S3GetDirectory(const std::string &home, const std::string &name, bool create, std::string ©)
|
||||
S3GetDirectory(const S3_STORAGE &s3, const std::string &home, const std::string &name, bool create,
|
||||
std::string ©)
|
||||
{
|
||||
copy = "";
|
||||
|
||||
@@ -218,14 +225,16 @@ S3GetDirectory(const std::string &home, const std::string &name, bool create, st
|
||||
|
||||
ret = stat(dirName.c_str(), &sb);
|
||||
if (ret != 0 && errno == ENOENT && create) {
|
||||
(void)mkdir(dirName.c_str(), 0777);
|
||||
mkdir(dirName.c_str(), 0777);
|
||||
ret = stat(dirName.c_str(), &sb);
|
||||
}
|
||||
|
||||
if (ret != 0)
|
||||
if (ret != 0) {
|
||||
s3.log->LogVerboseErrorMessage("S3GetDirectory: stat system call failed.");
|
||||
ret = errno;
|
||||
else if ((sb.st_mode & S_IFMT) != S_IFDIR)
|
||||
} else if ((sb.st_mode & S_IFMT) != S_IFDIR) {
|
||||
s3.log->LogVerboseErrorMessage("S3GetDirectory: invalid directory name.");
|
||||
ret = EINVAL;
|
||||
}
|
||||
|
||||
copy = dirName;
|
||||
return (ret);
|
||||
@@ -240,24 +249,24 @@ S3FileClose(WT_FILE_HANDLE *fileHandle, WT_SESSION *session)
|
||||
{
|
||||
int ret = 0;
|
||||
S3_FILE_HANDLE *s3FileHandle = (S3_FILE_HANDLE *)fileHandle;
|
||||
S3_STORAGE *storage = s3FileHandle->storage;
|
||||
S3_STORAGE *s3 = s3FileHandle->storage;
|
||||
WT_FILE_HANDLE *wtFileHandle = s3FileHandle->wtFileHandle;
|
||||
/*
|
||||
* We require exclusive access to the list of file handles when removing file handles. The
|
||||
* lock_guard will be unlocked automatically once the scope is exited.
|
||||
*/
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(storage->fhMutex);
|
||||
storage->fhList.remove(s3FileHandle);
|
||||
std::lock_guard<std::mutex> lock(s3->fhMutex);
|
||||
s3->fhList.remove(s3FileHandle);
|
||||
}
|
||||
if (wtFileHandle != NULL) {
|
||||
storage->statistics.fhOps++;
|
||||
ret = wtFileHandle->close(wtFileHandle, session);
|
||||
s3->statistics.fhOps++;
|
||||
if ((ret = wtFileHandle->close(wtFileHandle, session)) != 0)
|
||||
s3->log->LogVerboseErrorMessage("S3FileClose: close file handle failed.");
|
||||
}
|
||||
|
||||
free(s3FileHandle->iface.name);
|
||||
free(s3FileHandle);
|
||||
|
||||
return (ret);
|
||||
}
|
||||
|
||||
@@ -280,7 +289,7 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
|
||||
/* We only support opening the file in read only mode. */
|
||||
if ((flags & WT_FS_OPEN_READONLY) == 0 || (flags & WT_FS_OPEN_CREATE) != 0) {
|
||||
std::cerr << "ss_open_object: readonly access required: " << name << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Open: read-only access required.");
|
||||
return (EINVAL);
|
||||
}
|
||||
|
||||
@@ -289,19 +298,21 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
* the future.
|
||||
*/
|
||||
if (fileType != WT_FS_OPEN_FILE_TYPE_DATA && fileType != WT_FS_OPEN_FILE_TYPE_REGULAR) {
|
||||
std::cerr << name << ": open: only data file and regular types supported" << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Open: only data file and regular types supported.");
|
||||
return (EINVAL);
|
||||
}
|
||||
|
||||
if ((s3FileHandle = (S3_FILE_HANDLE *)calloc(1, sizeof(S3_FILE_HANDLE))) == NULL)
|
||||
if ((s3FileHandle = (S3_FILE_HANDLE *)calloc(1, sizeof(S3_FILE_HANDLE))) == NULL) {
|
||||
s3->log->LogVerboseErrorMessage("S3Open: unable to allocate memory for file handle.");
|
||||
return (ENOMEM);
|
||||
}
|
||||
|
||||
/* Make a copy from S3 if the file is not in the cache. */
|
||||
const std::string cachePath = S3Path(fs->cacheDir, name);
|
||||
if (!LocalFileExists(cachePath)) {
|
||||
s3->statistics.getObjectCount++;
|
||||
if ((ret = fs->connection->GetObject(name, cachePath)) != 0) {
|
||||
std::cerr << "ss_open_object: GetObject request to S3 failed." << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Open: GetObject request to S3 failed.");
|
||||
return (ret);
|
||||
}
|
||||
}
|
||||
@@ -310,7 +321,7 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
ret = wtFileSystem->fs_open_file(
|
||||
wtFileSystem, session, cachePath.c_str(), fileType, flags, &wtFileHandle);
|
||||
if (ret != 0) {
|
||||
std::cerr << "ss_open_object: fs_open_file failed." << name << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Open: fs_open_file failed.");
|
||||
return (ret);
|
||||
}
|
||||
|
||||
@@ -328,7 +339,7 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
fileHandle->fh_map_preload = NULL;
|
||||
fileHandle->fh_unmap = NULL;
|
||||
fileHandle->fh_read = S3FileRead;
|
||||
fileHandle->fh_size = NULL;
|
||||
fileHandle->fh_size = S3FileSize;
|
||||
fileHandle->fh_sync = NULL;
|
||||
fileHandle->fh_sync_nowait = NULL;
|
||||
fileHandle->fh_truncate = NULL;
|
||||
@@ -336,7 +347,7 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
|
||||
fileHandle->name = strdup(name);
|
||||
if (fileHandle->name == NULL) {
|
||||
std::cout << "ss_open_object: unable to allocate memory for object name" << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Open: unable to allocate memory for object name.");
|
||||
return (ENOMEM);
|
||||
}
|
||||
|
||||
@@ -353,6 +364,27 @@ S3Open(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name,
|
||||
return (0);
|
||||
}
|
||||
|
||||
/*
|
||||
* S3Size --
|
||||
* Get the size of a file in bytes, by file name.
|
||||
*/
|
||||
static int
|
||||
S3Size(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *name, wt_off_t *sizep)
|
||||
{
|
||||
S3_STORAGE *s3 = FS2S3(fileSystem);
|
||||
size_t objectSize;
|
||||
bool exist;
|
||||
*sizep = 0;
|
||||
int ret;
|
||||
|
||||
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
|
||||
s3->statistics.objectExistsCount++;
|
||||
if ((ret = fs->connection->ObjectExists(name, exist, objectSize)) != 0)
|
||||
return (ret);
|
||||
*sizep = objectSize;
|
||||
return (ret);
|
||||
}
|
||||
|
||||
/*
|
||||
* S3FileRead --
|
||||
* Read a file using WiredTiger's native file handle read.
|
||||
@@ -361,15 +393,29 @@ static int
|
||||
S3FileRead(WT_FILE_HANDLE *fileHandle, WT_SESSION *session, wt_off_t offset, size_t len, void *buf)
|
||||
{
|
||||
S3_FILE_HANDLE *s3FileHandle = (S3_FILE_HANDLE *)fileHandle;
|
||||
S3_STORAGE *storage = s3FileHandle->storage;
|
||||
S3_STORAGE *s3 = s3FileHandle->storage;
|
||||
WT_FILE_HANDLE *wtFileHandle = s3FileHandle->wtFileHandle;
|
||||
int ret;
|
||||
storage->statistics.fhReadOps++;
|
||||
s3->statistics.fhReadOps++;
|
||||
if ((ret = wtFileHandle->fh_read(wtFileHandle, session, offset, len, buf)) != 0)
|
||||
std::cerr << "S3FileRead: fh_read failed." << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3FileRead: fh_read failed.");
|
||||
return (ret);
|
||||
}
|
||||
|
||||
/*
|
||||
* S3FileSize --
|
||||
* Get the size of a file in bytes, by file handle.
|
||||
*/
|
||||
static int
|
||||
S3FileSize(WT_FILE_HANDLE *fileHandle, WT_SESSION *session, wt_off_t *sizep)
|
||||
{
|
||||
S3_FILE_HANDLE *s3FileHandle = (S3_FILE_HANDLE *)fileHandle;
|
||||
S3_STORAGE *s3 = s3FileHandle->storage;
|
||||
WT_FILE_HANDLE *wtFileHandle = s3FileHandle->wtFileHandle;
|
||||
s3->statistics.fhOps++;
|
||||
return (wtFileHandle->fh_size(wtFileHandle, session, sizep));
|
||||
}
|
||||
|
||||
/*
|
||||
* S3CustomizeFileSystem --
|
||||
* Return a customized file system to access the s3 storage source objects.
|
||||
@@ -391,7 +437,7 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con
|
||||
|
||||
/* We need to have a bucket to setup the file system. */
|
||||
if (bucketName == NULL || strlen(bucketName) == 0) {
|
||||
std::cerr << "Error: Bucket not specified";
|
||||
s3->log->LogVerboseErrorMessage("S3CustomizeFileSystem: bucket not specified.");
|
||||
return (EINVAL);
|
||||
}
|
||||
|
||||
@@ -404,12 +450,35 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con
|
||||
std::string objPrefix;
|
||||
if ((ret = s3->wtApi->config_get_string(
|
||||
s3->wtApi, session, config, "prefix", &objPrefixConf)) == 0)
|
||||
objPrefix = objPrefixConf.str;
|
||||
objPrefix = std::string(objPrefixConf.str, objPrefixConf.len);
|
||||
else if (ret != WT_NOTFOUND) {
|
||||
std::cerr << "Error: customize_file_system: config parsing for object prefix";
|
||||
return (1);
|
||||
s3->log->LogVerboseErrorMessage(
|
||||
"S3CustomizeFileSystem: error parsing config for object prefix.");
|
||||
return (ret);
|
||||
}
|
||||
|
||||
/* Configure the AWS Client configuration. */
|
||||
Aws::S3Crt::ClientConfiguration awsConfig;
|
||||
awsConfig.throughputTargetGbps = throughputTargetGbps;
|
||||
awsConfig.partSize = partSize;
|
||||
|
||||
/*
|
||||
* Get the AWS region to be used. The allowable values for AWS region are listed here in the AWS
|
||||
* documentation: http://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_region.html
|
||||
*/
|
||||
WT_CONFIG_ITEM regionConf;
|
||||
std::string region;
|
||||
if ((ret = s3->wtApi->config_get_string(s3->wtApi, session, config, "region", &regionConf)) ==
|
||||
0)
|
||||
awsConfig.region = std::string(regionConf.str, regionConf.len);
|
||||
else if (ret != WT_NOTFOUND) {
|
||||
s3->log->LogVerboseErrorMessage(
|
||||
"S3CustomizeFileSystem: error parsing config for AWS region.");
|
||||
return (ret);
|
||||
} else {
|
||||
s3->log->LogVerboseErrorMessage("S3CustomizeFileSystem: AWS region not specified.");
|
||||
return (EINVAL);
|
||||
}
|
||||
/*
|
||||
* Get the directory to setup the cache, or use the default one. The default cache directory is
|
||||
* named "cache-<name>", where name is the last component of the bucket name's path. We'll
|
||||
@@ -419,12 +488,15 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con
|
||||
std::string cacheStr;
|
||||
if ((ret = s3->wtApi->config_get_string(
|
||||
s3->wtApi, session, config, "cache_directory", &cacheDirConf)) == 0)
|
||||
cacheStr = cacheDirConf.str;
|
||||
cacheStr = std::string(cacheDirConf.str, cacheDirConf.len);
|
||||
else if (ret == WT_NOTFOUND) {
|
||||
cacheStr = "cache-" + std::string(bucketName);
|
||||
ret = 0;
|
||||
} else
|
||||
} else {
|
||||
s3->log->LogVerboseErrorMessage(
|
||||
"wiredtiger_extension_init: error parsing config for cache directory.");
|
||||
return (ret);
|
||||
}
|
||||
|
||||
/* Fetch the native WT file system. */
|
||||
if ((ret = s3->wtApi->file_system_get(s3->wtApi, session, &wtFileSystem)) != 0)
|
||||
@@ -432,22 +504,20 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con
|
||||
|
||||
/* Get a copy of the home and cache directory. */
|
||||
const std::string homeDir = session->connection->get_home(session->connection);
|
||||
if ((ret = S3GetDirectory(homeDir, cacheStr, true, cacheDir)) != 0)
|
||||
if ((ret = S3GetDirectory(*s3, homeDir, cacheStr, true, cacheDir)) != 0)
|
||||
return (ret);
|
||||
|
||||
/* Create the file system. */
|
||||
if ((fs = (S3_FILE_SYSTEM *)calloc(1, sizeof(S3_FILE_SYSTEM))) == NULL)
|
||||
return (errno);
|
||||
if ((fs = (S3_FILE_SYSTEM *)calloc(1, sizeof(S3_FILE_SYSTEM))) == NULL) {
|
||||
s3->log->LogVerboseErrorMessage(
|
||||
"S3CustomizeFileSystem: unable to allocate memory for file system.");
|
||||
return (ENOMEM);
|
||||
}
|
||||
fs->storage = s3;
|
||||
fs->wtFileSystem = wtFileSystem;
|
||||
fs->homeDir = homeDir;
|
||||
fs->cacheDir = cacheDir;
|
||||
|
||||
Aws::S3Crt::ClientConfiguration awsConfig;
|
||||
awsConfig.region = region;
|
||||
awsConfig.throughputTargetGbps = throughputTargetGbps;
|
||||
awsConfig.partSize = partSize;
|
||||
|
||||
/* New can fail; will deal with this later. */
|
||||
fs->connection = new S3Connection(awsConfig, bucketName, objPrefix);
|
||||
fs->fileSystem.fs_directory_list = S3ObjectList;
|
||||
@@ -456,6 +526,7 @@ S3CustomizeFileSystem(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, con
|
||||
fs->fileSystem.terminate = S3FileSystemTerminate;
|
||||
fs->fileSystem.fs_exist = S3Exist;
|
||||
fs->fileSystem.fs_open_file = S3Open;
|
||||
fs->fileSystem.fs_size = S3Size;
|
||||
|
||||
/* Add to the list of the active file systems. Lock will be freed when the scope is exited. */
|
||||
{
|
||||
@@ -516,12 +587,12 @@ S3ObjectList(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *direct
|
||||
int ret;
|
||||
s3->statistics.listObjectsCount++;
|
||||
if ((ret = fs->connection->ListObjects(completePrefix, objects)) != 0) {
|
||||
std::cerr << "S3ObjectList: ListObjects request to S3 failed." << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3ObjectList: ListObjects request to S3 failed.");
|
||||
return (ret);
|
||||
}
|
||||
*count = objects.size();
|
||||
|
||||
S3ObjectListAdd(s3, objectList, objects, *count);
|
||||
S3ObjectListAdd(*s3, objectList, objects, *count);
|
||||
|
||||
return (ret);
|
||||
}
|
||||
@@ -552,13 +623,13 @@ S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *
|
||||
int ret;
|
||||
s3->statistics.listObjectsCount++;
|
||||
if ((ret = fs->connection->ListObjects(completePrefix, objects, 1, true)) != 0) {
|
||||
std::cerr << "S3ObjectListSingle: ListObjects request to S3 failed." << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3ObjectListSingle: ListObjects request to S3 failed.");
|
||||
return (ret);
|
||||
}
|
||||
|
||||
*count = objects.size();
|
||||
|
||||
S3ObjectListAdd(s3, objectList, objects, *count);
|
||||
S3ObjectListAdd(*s3, objectList, objects, *count);
|
||||
|
||||
return (ret);
|
||||
}
|
||||
@@ -570,8 +641,8 @@ S3ObjectListSingle(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, const char *
|
||||
static int
|
||||
S3ObjectListFree(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, char **objectList, uint32_t count)
|
||||
{
|
||||
(void)fileSystem;
|
||||
(void)session;
|
||||
UNUSED(fileSystem);
|
||||
UNUSED(session);
|
||||
|
||||
if (objectList != NULL) {
|
||||
while (count > 0)
|
||||
@@ -587,12 +658,22 @@ S3ObjectListFree(WT_FILE_SYSTEM *fileSystem, WT_SESSION *session, char **objectL
|
||||
* Add objects retrieved from S3 bucket into the object list, and allocate the memory needed.
|
||||
*/
|
||||
static int
|
||||
S3ObjectListAdd(
|
||||
S3_STORAGE *s3, char ***objectList, const std::vector<std::string> &objects, const uint32_t count)
|
||||
S3ObjectListAdd(const S3_STORAGE &s3, char ***objectList, const std::vector<std::string> &objects,
|
||||
const uint32_t count)
|
||||
{
|
||||
char **entries = (char **)malloc(sizeof(char *) * count);
|
||||
char **entries;
|
||||
if ((entries = (char **)malloc(sizeof(char *) * count)) == NULL) {
|
||||
s3.log->LogVerboseErrorMessage(
|
||||
"S3ObjectListAdd: unable to allocate memory for object list.");
|
||||
return (ENOMEM);
|
||||
}
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
entries[i] = strdup(objects[i].c_str());
|
||||
if ((entries[i] = strdup(objects[i].c_str())) == NULL) {
|
||||
s3.log->LogVerboseErrorMessage(
|
||||
"S3ObjectListAdd: unable to allocate memory for object string.");
|
||||
return (ENOMEM);
|
||||
}
|
||||
}
|
||||
*objectList = entries;
|
||||
|
||||
@@ -609,11 +690,10 @@ S3AddReference(WT_STORAGE_SOURCE *storageSource)
|
||||
{
|
||||
S3_STORAGE *s3 = (S3_STORAGE *)storageSource;
|
||||
|
||||
/*
|
||||
* Missing reference or overflow?
|
||||
*/
|
||||
if (s3->referenceCount == 0 || s3->referenceCount + 1 == 0)
|
||||
if (s3->referenceCount == 0 || s3->referenceCount + 1 == 0) {
|
||||
s3->log->LogVerboseErrorMessage("S3AddReference: missing reference or overflow.");
|
||||
return (EINVAL);
|
||||
}
|
||||
|
||||
++s3->referenceCount;
|
||||
return (0);
|
||||
@@ -651,7 +731,7 @@ S3Terminate(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session)
|
||||
}
|
||||
|
||||
/* Log collected statistics on termination. */
|
||||
S3ShowStatistics(s3->statistics);
|
||||
S3ShowStatistics(*s3);
|
||||
|
||||
Aws::Utils::Logging::ShutdownAWSLogging();
|
||||
Aws::ShutdownAPI(options);
|
||||
@@ -668,12 +748,14 @@ static int
|
||||
S3Flush(WT_STORAGE_SOURCE *storageSource, WT_SESSION *session, WT_FILE_SYSTEM *fileSystem,
|
||||
const char *source, const char *object, const char *config)
|
||||
{
|
||||
S3_STORAGE *s3 = (S3_STORAGE *)storageSource;
|
||||
S3_FILE_SYSTEM *fs = (S3_FILE_SYSTEM *)fileSystem;
|
||||
|
||||
int ret;
|
||||
FS2S3(fileSystem)->statistics.putObjectCount++;
|
||||
if (ret = (fs->connection->PutObject(object, source)) != 0)
|
||||
std::cerr << "S3Flush: PutObject request to S3 failed." << std::endl;
|
||||
s3->log->LogVerboseErrorMessage("S3Flush: PutObject request to S3 failed.");
|
||||
|
||||
return (ret);
|
||||
}
|
||||
|
||||
@@ -704,15 +786,21 @@ S3FlushFinish(WT_STORAGE_SOURCE *storage, WT_SESSION *session, WT_FILE_SYSTEM *f
|
||||
* Log collected statistics.
|
||||
*/
|
||||
static void
|
||||
S3ShowStatistics(const S3_STATISTICS &statistics)
|
||||
S3ShowStatistics(const S3_STORAGE &s3)
|
||||
{
|
||||
std::cout << "S3 list objects count: " << statistics.listObjectsCount << std::endl;
|
||||
std::cout << "S3 put object count: " << statistics.putObjectCount << std::endl;
|
||||
std::cout << "S3 get object count: " << statistics.putObjectCount << std::endl;
|
||||
std::cout << "S3 object exists count: " << statistics.objectExistsCount << std::endl;
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"S3 list objects count: " + std::to_string(s3.statistics.listObjectsCount));
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"S3 put object count: " + std::to_string(s3.statistics.putObjectCount));
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"S3 get object count: " + std::to_string(s3.statistics.getObjectCount));
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"S3 object exists count: " + std::to_string(s3.statistics.objectExistsCount));
|
||||
|
||||
std::cout << "Non read/write file handle operations: " << statistics.fhOps << std::endl;
|
||||
std::cout << "File handle read operations: " << statistics.fhReadOps << std::endl;
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"Non read/write file handle operations: " + std::to_string(s3.statistics.fhOps));
|
||||
s3.log->LogVerboseDebugMessage(
|
||||
"File handle read operations: " + std::to_string(s3.statistics.fhReadOps));
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -726,28 +814,35 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
|
||||
S3_FILE_SYSTEM *fs;
|
||||
WT_CONFIG_ITEM v;
|
||||
|
||||
/* No error handling for now. */
|
||||
s3 = new S3_STORAGE;
|
||||
|
||||
s3->wtApi = connection->get_extension_api(connection);
|
||||
|
||||
int ret = s3->wtApi->config_get(s3->wtApi, NULL, config, "verbose", &v);
|
||||
|
||||
// If a verbose level is not found, it will set the level to -3 (Error).
|
||||
if (ret == 0 && v.val >= -3 && v.val <= 1)
|
||||
/*
|
||||
* Create a logger for the storage source. Verbose level defaults to WT_VERBOSE_ERROR (-3) if it
|
||||
* is outside the valid range or not found.
|
||||
*/
|
||||
s3->verbose = WT_VERBOSE_ERROR;
|
||||
s3->log = Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose);
|
||||
|
||||
if (ret == 0 && v.val >= WT_VERBOSE_ERROR && v.val <= WT_VERBOSE_DEBUG) {
|
||||
s3->verbose = v.val;
|
||||
else if (ret == WT_NOTFOUND)
|
||||
s3->verbose = -3;
|
||||
else {
|
||||
free(s3);
|
||||
s3->log->SetWtVerbosityLevel(s3->verbose);
|
||||
} else if (ret != WT_NOTFOUND) {
|
||||
s3->log->LogVerboseErrorMessage(
|
||||
"wiredtiger_extension_init: error parsing config for verbose level.");
|
||||
delete (s3);
|
||||
return (ret != 0 ? ret : EINVAL);
|
||||
}
|
||||
|
||||
/* Set up statistics. */
|
||||
s3->statistics = {0};
|
||||
|
||||
/* Create a logger for this storage source, and then initialize the AWS SDK. */
|
||||
Aws::Utils::Logging::InitializeAWSLogging(
|
||||
Aws::MakeShared<S3LogSystem>("storage", s3->wtApi, s3->verbose));
|
||||
/* Initialize the AWS SDK. */
|
||||
Aws::Utils::Logging::InitializeAWSLogging(s3->log);
|
||||
Aws::InitAPI(options);
|
||||
|
||||
/*
|
||||
@@ -768,7 +863,7 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
|
||||
/* Load the storage */
|
||||
if ((ret = connection->add_storage_source(connection, "s3_store", &s3->storageSource, NULL)) !=
|
||||
0)
|
||||
free(s3);
|
||||
delete (s3);
|
||||
|
||||
return (ret);
|
||||
}
|
||||
|
||||
@@ -282,7 +282,7 @@ TestGetObject(const Aws::S3Crt::ClientConfiguration &config)
|
||||
}
|
||||
/*
|
||||
* TestObjectExists --
|
||||
* Unit test to check if an object exists in an AWS bucket.
|
||||
* Unit test to check if an object exists in an AWS bucket and size of the object is correct.
|
||||
*/
|
||||
int
|
||||
TestObjectExists(const Aws::S3Crt::ClientConfiguration &config)
|
||||
@@ -290,26 +290,35 @@ TestObjectExists(const Aws::S3Crt::ClientConfiguration &config)
|
||||
S3Connection conn(config, TestDefaults::bucketName, TestDefaults::objPrefix);
|
||||
bool exists = false;
|
||||
int ret = TEST_FAILURE;
|
||||
size_t objectSize;
|
||||
|
||||
const std::string objectName = "test_object";
|
||||
const std::string fileName = "test_object.txt";
|
||||
|
||||
/* Create a file to upload to the bucket.*/
|
||||
std::ofstream File(fileName);
|
||||
File << "Test payload";
|
||||
std::string payload = "Test payload";
|
||||
File << payload;
|
||||
File.close();
|
||||
|
||||
ret = conn.ObjectExists(objectName, exists);
|
||||
if (ret != 0 || exists)
|
||||
if ((ret = conn.ObjectExists(objectName, exists, objectSize)) != 0)
|
||||
return (ret);
|
||||
if (exists || objectSize != 0)
|
||||
return (TEST_FAILURE);
|
||||
|
||||
if ((ret = conn.PutObject(objectName, fileName)) != 0)
|
||||
return (ret);
|
||||
|
||||
ret = conn.ObjectExists(objectName, exists);
|
||||
if (ret != 0 || !exists)
|
||||
if ((ret = conn.ObjectExists(objectName, exists, objectSize)) != 0)
|
||||
return (ret);
|
||||
if (!exists)
|
||||
return (TEST_FAILURE);
|
||||
|
||||
if (objectSize != payload.length()) {
|
||||
std::cerr << "TestObjectExist().objectSize failed." << std::endl;
|
||||
return (TEST_FAILURE);
|
||||
}
|
||||
|
||||
if ((ret = conn.DeleteObject(objectName)) != 0)
|
||||
return (ret);
|
||||
std::cout << "TestObjectExists() succeeded." << std::endl;
|
||||
|
||||
4
src/third_party/wiredtiger/import.data
vendored
4
src/third_party/wiredtiger/import.data
vendored
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"vendor": "wiredtiger",
|
||||
"github": "wiredtiger/wiredtiger.git",
|
||||
"branch": "mongodb-master",
|
||||
"commit": "eb843ba7b05282697f4f0d85bebfcb7fbb7ca724"
|
||||
"branch": "mongodb-5.3",
|
||||
"commit": "b1a6788043f564c5fe1956ceb94307cdd47ff9ca"
|
||||
}
|
||||
|
||||
@@ -560,7 +560,7 @@ __bm_stat(WT_BM *bm, WT_SESSION_IMPL *session, WT_DSRC_STATS *stats)
|
||||
* Switch the tiered object.
|
||||
*/
|
||||
static int
|
||||
__bm_switch_object(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid, uint32_t flags)
|
||||
__bm_switch_object(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid)
|
||||
{
|
||||
WT_BLOCK *block;
|
||||
|
||||
@@ -570,15 +570,6 @@ __bm_switch_object(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid, uint3
|
||||
WT_RET(__bm_close_block(session, block));
|
||||
bm->block = NULL;
|
||||
|
||||
/*
|
||||
* FIXME-WT-7596 the flags argument will be used in the future to perform various tasks,
|
||||
* to efficiently mark objects in transition (that is during a switch):
|
||||
* - mark this file as the writeable file (what currently happens)
|
||||
* - disallow writes to this object (reads still allowed, we're about to switch)
|
||||
* - close this object (about to move it, don't allow reopens yet)
|
||||
* - allow opens on this object again
|
||||
*/
|
||||
WT_UNUSED(flags);
|
||||
WT_RET(__wt_blkcache_get_handle(session, NULL, objectid, &block));
|
||||
|
||||
/*
|
||||
@@ -603,10 +594,9 @@ __bm_switch_object(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid, uint3
|
||||
* Switch the tiered object; readonly version.
|
||||
*/
|
||||
static int
|
||||
__bm_switch_object_readonly(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid, uint32_t flags)
|
||||
__bm_switch_object_readonly(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid)
|
||||
{
|
||||
WT_UNUSED(objectid);
|
||||
WT_UNUSED(flags);
|
||||
|
||||
return (__bm_readonly(bm, session));
|
||||
}
|
||||
|
||||
@@ -998,7 +998,7 @@ __btree_page_sizes(WT_SESSION_IMPL *session)
|
||||
* Switch to a writeable object for a tiered btree.
|
||||
*/
|
||||
int
|
||||
__wt_btree_switch_object(WT_SESSION_IMPL *session, uint32_t objectid, uint32_t flags)
|
||||
__wt_btree_switch_object(WT_SESSION_IMPL *session, uint32_t objectid)
|
||||
{
|
||||
WT_BM *bm;
|
||||
|
||||
@@ -1008,5 +1008,5 @@ __wt_btree_switch_object(WT_SESSION_IMPL *session, uint32_t objectid, uint32_t f
|
||||
* number.
|
||||
*/
|
||||
bm = S2BT(session)->bm;
|
||||
return (bm == NULL ? 0 : bm->switch_object(bm, session, objectid, flags));
|
||||
return (bm == NULL ? 0 : bm->switch_object(bm, session, objectid));
|
||||
}
|
||||
|
||||
@@ -275,25 +275,22 @@ reconciled page must be split into multiple smaller pages before being sent for
|
||||
compression and then be written to the disk. If the reconciled page can fit into
|
||||
a single on-disk page without the page growing beyond it's set max size,
|
||||
split_pct is ignored and the page isn't split.
|
||||
- an integer between 25 and 100
|
||||
- default : 75
|
||||
- an integer between 50 and 100
|
||||
- default : 90
|
||||
- Motivation to tune the value:
|
||||
\n Most applications should not need to tune the split percentage size.
|
||||
- This value should be selected to avoid creating a large number of tiny
|
||||
pages or repeatedly splitting whenever new entries are inserted.
|
||||
\n For example, if the maximum page size is 1MB, a split_pct value of 10%
|
||||
would potentially result in creating a large number of 100KB pages, which may
|
||||
not be optimal for future I/O. Or, if the maximum page size is 1MB, a split_pct
|
||||
value of 90% would potentially result in repeatedly splitting pages as the split
|
||||
pages grow to 1MB over and over. The default value for split_pct is 75%,
|
||||
intended to keep large pages relatively large, while still giving split pages
|
||||
room to grow.
|
||||
- This value should be selected based on the expected workload. Workloads
|
||||
that perform many random updates benefit from values around 66%, leaving room
|
||||
on the newly split pages for future updates. Append-only workloads benefit from
|
||||
a value of 100%, as they will not add updates to the newly split pages.
|
||||
The default value for split_pct is 90%, intended as a balance between these two
|
||||
workload types.
|
||||
- Configuration:
|
||||
\n Specified as split_pct configuration option to WT_SESSION::create. An
|
||||
example of such a configuration string is as follows:
|
||||
|
||||
<pre>
|
||||
"key_format=S,value_format=S,split_pct=60"
|
||||
"key_format=S,value_format=S,split_pct=80"
|
||||
</pre>
|
||||
|
||||
@section compression_considerations Compression considerations
|
||||
|
||||
@@ -199,7 +199,7 @@ struct __wt_bm {
|
||||
int (*salvage_valid)(WT_BM *, WT_SESSION_IMPL *, uint8_t *, size_t, bool);
|
||||
int (*size)(WT_BM *, WT_SESSION_IMPL *, wt_off_t *);
|
||||
int (*stat)(WT_BM *, WT_SESSION_IMPL *, WT_DSRC_STATS *stats);
|
||||
int (*switch_object)(WT_BM *, WT_SESSION_IMPL *, uint32_t, uint32_t);
|
||||
int (*switch_object)(WT_BM *, WT_SESSION_IMPL *, uint32_t);
|
||||
int (*sync)(WT_BM *, WT_SESSION_IMPL *, bool);
|
||||
int (*verify_addr)(WT_BM *, WT_SESSION_IMPL *, const uint8_t *, size_t);
|
||||
int (*verify_end)(WT_BM *, WT_SESSION_IMPL *);
|
||||
|
||||
@@ -302,7 +302,7 @@ extern int __wt_btree_open(WT_SESSION_IMPL *session, const char *op_cfg[])
|
||||
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
|
||||
extern int __wt_btree_stat_init(WT_SESSION_IMPL *session, WT_CURSOR_STAT *cst)
|
||||
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
|
||||
extern int __wt_btree_switch_object(WT_SESSION_IMPL *session, uint32_t objectid, uint32_t flags)
|
||||
extern int __wt_btree_switch_object(WT_SESSION_IMPL *session, uint32_t objectid)
|
||||
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
|
||||
extern int __wt_btree_tree_open(WT_SESSION_IMPL *session, const uint8_t *addr, size_t addr_size)
|
||||
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
|
||||
|
||||
@@ -215,7 +215,7 @@ __tiered_create_local(WT_SESSION_IMPL *session, WT_TIERED *tiered)
|
||||
F_SET(this_tier, WT_TIERS_OP_READ | WT_TIERS_OP_WRITE);
|
||||
|
||||
WT_WITH_DHANDLE(
|
||||
session, &tiered->iface, ret = __wt_btree_switch_object(session, tiered->current_id, 0));
|
||||
session, &tiered->iface, ret = __wt_btree_switch_object(session, tiered->current_id));
|
||||
WT_ERR(ret);
|
||||
|
||||
err:
|
||||
@@ -647,7 +647,7 @@ __tiered_open(WT_SESSION_IMPL *session, const char *cfg[])
|
||||
WT_ERR(__wt_tiered_switch(session, config));
|
||||
}
|
||||
WT_ERR(__wt_btree_open(session, tiered_cfg));
|
||||
WT_ERR(__wt_btree_switch_object(session, tiered->current_id, 0));
|
||||
WT_ERR(__wt_btree_switch_object(session, tiered->current_id));
|
||||
|
||||
#if 1
|
||||
if (0) {
|
||||
|
||||
73
src/third_party/wiredtiger/src/txn/txn.c
vendored
73
src/third_party/wiredtiger/src/txn/txn.c
vendored
@@ -1133,6 +1133,40 @@ __txn_search_prepared_op(
|
||||
return (0);
|
||||
}
|
||||
|
||||
/*
|
||||
* __txn_append_tombstone --
|
||||
* Append a tombstone to the end of a keys update chain.
|
||||
*/
|
||||
static int
|
||||
__txn_append_tombstone(WT_SESSION_IMPL *session, WT_TXN_OP *op, WT_CURSOR_BTREE *cbt)
|
||||
{
|
||||
WT_BTREE *btree;
|
||||
WT_DECL_RET;
|
||||
WT_UPDATE *tombstone;
|
||||
size_t not_used;
|
||||
tombstone = NULL;
|
||||
btree = S2BT(session);
|
||||
|
||||
WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &not_used));
|
||||
#ifdef HAVE_DIAGNOSTIC
|
||||
WT_WITH_BTREE(session, op->btree,
|
||||
ret = btree->type == BTREE_ROW ?
|
||||
__wt_row_modify(cbt, &cbt->iface.key, NULL, tombstone, WT_UPDATE_INVALID, false, false) :
|
||||
__wt_col_modify(cbt, cbt->recno, NULL, tombstone, WT_UPDATE_INVALID, false, false));
|
||||
#else
|
||||
WT_WITH_BTREE(session, op->btree,
|
||||
ret = btree->type == BTREE_ROW ?
|
||||
__wt_row_modify(cbt, &cbt->iface.key, NULL, tombstone, WT_UPDATE_INVALID, false) :
|
||||
__wt_col_modify(cbt, cbt->recno, NULL, tombstone, WT_UPDATE_INVALID, false));
|
||||
#endif
|
||||
WT_ERR(ret);
|
||||
tombstone = NULL;
|
||||
|
||||
err:
|
||||
__wt_free(session, tombstone);
|
||||
return (ret);
|
||||
}
|
||||
|
||||
/*
|
||||
* __txn_resolve_prepared_op --
|
||||
* Resolve a transaction's operations indirect references.
|
||||
@@ -1146,19 +1180,19 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit,
|
||||
WT_DECL_RET;
|
||||
WT_ITEM hs_recno_key;
|
||||
WT_PAGE *page;
|
||||
WT_TIME_WINDOW tw;
|
||||
WT_TXN *txn;
|
||||
WT_UPDATE *first_committed_upd, *fix_upd, *tombstone, *upd;
|
||||
WT_UPDATE *first_committed_upd, *fix_upd, *upd;
|
||||
#ifdef HAVE_DIAGNOSTIC
|
||||
WT_UPDATE *head_upd;
|
||||
#endif
|
||||
size_t not_used;
|
||||
uint8_t *p, hs_recno_key_buf[WT_INTPACK64_MAXSIZE];
|
||||
char ts_string[3][WT_TS_INT_STRING_SIZE];
|
||||
bool first_committed_upd_in_hs, prepare_on_disk, upd_appended;
|
||||
bool first_committed_upd_in_hs, prepare_on_disk, tw_found, upd_appended;
|
||||
|
||||
hs_cursor = NULL;
|
||||
txn = session->txn;
|
||||
fix_upd = tombstone = NULL;
|
||||
fix_upd = NULL;
|
||||
upd_appended = false;
|
||||
|
||||
WT_RET(__txn_search_prepared_op(session, op, cursorp, &upd));
|
||||
@@ -1261,26 +1295,26 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit,
|
||||
* we don't copy the prepared cell, which is now associated with a rolled back prepare,
|
||||
* and instead write nothing.
|
||||
*/
|
||||
WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &not_used));
|
||||
#ifdef HAVE_DIAGNOSTIC
|
||||
WT_WITH_BTREE(session, op->btree,
|
||||
ret = btree->type == BTREE_ROW ?
|
||||
__wt_row_modify(
|
||||
cbt, &cbt->iface.key, NULL, tombstone, WT_UPDATE_INVALID, false, false) :
|
||||
__wt_col_modify(cbt, cbt->recno, NULL, tombstone, WT_UPDATE_INVALID, false, false));
|
||||
#else
|
||||
WT_WITH_BTREE(session, op->btree,
|
||||
ret = btree->type == BTREE_ROW ?
|
||||
__wt_row_modify(cbt, &cbt->iface.key, NULL, tombstone, WT_UPDATE_INVALID, false) :
|
||||
__wt_col_modify(cbt, cbt->recno, NULL, tombstone, WT_UPDATE_INVALID, false));
|
||||
#endif
|
||||
WT_ERR(ret);
|
||||
tombstone = NULL;
|
||||
WT_ERR(__txn_append_tombstone(session, op, cbt));
|
||||
} else if (ret == 0)
|
||||
WT_ERR(__txn_locate_hs_record(
|
||||
session, hs_cursor, page, upd, commit, &fix_upd, &upd_appended, first_committed_upd));
|
||||
else
|
||||
ret = 0;
|
||||
} else if (F_ISSET(S2C(session), WT_CONN_IN_MEMORY) && !commit && first_committed_upd == NULL) {
|
||||
/*
|
||||
* For in-memory configurations of WiredTiger if a prepared update is reconciled and then
|
||||
* rolled back the on-page value will not be marked as aborted until the next eviction. In
|
||||
* the special case where this rollback results in the update chain being entirely comprised
|
||||
* of aborted updates other transactions attempting to write to the same key will look at
|
||||
* the on-page value, think the prepared transaction is still active, and falsely report a
|
||||
* write conflict. To prevent this scenario append a tombstone to the update chain when
|
||||
* rolling back a prepared reconciled update would result in only aborted updates on the
|
||||
* update chain.
|
||||
*/
|
||||
tw_found = __wt_read_cell_time_window(cbt, &tw);
|
||||
if (tw_found && tw.prepare == WT_PREPARE_INPROGRESS)
|
||||
WT_ERR(__txn_append_tombstone(session, op, cbt));
|
||||
}
|
||||
|
||||
for (; upd != NULL; upd = upd->next) {
|
||||
@@ -1384,7 +1418,6 @@ err:
|
||||
WT_TRET(hs_cursor->close(hs_cursor));
|
||||
if (!upd_appended)
|
||||
__wt_free(session, fix_upd);
|
||||
__wt_free(session, tombstone);
|
||||
return (ret);
|
||||
}
|
||||
|
||||
|
||||
132
src/third_party/wiredtiger/test/evergreen.yml
vendored
132
src/third_party/wiredtiger/test/evergreen.yml
vendored
@@ -158,7 +158,7 @@ functions:
|
||||
if [ -d cmake_build ]; then rm -r cmake_build; fi
|
||||
mkdir -p cmake_build
|
||||
cd cmake_build
|
||||
$CMAKE -DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/mongodbtoolchain_v3_clang.cmake -DCMAKE_C_FLAGS="-ggdb" -DWITH_PIC=1 \
|
||||
$CMAKE -DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/mongodbtoolchain_v4_clang.cmake -DCMAKE_C_FLAGS="-ggdb" -DWITH_PIC=1 \
|
||||
-DHAVE_DIAGNOSTIC=1 -DCMAKE_BUILD_TYPE=ASan \
|
||||
-DHAVE_BUILTIN_EXTENSION_LZ4=1 -DHAVE_BUILTIN_EXTENSION_SNAPPY=1 -DHAVE_BUILTIN_EXTENSION_ZLIB=1 ${configure_python_setting|} \
|
||||
-G "${cmake_generator|Ninja}" ../.
|
||||
@@ -2059,6 +2059,29 @@ tasks:
|
||||
display_name: " 1 Coverage report main page"
|
||||
remote_file: wiredtiger/${build_variant}/${revision}/coverage_report_${build_id}-${execution}/1_coverage_report_main.html
|
||||
|
||||
- name: s3-ext-test
|
||||
commands:
|
||||
- func: "get project"
|
||||
- func: "compile wiredtiger"
|
||||
vars:
|
||||
posix_configure_flags: -DENABLE_STRICT=0 -DHAVE_DIAGNOSTIC=1 -DENABLE_S3=1 -DIMPORT_S3_SDK=external -DENABLE_PYTHON=1
|
||||
- command: shell.exec
|
||||
params:
|
||||
working_dir: "wiredtiger/cmake_build"
|
||||
shell: bash
|
||||
silent: true
|
||||
script: |
|
||||
set -o errexit
|
||||
export AWS_ACCESS_KEY_ID=${aws_sdk_s3_ext_access_key}
|
||||
export AWS_SECRET_ACCESS_KEY=${aws_sdk_s3_ext_secret_key}
|
||||
|
||||
# Only set verbose after having put the AWS access credentials into the environment.
|
||||
set -o verbose
|
||||
|
||||
# Run unit testing
|
||||
ext/storage_sources/s3_store/test/run_s3_unit_tests
|
||||
# Run Python testing
|
||||
${test_env_vars|} ${python_binary|python3} ../test/suite/run.py s3_store01
|
||||
|
||||
- name: spinlock-gcc-test
|
||||
commands:
|
||||
@@ -2264,28 +2287,6 @@ tasks:
|
||||
ASAN_SYMBOLIZER_PATH=/opt/mongodbtoolchain/v4/bin/llvm-symbolizer
|
||||
format_test_script_args: -S
|
||||
|
||||
# FIXME-WT-8482: Replace this test with format-asan-smoke-test.
|
||||
- name: format-asan-smoke-ppc-test
|
||||
commands:
|
||||
- func: "get project"
|
||||
- func: "compile wiredtiger"
|
||||
vars:
|
||||
# FIXME-WT-8482: CC is set to the system default "clang" binary here as a workaround.
|
||||
# Change it back to mongodbtoolchain "clang" binary.
|
||||
posix_configure_flags:
|
||||
-DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/clang.cmake
|
||||
-DHAVE_DIAGNOSTIC=1
|
||||
-DWITH_PIC=1
|
||||
-DCMAKE_BUILD_TYPE=ASan
|
||||
-DHAVE_BUILTIN_EXTENSION_LZ4=1 -DHAVE_BUILTIN_EXTENSION_SNAPPY=1 -DHAVE_BUILTIN_EXTENSION_ZLIB=1
|
||||
- func: "format test script"
|
||||
# Run smoke tests, don't stop at failed tests, use default config
|
||||
vars:
|
||||
test_env_vars:
|
||||
ASAN_OPTIONS="detect_leaks=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1"
|
||||
ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer
|
||||
format_test_script_args: -S
|
||||
|
||||
- name: format-wtperf-test
|
||||
commands:
|
||||
- func: "get project"
|
||||
@@ -2544,34 +2545,6 @@ tasks:
|
||||
#run for 2 hours ( 2 * 60 = 120 minutes), use default config
|
||||
format_test_script_args: -e "SEGFAULT_SIGNALS=all" -b "catchsegv ./t" -t 120
|
||||
|
||||
# FIXME-WT-8482: Replace this test with format-stress-sanitizer-test.
|
||||
- name: format-stress-sanitizer-ppc-test
|
||||
tags: ["stress-test-ppc-1"]
|
||||
# Set 2.5 hours timeout (60 * 60 * 2.5)
|
||||
exec_timeout_secs: 9000
|
||||
commands:
|
||||
- func: "get project"
|
||||
- func: "compile wiredtiger"
|
||||
vars:
|
||||
# FIXME-WT-8482: CC is set to the system default "clang" binary here as a workaround.
|
||||
# Change it back to mongodbtoolchain "clang" binary.
|
||||
posix_configure_flags:
|
||||
-DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/clang.cmake
|
||||
-DHAVE_DIAGNOSTIC=1
|
||||
-DWITH_PIC=1
|
||||
-DHAVE_BUILTIN_EXTENSION_LZ4=1
|
||||
-DHAVE_BUILTIN_EXTENSION_SNAPPY=1
|
||||
-DHAVE_BUILTIN_EXTENSION_ZLIB=1
|
||||
-DCMAKE_BUILD_TYPE=ASan
|
||||
- func: "format test script"
|
||||
vars:
|
||||
test_env_vars:
|
||||
ASAN_OPTIONS="detect_leaks=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1"
|
||||
ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer
|
||||
# Run for 2 hours (2 * 60 = 120 minutes), don't stop at failed tests, use default config
|
||||
format_test_script_args: -t 120
|
||||
|
||||
|
||||
- <<: *format-stress-test
|
||||
name: format-stress-test-1
|
||||
tags: ["stress-test-1"]
|
||||
@@ -2586,10 +2559,10 @@ tasks:
|
||||
tags: ["stress-test-4"]
|
||||
- <<: *format-stress-sanitizer-test
|
||||
name: format-stress-sanitizer-test-1
|
||||
tags: ["stress-test-1"]
|
||||
tags: ["stress-test-1", "stress-test-ppc-1"]
|
||||
- <<: *format-stress-sanitizer-test
|
||||
name: format-stress-sanitizer-test-2
|
||||
tags: ["stress-test-2"]
|
||||
tags: ["stress-test-2", "stress-test-ppc-2"]
|
||||
- <<: *format-stress-sanitizer-test
|
||||
name: format-stress-sanitizer-test-3
|
||||
tags: ["stress-test-3"]
|
||||
@@ -3572,6 +3545,7 @@ buildvariants:
|
||||
- name: data-validation-stress-test-checkpoint-fp-hs-insert-s5
|
||||
- name: data-validation-stress-test-checkpoint-fp-hs-insert-s5-no-timestamp
|
||||
- name: data-validation-stress-test-checkpoint-no-timestamp
|
||||
- name: s3-ext-test
|
||||
|
||||
- name: ubuntu2004-asan
|
||||
display_name: "! Ubuntu 20.04 ASAN"
|
||||
@@ -4026,10 +4000,10 @@ buildvariants:
|
||||
make_command: ninja
|
||||
cmake_generator: Ninja
|
||||
tasks:
|
||||
- name: compile
|
||||
- name: generate-datafile-little-endian
|
||||
- name: verify-datafile-little-endian
|
||||
- name: verify-datafile-from-big-endian
|
||||
- name: compile
|
||||
- name: generate-datafile-little-endian
|
||||
- name: verify-datafile-little-endian
|
||||
- name: verify-datafile-from-big-endian
|
||||
|
||||
- name: big-endian
|
||||
display_name: "~ Big-endian (s390x/zSeries)"
|
||||
@@ -4049,10 +4023,10 @@ buildvariants:
|
||||
make_command: ninja
|
||||
cmake_generator: Ninja
|
||||
tasks:
|
||||
- name: compile
|
||||
- name: generate-datafile-big-endian
|
||||
- name: verify-datafile-big-endian
|
||||
- name: verify-datafile-from-little-endian
|
||||
- name: compile
|
||||
- name: generate-datafile-big-endian
|
||||
- name: verify-datafile-big-endian
|
||||
- name: verify-datafile-from-little-endian
|
||||
|
||||
- name: ubuntu1804-ppc
|
||||
display_name: "~ Ubuntu 18.04 PPC"
|
||||
@@ -4065,7 +4039,7 @@ buildvariants:
|
||||
WT_BUILDDIR=$(git rev-parse --show-toplevel)/cmake_build
|
||||
LD_LIBRARY_PATH=$WT_BUILDDIR
|
||||
posix_configure_flags:
|
||||
-DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/mongodbtoolchain_v3_gcc.cmake
|
||||
-DCMAKE_TOOLCHAIN_FILE=../cmake/toolchains/mongodbtoolchain_v4_gcc.cmake
|
||||
-DCMAKE_C_FLAGS="-ggdb"
|
||||
-DHAVE_DIAGNOSTIC=1
|
||||
-DENABLE_PYTHON=1
|
||||
@@ -4073,7 +4047,7 @@ buildvariants:
|
||||
-DENABLE_SNAPPY=1
|
||||
-DENABLE_STRICT=1
|
||||
-DCMAKE_INSTALL_PREFIX=$(pwd)/LOCAL_INSTALL
|
||||
python_binary: '/opt/mongodbtoolchain/v3/bin/python3'
|
||||
python_binary: '/opt/mongodbtoolchain/v4/bin/python3'
|
||||
# Use half the number of vCPUs to avoid OOM-kill failures
|
||||
smp_command: -j $(echo $(grep -c ^processor /proc/cpuinfo) / 2 | bc)
|
||||
cmake_generator: Ninja
|
||||
@@ -4082,9 +4056,10 @@ buildvariants:
|
||||
- name: compile
|
||||
- name: unit-test
|
||||
- name: format-smoke-test
|
||||
- name: format-asan-smoke-ppc-test
|
||||
- name: format-asan-smoke-test
|
||||
- name: format-wtperf-test
|
||||
- name: ".stress-test-ppc-1"
|
||||
- name: ".stress-test-ppc-2"
|
||||
|
||||
- name: ubuntu1804-zseries
|
||||
display_name: "~ Ubuntu 18.04 zSeries"
|
||||
@@ -4157,31 +4132,4 @@ buildvariants:
|
||||
- name: long-test
|
||||
- name: configure-combinations
|
||||
- name: format-smoke-test
|
||||
|
||||
- name: ubuntu2004-tmp-s3-cmake
|
||||
display_name: "* (Temporary) Ubuntu 20.04 S3 Extension CMake"
|
||||
run_on:
|
||||
- ubuntu2004-test
|
||||
expansions:
|
||||
test_env_vars:
|
||||
WT_TOPDIR=$(git rev-parse --show-toplevel)
|
||||
WT_BUILDDIR=$WT_TOPDIR/cmake_build
|
||||
LD_LIBRARY_PATH=$WT_BUILDDIR:$WT_TOPDIR/TCMALLOC_LIB/lib
|
||||
posix_configure_flags:
|
||||
-DCMAKE_C_FLAGS="-ggdb"
|
||||
-DHAVE_DIAGNOSTIC=1
|
||||
-DENABLE_PYTHON=1
|
||||
-DENABLE_ZLIB=1
|
||||
-DENABLE_SNAPPY=1
|
||||
-DENABLE_STRICT=0
|
||||
-DENABLE_TCMALLOC=1
|
||||
-DENABLE_S3=1
|
||||
-DIMPORT_S3_SDK=external
|
||||
-DCMAKE_PREFIX_PATH="$(pwd)/../TCMALLOC_LIB"
|
||||
-DCMAKE_INSTALL_PREFIX=$(pwd)/LOCAL_INSTALL
|
||||
python_binary: '/opt/mongodbtoolchain/v3/bin/python3'
|
||||
smp_command: -j $(echo "`grep -c ^processor /proc/cpuinfo` * 2" | bc)
|
||||
cmake_generator: Ninja
|
||||
make_command: ninja
|
||||
tasks:
|
||||
- name: compile
|
||||
- name: s3-ext-test
|
||||
|
||||
@@ -40,7 +40,7 @@ disk.mmap_all=0
|
||||
format.abort=0
|
||||
format.independent_thread_rng=1
|
||||
format.major_timeout=0
|
||||
logging=1
|
||||
logging=0
|
||||
logging.archive=0
|
||||
logging.compression=none
|
||||
logging.file_max=403863
|
||||
|
||||
@@ -40,7 +40,7 @@ disk.mmap_all=0
|
||||
format.abort=0
|
||||
format.independent_thread_rng=1
|
||||
format.major_timeout=0
|
||||
logging=1
|
||||
logging=0
|
||||
logging.archive=0
|
||||
logging.compression=none
|
||||
logging.file_max=461066
|
||||
|
||||
@@ -40,7 +40,7 @@ disk.mmap_all=0
|
||||
format.abort=0
|
||||
format.independent_thread_rng=1
|
||||
format.major_timeout=0
|
||||
logging=1
|
||||
logging=0
|
||||
logging.archive=1
|
||||
logging.compression=snappy
|
||||
logging.file_max=72264
|
||||
|
||||
91
src/third_party/wiredtiger/test/suite/test_prepare19.py
vendored
Normal file
91
src/third_party/wiredtiger/test/suite/test_prepare19.py
vendored
Normal file
@@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Public Domain 2014-present MongoDB, Inc.
|
||||
# Public Domain 2008-2014 WiredTiger, Inc.
|
||||
#
|
||||
# This is free and unencumbered software released into the public domain.
|
||||
#
|
||||
# Anyone is free to copy, modify, publish, use, compile, sell, or
|
||||
# distribute this software, either in source code form or as a compiled
|
||||
# binary, for any purpose, commercial or non-commercial, and by any
|
||||
# means.
|
||||
#
|
||||
# In jurisdictions that recognize copyright laws, the author or authors
|
||||
# of this software dedicate any and all copyright interest in the
|
||||
# software to the public domain. We make this dedication for the benefit
|
||||
# of the public at large and to the detriment of our heirs and
|
||||
# successors. We intend this dedication to be an overt act of
|
||||
# relinquishment in perpetuity of all present and future rights to this
|
||||
# software under copyright law.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
|
||||
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
|
||||
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
# OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
|
||||
import wttest
|
||||
import wiredtiger
|
||||
|
||||
# test_prepare19.py
|
||||
# Test that, for in-memory configurations of WiredTiger, if rolling back a prepared, reconciled
# update results in an empty update chain, then a tombstone is appended to the chain.
|
||||
class test_prepare19(wttest.WiredTigerTestCase):
    """Rollback of a prepared, reconciled update in an in-memory database.

    The scenario builds a long chain of aborted updates on a single key,
    prepares one more update on that key, provokes a write conflict from a
    second session while the update is prepared, and then rolls the prepared
    transaction back. A later write to the same key must succeed.
    """

    def conn_config(self):
        """Return the connection configuration string (in-memory database)."""
        return 'in_memory=true'

    def test_server_example(self):
        """Write to key 1 after a prepared update on it was evicted and rolled back."""
        uri = 'table:test_prepare19'
        config = 'key_format=i,value_format=S'

        self.session.create(uri, config)

        # Place more than 1000 aborted updates on the update chain.
        for _ in range(1, 1100):
            self.session.begin_transaction()
            cursor = self.session.open_cursor(uri, None)
            cursor[1] = ""
            self.session.rollback_transaction()

        # Make a prepared update on key 1, force eviction, and rollback.
        self.prepare_evict_rollback(uri, config, 1101)

        # If no tombstone is written the update will be aborted in the update chain but not in
        # the btree. The transaction will see an active transaction on key 1 and raise a write
        # conflict. Expect no error is raised.
        self.session.begin_transaction()
        cursor = self.session.open_cursor(uri, None)
        cursor[1] = ""
        # Only the absence of an error matters here, so resolve the transaction explicitly
        # instead of leaving it open until the test is torn down.
        self.session.rollback_transaction()

    def prepare_evict_rollback(self, uri, config, timestamp):
        """Prepare an update on key 1, provoke a write conflict, then roll back.

        uri       -- table to operate on.
        config    -- creation configuration, reused by the second session's create call.
        timestamp -- value passed through self.timestamp_str() to build the prepare timestamp.

        Fails the test if the second session's write to key 1 does not raise
        wiredtiger.WiredTigerError.
        """
        self.session.begin_transaction()
        cursor = self.session.open_cursor(uri, None)
        cursor[1] = ""
        self.session.prepare_transaction('prepare_timestamp=' + self.timestamp_str(timestamp))

        # This write conflict on the same page as key 1 results in a forced
        # eviction when the key has more than 1000 updates in its update chain.
        write_conflict_session = self.conn.open_session()
        write_conflict_session.create(uri, config)
        write_conflict_session.begin_transaction()
        write_conflict_cursor = write_conflict_session.open_cursor(uri, None)
        try:
            write_conflict_cursor[1] = ""
        except wiredtiger.WiredTigerError:
            # Expected: key 1 is held by the prepared transaction.
            pass
        else:
            self.fail('expected a write conflict on key 1 while its update is prepared')
        write_conflict_session.rollback_transaction()
        write_conflict_session.close()

        self.session.rollback_transaction()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
wttest.run()
|
||||
@@ -42,6 +42,8 @@ class test_s3_store01(wttest.WiredTigerTestCase):
|
||||
|
||||
fs_config = 'prefix=' + prefix
|
||||
|
||||
fs_config += ',region=ap-southeast-2'
|
||||
|
||||
# Bucket name can be overridden by an environment variable.
|
||||
bucket_name = os.getenv('WT_S3_EXT_BUCKET')
|
||||
if bucket_name is None:
|
||||
@@ -80,12 +82,13 @@ class test_s3_store01(wttest.WiredTigerTestCase):
|
||||
inbytes = bytes(1000000) # An empty buffer with a million zero bytes.
|
||||
fh.fh_read(session, 0, inbytes) # Read into the buffer.
|
||||
self.assertEquals(outbytes[0:1000000], inbytes)
|
||||
self.assertTrue(fs.fs_size(session, filename), len(outbytes))
|
||||
self.assertEquals(fh.fh_size(session), len(outbytes))
|
||||
fh.close(session)
|
||||
|
||||
# Checking that the file still exists in S3 after removing it from the cache.
|
||||
os.remove(cache_prefix + self.bucket_name + '/' + filename)
|
||||
self.assertTrue(fs.fs_exist(session, filename))
|
||||
|
||||
file_list = [self.prefix + object_name]
|
||||
self.assertEquals(fs.fs_directory_list(session, None, None), file_list)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user