Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
247fe0f8ce | ||
|
|
450a29d1bc | ||
|
|
04faa7f24e | ||
|
|
729c4ce6db | ||
|
|
40a775d5bf | ||
|
|
e2256d6888 | ||
|
|
4166623d77 | ||
|
|
366ef0f287 | ||
|
|
d3aab45b96 | ||
|
|
e040d5a3d9 | ||
|
|
f46350b57f | ||
|
|
5d8c1e28bc | ||
|
|
3844f602ff |
1
.hgtags
1
.hgtags
@@ -15,3 +15,4 @@ ef3ccde04cb28060319be900a2d31c88071933f6 1.3.0
|
||||
961b8482202543635417399aca5b1093e5ba5cbd 1.3.2
|
||||
01380d42b30c74937b7d801062d744823a47fd3d 1.3.3
|
||||
df87effe7cd3239e3666a76312bae77b92090d98 1.3.4
|
||||
8b91f84675fd67259b1f513e3f84786501cbc16c 1.3.6
|
||||
|
||||
20
NEWS
20
NEWS
@@ -1,3 +1,23 @@
|
||||
WiredTiger release 1.3.7, 2012-11-09
|
||||
------------------------------------
|
||||
|
||||
This release fixes a bug and improves performance with Bloom filters:
|
||||
|
||||
* Drop any old Bloom filter before creating a new one -- we may have been
|
||||
interrupted in between creating it and updating the metadata. Write the
|
||||
metadata after creating missing Bloom filters.
|
||||
|
||||
* Use a separate thread for creation of Bloom filters for the newest,
|
||||
unmerged LSM chunks.
|
||||
|
||||
* Changes to the ex_test_perf example: change the default configuration to
|
||||
4KB pages and disable prefix compression. Change the "-i" command line
|
||||
option to be a simple count of records to insert. Clean up error
|
||||
handling and add option to populate using multiple threads.
|
||||
|
||||
* Clarify the docs for the default buffer_alignment setting.
|
||||
|
||||
|
||||
WiredTiger release 1.3.6, 2012-11-06
|
||||
------------------------------------
|
||||
|
||||
|
||||
4
README
4
README
@@ -1,6 +1,6 @@
|
||||
WiredTiger 1.3.6: (November 6, 2012)
|
||||
WiredTiger 1.3.7: (November 9, 2012)
|
||||
|
||||
This is version 1.3.6 of WiredTiger.
|
||||
This is version 1.3.7 of WiredTiger.
|
||||
|
||||
WiredTiger documentation can be found at:
|
||||
|
||||
|
||||
2
RELEASE
2
RELEASE
@@ -1,6 +1,6 @@
|
||||
WIREDTIGER_VERSION_MAJOR=1
|
||||
WIREDTIGER_VERSION_MINOR=3
|
||||
WIREDTIGER_VERSION_PATCH=6
|
||||
WIREDTIGER_VERSION_PATCH=7
|
||||
WIREDTIGER_VERSION="$WIREDTIGER_VERSION_MAJOR.$WIREDTIGER_VERSION_MINOR.$WIREDTIGER_VERSION_PATCH"
|
||||
|
||||
WIREDTIGER_RELEASE_DATE=`date "+%B %e, %Y"`
|
||||
|
||||
@@ -2,8 +2,8 @@ dnl build by dist/s_version
|
||||
|
||||
VERSION_MAJOR=1
|
||||
VERSION_MINOR=3
|
||||
VERSION_PATCH=6
|
||||
VERSION_STRING='"WiredTiger 1.3.6: (November 6, 2012)"'
|
||||
VERSION_PATCH=7
|
||||
VERSION_STRING='"WiredTiger 1.3.7: (November 9, 2012)"'
|
||||
|
||||
AC_SUBST(VERSION_MAJOR)
|
||||
AC_SUBST(VERSION_MINOR)
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
dnl WiredTiger product version for AC_INIT. Maintained by dist/s_version
|
||||
1.3.6
|
||||
1.3.7
|
||||
|
||||
6
dist/api_data.py
vendored
6
dist/api_data.py
vendored
@@ -453,9 +453,9 @@ methods = {
|
||||
|
||||
'wiredtiger_open' : Method(connection_runtime_config + [
|
||||
Config('buffer_alignment', '-1', r'''
|
||||
in-memory alignment (in bytes) for buffers used for I/O. By
|
||||
default, a platform-specific alignment value is used (512 bytes
|
||||
on Linux systems, zero elsewhere)''',
|
||||
in-memory alignment (in bytes) for buffers used for I/O. The default
|
||||
value of -1 indicates that a platform-specific alignment value should
|
||||
be used (512 bytes on Linux systems, zero elsewhere)''',
|
||||
min='-1', max='1MB'),
|
||||
Config('create', 'false', r'''
|
||||
create the database if it does not exist''',
|
||||
|
||||
2
dist/s_string.ok
vendored
2
dist/s_string.ok
vendored
@@ -34,10 +34,10 @@ Bsearch
|
||||
Btree
|
||||
Bzip
|
||||
CAS
|
||||
CDLMRSTdehikrsuv
|
||||
CELL's
|
||||
CELLs
|
||||
CKPT
|
||||
CLMPRSTdehikrsuv
|
||||
CLR
|
||||
COL's
|
||||
CONCAT
|
||||
|
||||
@@ -39,6 +39,9 @@
|
||||
|
||||
#include <wiredtiger.h>
|
||||
|
||||
#define ATOMIC_ADD(v, val) \
|
||||
__sync_add_and_fetch(&(v), val)
|
||||
|
||||
typedef struct {
|
||||
const char *home;
|
||||
const char *uri;
|
||||
@@ -52,6 +55,7 @@ typedef struct {
|
||||
uint32_t report_interval;
|
||||
uint32_t read_time;
|
||||
uint32_t elapsed_time;
|
||||
uint32_t populate_threads;/* Number of populate threads. */
|
||||
uint32_t read_threads; /* Number of read threads. */
|
||||
uint32_t verbose;
|
||||
uint32_t stat_thread; /* Whether to create a stat thread. */
|
||||
@@ -65,20 +69,33 @@ typedef struct {
|
||||
} CONFIG;
|
||||
|
||||
/* Forward function definitions. */
|
||||
int execute_populate(CONFIG *);
|
||||
int execute_reads(CONFIG *);
|
||||
int populate(CONFIG *);
|
||||
int get_next_op(uint64_t *);
|
||||
int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...)
|
||||
#ifdef __GNUC__
|
||||
__attribute__((format (printf, 4, 5)))
|
||||
#endif
|
||||
;
|
||||
void *populate_thread(void *);
|
||||
void print_config(CONFIG *);
|
||||
void *read_thread(void *);
|
||||
int setup_log_file(CONFIG *);
|
||||
int start_threads(CONFIG *, int, pthread_t **, void *(*func)(void *));
|
||||
void *stat_worker(void *);
|
||||
int stop_threads(CONFIG *, int, pthread_t *);
|
||||
void usage(void);
|
||||
|
||||
#define DEFAULT_LSM_CONFIG \
|
||||
"key_format=S,value_format=S,exclusive," \
|
||||
"leaf_page_max=4kb,internal_page_max=64kb,allocation_size=4kb,"
|
||||
|
||||
/* Default values - these are tiny, we want the basic run to be fast. */
|
||||
CONFIG default_cfg = {
|
||||
"WT_TEST", /* home */
|
||||
"lsm:test", /* uri */
|
||||
"create,cache_size=200MB", /* conn_config */
|
||||
"key_format=S,value_format=S", /* table_config */
|
||||
DEFAULT_LSM_CONFIG, /* table_config */
|
||||
1, /* create */
|
||||
14023954, /* rand_seed */
|
||||
5000, /* icount */
|
||||
@@ -87,6 +104,7 @@ CONFIG default_cfg = {
|
||||
2, /* report_interval */
|
||||
2, /* read_time */
|
||||
0, /* elapsed_time */
|
||||
1, /* populate_threads */
|
||||
2, /* read_threads */
|
||||
0, /* verbose */
|
||||
0, /* stat_thread */
|
||||
@@ -100,8 +118,8 @@ CONFIG small_cfg = {
|
||||
"WT_TEST", /* home */
|
||||
"lsm:test", /* uri */
|
||||
"create,cache_size=500MB", /* conn_config */
|
||||
"key_format=S,value_format=S,lsm_chunk_size=5MB,"
|
||||
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
|
||||
DEFAULT_LSM_CONFIG /* table_config */
|
||||
"lsm_chunk_size=5MB,",
|
||||
1, /* create */
|
||||
14023954, /* rand_seed */
|
||||
500000, /* icount 0.5 million */
|
||||
@@ -110,6 +128,7 @@ CONFIG small_cfg = {
|
||||
10, /* report_interval */
|
||||
20, /* read_time */
|
||||
0, /* elapsed_time */
|
||||
1, /* populate_threads */
|
||||
8, /* read_threads */
|
||||
0, /* verbose */
|
||||
0, /* stat_thread */
|
||||
@@ -123,8 +142,8 @@ CONFIG med_cfg = {
|
||||
"WT_TEST", /* home */
|
||||
"lsm:test", /* uri */
|
||||
"create,cache_size=1GB", /* conn_config */
|
||||
"key_format=S,value_format=S,lsm_chunk_size=20MB,"
|
||||
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
|
||||
DEFAULT_LSM_CONFIG /* table_config */
|
||||
"lsm_chunk_size=20MB,",
|
||||
1, /* create */
|
||||
14023954, /* rand_seed */
|
||||
50000000, /* icount 50 million */
|
||||
@@ -133,6 +152,7 @@ CONFIG med_cfg = {
|
||||
20, /* report_interval */
|
||||
100, /* read_time */
|
||||
0, /* elapsed_time */
|
||||
1, /* populate_threads */
|
||||
16, /* read_threads */
|
||||
0, /* verbose */
|
||||
0, /* stat_thread */
|
||||
@@ -146,8 +166,8 @@ CONFIG large_cfg = {
|
||||
"WT_TEST", /* home */
|
||||
"lsm:test", /* uri */
|
||||
"create,cache_size=2GB", /* conn_config */
|
||||
"key_format=S,value_format=S,lsm_chunk_size=50MB,"
|
||||
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
|
||||
DEFAULT_LSM_CONFIG /* table_config */
|
||||
"lsm_chunk_size=50MB,",
|
||||
1, /* create */
|
||||
14023954, /* rand_seed */
|
||||
500000000, /* icount 500 million */
|
||||
@@ -156,6 +176,7 @@ CONFIG large_cfg = {
|
||||
20, /* report_interval */
|
||||
600, /* read_time */
|
||||
0, /* elapsed_time */
|
||||
1, /* populate_threads */
|
||||
16, /* read_threads */
|
||||
0, /* verbose */
|
||||
0, /* stat_thread */
|
||||
@@ -169,9 +190,10 @@ const char *debug_cconfig = "verbose=[lsm]";
|
||||
const char *debug_tconfig = "";
|
||||
|
||||
/* Global values shared by threads. */
|
||||
uint64_t nops;
|
||||
int running;
|
||||
int stat_running;
|
||||
uint64_t g_nops;
|
||||
int g_running;
|
||||
int g_stat_running;
|
||||
uint32_t g_threads_quit; /* For tracking threads that exit early. */
|
||||
|
||||
void *
|
||||
read_thread(void *arg)
|
||||
@@ -181,34 +203,125 @@ read_thread(void *arg)
|
||||
WT_SESSION *session;
|
||||
WT_CURSOR *cursor;
|
||||
char *key_buf;
|
||||
int ret;
|
||||
int ret, search_ret;
|
||||
|
||||
session = NULL;
|
||||
key_buf = NULL;
|
||||
|
||||
cfg = (CONFIG *)arg;
|
||||
conn = cfg->conn;
|
||||
key_buf = calloc(cfg->key_sz, 1);
|
||||
if (key_buf == NULL)
|
||||
return (arg);
|
||||
if (key_buf == NULL) {
|
||||
ret = ENOMEM;
|
||||
goto err;
|
||||
}
|
||||
|
||||
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
|
||||
fprintf(stderr,
|
||||
"open_session failed in read thread: %d\n", ret);
|
||||
return (NULL);
|
||||
lprintf(cfg, ret, 0,
|
||||
"open_session failed in read thread");
|
||||
goto err;
|
||||
}
|
||||
if ((ret = session->open_cursor(session, cfg->uri,
|
||||
NULL, NULL, &cursor)) != 0) {
|
||||
fprintf(stderr, "open_cursor failed in read thread: %d\n", ret);
|
||||
return (NULL);
|
||||
lprintf(cfg, ret, 0,
|
||||
"open_cursor failed in read thread");
|
||||
goto err;
|
||||
}
|
||||
|
||||
while (running) {
|
||||
++nops;
|
||||
sprintf(key_buf, "%d", rand() % cfg->icount);
|
||||
while (g_running) {
|
||||
++g_nops;
|
||||
/* Get a value in range, avoid zero. */
|
||||
sprintf(key_buf, "%d", (rand() % (cfg->icount - 1)) + 1);
|
||||
cursor->set_key(cursor, key_buf);
|
||||
cursor->search(cursor);
|
||||
/* Report errors and continue. */
|
||||
if ((search_ret = cursor->search(cursor)) != 0)
|
||||
lprintf(cfg, search_ret, 0,
|
||||
"Search failed for: %s", key_buf);
|
||||
}
|
||||
|
||||
session->close(session, NULL);
|
||||
free(key_buf);
|
||||
err: if (ret != 0)
|
||||
++g_threads_quit;
|
||||
if (session != NULL)
|
||||
session->close(session, NULL);
|
||||
if (key_buf != NULL)
|
||||
free(key_buf);
|
||||
return (arg);
|
||||
}
|
||||
|
||||
/* Retrieve an ID for the next insert operation. */
|
||||
int get_next_op(uint64_t *op)
|
||||
{
|
||||
*op = ATOMIC_ADD(g_nops, 1);
|
||||
return (0);
|
||||
}
|
||||
|
||||
void *
|
||||
populate_thread(void *arg)
|
||||
{
|
||||
CONFIG *cfg;
|
||||
WT_CONNECTION *conn;
|
||||
WT_CURSOR *cursor;
|
||||
WT_SESSION *session;
|
||||
char *data_buf, *key_buf;
|
||||
int ret;
|
||||
uint64_t op;
|
||||
|
||||
cfg = (CONFIG *)arg;
|
||||
conn = cfg->conn;
|
||||
session = NULL;
|
||||
data_buf = key_buf = NULL;
|
||||
|
||||
cfg->phase = LSM_TEST_PERF_POP;
|
||||
|
||||
data_buf = calloc(cfg->data_sz, 1);
|
||||
if (data_buf == NULL) {
|
||||
lprintf(cfg, ENOMEM, 0, "Populate data buffer");
|
||||
goto err;
|
||||
}
|
||||
key_buf = calloc(cfg->key_sz, 1);
|
||||
if (key_buf == NULL) {
|
||||
lprintf(cfg, ENOMEM, 0, "Populate key buffer");
|
||||
goto err;
|
||||
}
|
||||
|
||||
/* Open a session for the current thread's work. */
|
||||
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
|
||||
lprintf(cfg, ret, 0,
|
||||
"Error opening a session on %s", cfg->home);
|
||||
goto err;
|
||||
}
|
||||
|
||||
/* Can only use bulk load single threaded. */
|
||||
if ((ret = session->open_cursor(
|
||||
session, cfg->uri, NULL,
|
||||
cfg->populate_threads == 1 ? "bulk" : "", &cursor)) != 0) {
|
||||
lprintf(cfg, ret, 0, "Error opening cursor %s", cfg->uri);
|
||||
goto err;
|
||||
}
|
||||
|
||||
memset(data_buf, 'a', cfg->data_sz - 1);
|
||||
cursor->set_value(cursor, data_buf);
|
||||
/* Populate the database. */
|
||||
while (1) {
|
||||
get_next_op(&op);
|
||||
if (op > cfg->icount)
|
||||
break;
|
||||
sprintf(key_buf, "%"PRIu64, op);
|
||||
cursor->set_key(cursor, key_buf);
|
||||
if ((ret = cursor->insert(cursor)) != 0) {
|
||||
lprintf(cfg, ret, 0, "Failed inserting");
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
/* To ensure managing thread knows if we exited early. */
|
||||
err: if (ret != 0)
|
||||
++g_threads_quit;
|
||||
if (session != NULL)
|
||||
session->close(session, NULL);
|
||||
if (data_buf)
|
||||
free(data_buf);
|
||||
if (key_buf)
|
||||
free(key_buf);
|
||||
return (arg);
|
||||
}
|
||||
|
||||
@@ -226,193 +339,193 @@ stat_worker(void *arg)
|
||||
struct timeval e;
|
||||
uint64_t value;
|
||||
|
||||
session = NULL;
|
||||
cfg = (CONFIG *)arg;
|
||||
conn = cfg->conn;
|
||||
lsm_uri = NULL;
|
||||
|
||||
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
|
||||
fprintf(stderr,
|
||||
"open_session failed in read thread: %d\n", ret);
|
||||
return (NULL);
|
||||
lprintf(cfg, ret, 0,
|
||||
"open_session failed in statistics thread.");
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (strncmp(cfg->uri, "lsm:", strlen("lsm:")) == 0) {
|
||||
lsm_uri = calloc(
|
||||
strlen(cfg->uri) + strlen("statistics:") + 1, 1);
|
||||
if (lsm_uri == NULL) {
|
||||
fprintf(stderr, "No memory in stat thread.\n");
|
||||
lprintf(
|
||||
cfg, ENOMEM, 0, "Statistics thread uri create.");
|
||||
goto err;
|
||||
}
|
||||
sprintf(lsm_uri, "statistics:%s", cfg->uri);
|
||||
}
|
||||
|
||||
while (stat_running) {
|
||||
while (g_stat_running) {
|
||||
sleep(cfg->report_interval);
|
||||
/* Generic header. */
|
||||
fprintf(cfg->logf, "=======================================\n");
|
||||
lprintf(cfg, 0, cfg->verbose,
|
||||
"=======================================");
|
||||
gettimeofday(&e, NULL);
|
||||
secs = e.tv_sec + e.tv_usec / 1000000.0;
|
||||
secs -= (cfg->phase_start_time.tv_sec +
|
||||
cfg->phase_start_time.tv_usec / 1000000.0);
|
||||
if (secs == 0)
|
||||
++secs;
|
||||
fprintf(cfg->logf,
|
||||
"%s completed: %" PRIu64", elapsed time: %.2f\n",
|
||||
lprintf(cfg, 0, cfg->verbose,
|
||||
"%s completed: %" PRIu64", elapsed time: %.2f",
|
||||
cfg->phase == LSM_TEST_PERF_READ ? "reads" : "inserts",
|
||||
nops, secs);
|
||||
g_nops, secs);
|
||||
/* Report LSM tree stats, if using LSM. */
|
||||
if (lsm_uri != NULL) {
|
||||
if ((ret = session->open_cursor(session, lsm_uri,
|
||||
NULL, NULL, &cursor)) != 0) {
|
||||
fprintf(stderr,
|
||||
"open_cursor LSM statistics: %d\n", ret);
|
||||
lprintf(cfg, ret, 0,
|
||||
"open_cursor failed in LSM statistics");
|
||||
goto err;
|
||||
}
|
||||
while (
|
||||
(ret = cursor->next(cursor)) == 0 &&
|
||||
(ret = cursor->get_value(
|
||||
cursor, &desc, &pvalue, &value)) == 0)
|
||||
fprintf(cfg->logf,
|
||||
"stat:lsm: %s=%s\n", desc, pvalue);
|
||||
lprintf(cfg, 0, cfg->verbose,
|
||||
"stat:lsm: %s=%s", desc, pvalue);
|
||||
cursor->close(cursor);
|
||||
fprintf(cfg->logf, "\n");
|
||||
lprintf(cfg, 0, cfg->verbose, "-----------------");
|
||||
}
|
||||
|
||||
/* Dump the connection statistics since last time. */
|
||||
if ((ret = session->open_cursor(session, "statistics:",
|
||||
NULL, "statistics_clear", &cursor)) != 0) {
|
||||
fprintf(stderr,
|
||||
"open_cursor statistics: %d\n", ret);
|
||||
lprintf(cfg, ret, 0,
|
||||
"open_cursor failed in statistics");
|
||||
goto err;
|
||||
}
|
||||
while (
|
||||
(ret = cursor->next(cursor)) == 0 &&
|
||||
(ret = cursor->get_value(
|
||||
cursor, &desc, &pvalue, &value)) == 0)
|
||||
fprintf(cfg->logf,
|
||||
"stat:conn: %s=%s\n", desc, pvalue);
|
||||
lprintf(cfg, 0, cfg->verbose,
|
||||
"stat:conn: %s=%s", desc, pvalue);
|
||||
cursor->close(cursor);
|
||||
|
||||
}
|
||||
err: session->close(session, NULL);
|
||||
err: if (session != NULL)
|
||||
session->close(session, NULL);
|
||||
if (lsm_uri != NULL)
|
||||
free(lsm_uri);
|
||||
return (arg);
|
||||
}
|
||||
|
||||
int populate(CONFIG *cfg)
|
||||
int execute_populate(CONFIG *cfg)
|
||||
{
|
||||
WT_CONNECTION *conn;
|
||||
WT_CURSOR *cursor;
|
||||
WT_SESSION *session;
|
||||
char *data_buf, *key_buf;
|
||||
pthread_t *threads;
|
||||
double secs;
|
||||
int ret;
|
||||
uint64_t elapsed, last_ops;
|
||||
struct timeval e;
|
||||
|
||||
conn = cfg->conn;
|
||||
|
||||
cfg->phase = LSM_TEST_PERF_POP;
|
||||
if (cfg->verbose > 0)
|
||||
fprintf(cfg->logf, "Starting bulk load\n");
|
||||
lprintf(cfg, 0, 1, "Starting populate threads");
|
||||
|
||||
data_buf = calloc(cfg->data_sz, 1);
|
||||
if (data_buf == NULL)
|
||||
return (ENOMEM);
|
||||
key_buf = calloc(cfg->key_sz, 1);
|
||||
if (key_buf == NULL)
|
||||
return (ENOMEM);
|
||||
|
||||
/* Open a session for the current thread's work. */
|
||||
/* First create the table. */
|
||||
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
|
||||
fprintf(stderr, "Error opening a session on %s: %s\n",
|
||||
cfg->home, wiredtiger_strerror(ret));
|
||||
lprintf(cfg, ret, 0,
|
||||
"Error opening a session on %s", cfg->home);
|
||||
return (ret);
|
||||
}
|
||||
|
||||
if ((ret = session->create(
|
||||
session, cfg->uri, cfg->table_config)) != 0) {
|
||||
fprintf(stderr, "Error creating table %s: %s\n",
|
||||
cfg->uri, wiredtiger_strerror(ret));
|
||||
lprintf(cfg, ret, 0, "Error creating table %s", cfg->uri);
|
||||
session->close(session, NULL);
|
||||
return (ret);
|
||||
}
|
||||
session->close(session, NULL);
|
||||
|
||||
if ((ret = session->open_cursor(
|
||||
session, cfg->uri, NULL, "bulk", &cursor)) != 0) {
|
||||
fprintf(stderr, "Error opening cursor %s: %s\n",
|
||||
cfg->uri, wiredtiger_strerror(ret));
|
||||
if ((ret = start_threads(
|
||||
cfg, cfg->populate_threads, &threads, populate_thread)) != 0)
|
||||
return (ret);
|
||||
}
|
||||
|
||||
memset(data_buf, 'a', cfg->data_sz - 1);
|
||||
cursor->set_value(cursor, data_buf);
|
||||
/* Populate the database. */
|
||||
gettimeofday(&cfg->phase_start_time, NULL);
|
||||
for (nops = 0; nops < cfg->icount; nops++) {
|
||||
if (cfg->verbose > 0) {
|
||||
if (nops % 1000000 == 0)
|
||||
printf(".");
|
||||
if (nops % 50000000 == 0)
|
||||
printf("\n");
|
||||
}
|
||||
sprintf(key_buf, "%"PRIu64, nops);
|
||||
cursor->set_key(cursor, key_buf);
|
||||
if ((ret = cursor->insert(cursor)) != 0) {
|
||||
fprintf(stderr, "Failed inserting with: %d\n", ret);
|
||||
return (ret);
|
||||
for (cfg->elapsed_time = 0, elapsed = last_ops = 0;
|
||||
g_nops < cfg->icount && g_threads_quit < cfg->populate_threads;) {
|
||||
/*
|
||||
* Sleep for 100th of a second, report_interval is in second
|
||||
* granularity, so adjust accordingly.
|
||||
*/
|
||||
usleep(10000);
|
||||
elapsed += 1;
|
||||
if (elapsed % 100 == 0 &&
|
||||
(elapsed / 100) % cfg->report_interval == 0) {
|
||||
lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs",
|
||||
g_nops - last_ops, cfg->report_interval);
|
||||
last_ops = g_nops;
|
||||
}
|
||||
}
|
||||
if (g_threads_quit == cfg->populate_threads) {
|
||||
lprintf(cfg, WT_ERROR, 0,
|
||||
"Populate threads exited without finishing.");
|
||||
return (WT_ERROR);
|
||||
}
|
||||
gettimeofday(&e, NULL);
|
||||
cursor->close(cursor);
|
||||
session->close(session, NULL);
|
||||
if (cfg->verbose > 0) {
|
||||
fprintf(cfg->logf,
|
||||
"Finished bulk load of %d items\n", cfg->icount);
|
||||
secs = e.tv_sec + e.tv_usec / 1000000.0;
|
||||
secs -= (cfg->phase_start_time.tv_sec +
|
||||
cfg->phase_start_time.tv_usec / 1000000.0);
|
||||
if (secs == 0)
|
||||
++secs;
|
||||
fprintf(cfg->logf,
|
||||
"Load time: %.2f\n" "load ops/sec: %.2f\n",
|
||||
secs, cfg->icount / secs);
|
||||
}
|
||||
|
||||
free(data_buf);
|
||||
free(key_buf);
|
||||
return (ret);
|
||||
if ((ret = stop_threads(cfg, cfg->populate_threads, threads)) != 0)
|
||||
return (ret);
|
||||
|
||||
lprintf(cfg, 0, 1,
|
||||
"Finished bulk load of %d items", cfg->icount);
|
||||
secs = e.tv_sec + e.tv_usec / 1000000.0;
|
||||
secs -= (cfg->phase_start_time.tv_sec +
|
||||
cfg->phase_start_time.tv_usec / 1000000.0);
|
||||
if (secs == 0)
|
||||
++secs;
|
||||
lprintf(cfg, 0, 1,
|
||||
"Load time: %.2f\n" "load ops/sec: %.2f",
|
||||
secs, cfg->icount / secs);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
/* Setup the logging output mechanism. */
|
||||
int setup_log_file(CONFIG *cfg)
|
||||
int execute_reads(CONFIG *cfg)
|
||||
{
|
||||
char *fname;
|
||||
int offset;
|
||||
pthread_t *threads;
|
||||
int ret;
|
||||
uint64_t last_ops;
|
||||
|
||||
if (cfg->verbose < 1 && cfg->stat_thread == 0)
|
||||
return (0);
|
||||
cfg->phase = LSM_TEST_PERF_READ;
|
||||
lprintf(cfg, 0, 1, "Starting read threads");
|
||||
|
||||
if ((fname = calloc(strlen(cfg->home) +
|
||||
strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) {
|
||||
fprintf(stderr, "No memory in stat thread\n");
|
||||
return (ENOMEM);
|
||||
if ((ret = start_threads(
|
||||
cfg, cfg->read_threads, &threads, read_thread)) != 0)
|
||||
return (ret);
|
||||
|
||||
/* Sanity check reporting interval. */
|
||||
if (cfg->report_interval > cfg->read_time)
|
||||
cfg->report_interval = cfg->read_time;
|
||||
|
||||
gettimeofday(&cfg->phase_start_time, NULL);
|
||||
for (cfg->elapsed_time = 0, last_ops = 0;
|
||||
cfg->elapsed_time < cfg->read_time &&
|
||||
g_threads_quit < cfg->read_threads;
|
||||
cfg->elapsed_time += cfg->report_interval) {
|
||||
sleep(cfg->report_interval);
|
||||
lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs",
|
||||
g_nops - last_ops, cfg->report_interval);
|
||||
last_ops = g_nops;
|
||||
}
|
||||
for (offset = 0;
|
||||
cfg->uri[offset] != 0 && cfg->uri[offset] != ':';
|
||||
offset++) {}
|
||||
if (cfg->uri[offset] == 0)
|
||||
offset = 0;
|
||||
else
|
||||
++offset;
|
||||
sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset);
|
||||
if ((cfg->logf = fopen(fname, "w")) == NULL) {
|
||||
fprintf(stderr, "Statistics failed to open log file.\n");
|
||||
return (EINVAL);
|
||||
if (g_threads_quit == cfg->populate_threads) {
|
||||
lprintf(cfg, WT_ERROR, 0,
|
||||
"Populate threads exited without finishing.");
|
||||
return (WT_ERROR);
|
||||
}
|
||||
/* Use line buffering for the log file. */
|
||||
(void)setvbuf(cfg->logf, NULL, _IOLBF, 0);
|
||||
if (fname != NULL)
|
||||
free(fname);
|
||||
|
||||
if ((ret = stop_threads(cfg, cfg->read_threads, threads)) != 0)
|
||||
return (ret);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
@@ -421,7 +534,7 @@ int main(int argc, char **argv)
|
||||
CONFIG cfg;
|
||||
WT_CONNECTION *conn;
|
||||
const char *user_cconfig, *user_tconfig;
|
||||
const char *opts = "C:R:T:d:eh:i:k:lr:s:u:v:SML";
|
||||
const char *opts = "C:P:R:T:d:eh:i:k:lr:s:u:v:SML";
|
||||
char *cc_buf, *tc_buf;
|
||||
int ch, ret, stat_created;
|
||||
pthread_t stat;
|
||||
@@ -468,7 +581,7 @@ int main(int argc, char **argv)
|
||||
cfg.home = optarg;
|
||||
break;
|
||||
case 'i':
|
||||
cfg.icount = atoi(optarg) * 1000;
|
||||
cfg.icount = atoi(optarg);
|
||||
break;
|
||||
case 'k':
|
||||
cfg.key_sz = atoi(optarg);
|
||||
@@ -491,6 +604,9 @@ int main(int argc, char **argv)
|
||||
case 'C':
|
||||
user_cconfig = optarg;
|
||||
break;
|
||||
case 'P':
|
||||
cfg.populate_threads = atoi(optarg);
|
||||
break;
|
||||
case 'R':
|
||||
cfg.read_threads = atoi(optarg);
|
||||
break;
|
||||
@@ -556,46 +672,43 @@ int main(int argc, char **argv)
|
||||
/* Open a connection to the database, creating it if necessary. */
|
||||
if ((ret = wiredtiger_open(
|
||||
cfg.home, NULL, cfg.conn_config, &conn)) != 0) {
|
||||
fprintf(stderr, "Error connecting to %s: %s\n",
|
||||
cfg.home, wiredtiger_strerror(ret));
|
||||
lprintf(&cfg, ret, 0, "Error connecting to %s", cfg.home);
|
||||
goto err;
|
||||
}
|
||||
|
||||
cfg.conn = conn;
|
||||
|
||||
if (cfg.stat_thread) {
|
||||
stat_running = 1;
|
||||
g_stat_running = 1;
|
||||
if ((ret = pthread_create(
|
||||
&stat, NULL, stat_worker, &cfg)) != 0) {
|
||||
fprintf(stderr, "Error creating statistics thread.\n");
|
||||
lprintf(&cfg, ret, 0,
|
||||
"Error creating statistics thread.");
|
||||
goto err;
|
||||
}
|
||||
stat_created = 1;
|
||||
}
|
||||
if (cfg.create != 0 && (ret = populate(&cfg)) != 0)
|
||||
if (cfg.create != 0 && execute_populate(&cfg) != 0)
|
||||
goto err;
|
||||
|
||||
if (cfg.read_time != 0 && cfg.read_threads != 0)
|
||||
if ((ret = execute_reads(&cfg)) != 0)
|
||||
if (cfg.read_time != 0 && cfg.read_threads != 0 &&
|
||||
(ret = execute_reads(&cfg)) != 0)
|
||||
goto err;
|
||||
|
||||
if (cfg.verbose > 0) {
|
||||
fprintf(cfg.logf,
|
||||
"Ran performance test example with %d threads for %d seconds.\n",
|
||||
cfg.read_threads, cfg.read_time);
|
||||
fprintf(cfg.logf,
|
||||
"Executed %" PRIu64 " read operations\n", nops);
|
||||
}
|
||||
lprintf(&cfg, 0, 1,
|
||||
"Ran performance test example with %d threads for %d seconds.",
|
||||
cfg.read_threads, cfg.read_time);
|
||||
lprintf(&cfg, 0, 1,
|
||||
"Executed %" PRIu64 " read operations", g_nops);
|
||||
|
||||
if (cfg.stat_thread)
|
||||
stat_running = 0;
|
||||
err: if (cfg.stat_thread)
|
||||
g_stat_running = 0;
|
||||
|
||||
/* Cleanup. */
|
||||
err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
|
||||
fprintf(stderr, "Error joining stat thread: %d.\n", ret);
|
||||
if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
|
||||
lprintf(&cfg, ret, 0, "Error joining stat thread.");
|
||||
if (conn != NULL && (ret = conn->close(conn, NULL)) != 0)
|
||||
fprintf(stderr, "Error connecting to %s: %s\n",
|
||||
cfg.home, wiredtiger_strerror(ret));
|
||||
lprintf(&cfg, ret, 0,
|
||||
"Error closing connection to %s", cfg.home);
|
||||
if (cc_buf != NULL)
|
||||
free(cc_buf);
|
||||
if (tc_buf != NULL)
|
||||
@@ -608,60 +721,119 @@ err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
|
||||
return (ret);
|
||||
}
|
||||
|
||||
int execute_reads(CONFIG *cfg)
|
||||
/*
|
||||
* Following are utility functions.
|
||||
*/
|
||||
int
|
||||
start_threads(CONFIG *cfg, int num, pthread_t **threadsp, void *(*func)(void *))
|
||||
{
|
||||
pthread_t *read_threads;
|
||||
int ret;
|
||||
uint32_t i;
|
||||
uint64_t last_ops;
|
||||
pthread_t *threads;
|
||||
int i, ret;
|
||||
|
||||
cfg->phase = LSM_TEST_PERF_READ;
|
||||
if (cfg->verbose > 0)
|
||||
fprintf(cfg->logf, "Starting read threads\n");
|
||||
|
||||
running = 1;
|
||||
nops = 0;
|
||||
|
||||
read_threads = calloc(cfg->read_threads, sizeof(pthread_t *));
|
||||
if (read_threads == NULL)
|
||||
g_running = 1;
|
||||
g_nops = 0;
|
||||
g_threads_quit = 0;
|
||||
threads = calloc(num, sizeof(pthread_t *));
|
||||
if (threads == NULL)
|
||||
return (ENOMEM);
|
||||
for (i = 0; i < cfg->read_threads; i++) {
|
||||
for (i = 0; i < num; i++) {
|
||||
if ((ret = pthread_create(
|
||||
&read_threads[i], NULL, read_thread, cfg)) != 0) {
|
||||
fprintf(stderr, "Error creating thread: %d\n", i);
|
||||
&threads[i], NULL, func, cfg)) != 0) {
|
||||
g_running = 0;
|
||||
lprintf(cfg, ret, 0, "Error creating thread: %d", i);
|
||||
return (ret);
|
||||
}
|
||||
}
|
||||
*threadsp = threads;
|
||||
return (0);
|
||||
}
|
||||
|
||||
int
|
||||
stop_threads(CONFIG *cfg, int num, pthread_t *threads)
|
||||
{
|
||||
int i, ret;
|
||||
g_running = 0;
|
||||
|
||||
for (i = 0; i < num; i++) {
|
||||
if ((ret = pthread_join(threads[i], NULL)) != 0) {
|
||||
lprintf(cfg, ret, 0, "Error joining thread %d", i);
|
||||
return (ret);
|
||||
}
|
||||
}
|
||||
|
||||
/* Sanity check reporting interval. */
|
||||
if (cfg->report_interval > cfg->read_time)
|
||||
cfg->report_interval = cfg->read_time;
|
||||
free(threads);
|
||||
return (0);
|
||||
}
|
||||
|
||||
gettimeofday(&cfg->phase_start_time, NULL);
|
||||
for (cfg->elapsed_time = 0, last_ops = 0;
|
||||
cfg->elapsed_time < cfg->read_time;
|
||||
cfg->elapsed_time += cfg->report_interval) {
|
||||
sleep(cfg->report_interval);
|
||||
if (cfg->verbose > 0) {
|
||||
fprintf(cfg->logf, "%" PRIu64 " ops in %d secs\n",
|
||||
nops - last_ops, cfg->report_interval);
|
||||
printf("%" PRIu64 " ops in %d secs\n",
|
||||
nops - last_ops, cfg->report_interval);
|
||||
}
|
||||
last_ops = nops;
|
||||
}
|
||||
running = 0;
|
||||
/*
|
||||
* Log printf - output a log message.
|
||||
*/
|
||||
int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
|
||||
for (i = 0; i < cfg->read_threads; i++) {
|
||||
if ((ret = pthread_join(read_threads[i], NULL)) != 0) {
|
||||
fprintf(stderr, "Error joining thread %d\n", i);
|
||||
return (ret);
|
||||
if (err == 0 && level <= cfg->verbose) {
|
||||
va_start(ap, fmt);
|
||||
vfprintf(cfg->logf, fmt, ap);
|
||||
va_end(ap);
|
||||
fprintf(cfg->logf, "\n");
|
||||
|
||||
if (level < cfg->verbose) {
|
||||
va_start(ap, fmt);
|
||||
vprintf(fmt, ap);
|
||||
va_end(ap);
|
||||
printf("\n");
|
||||
}
|
||||
}
|
||||
if (err == 0)
|
||||
return (0);
|
||||
|
||||
if (read_threads != NULL)
|
||||
free(read_threads);
|
||||
return (ret);
|
||||
/* We are dealing with an error. */
|
||||
va_start(ap, fmt);
|
||||
vfprintf(cfg->logf, fmt, ap);
|
||||
va_end(ap);
|
||||
fprintf(cfg->logf, " Error: %s\n", wiredtiger_strerror(err));
|
||||
if (cfg->logf != stderr) {
|
||||
va_start(ap, fmt);
|
||||
vfprintf(stderr, fmt, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, " Error: %s\n", wiredtiger_strerror(err));
|
||||
}
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
/* Setup the logging output mechanism. */
|
||||
int setup_log_file(CONFIG *cfg)
|
||||
{
|
||||
char *fname;
|
||||
int offset;
|
||||
|
||||
if (cfg->verbose < 1 && cfg->stat_thread == 0)
|
||||
return (0);
|
||||
|
||||
if ((fname = calloc(strlen(cfg->home) +
|
||||
strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) {
|
||||
fprintf(stderr, "No memory in stat thread\n");
|
||||
return (ENOMEM);
|
||||
}
|
||||
for (offset = 0;
|
||||
cfg->uri[offset] != 0 && cfg->uri[offset] != ':';
|
||||
offset++) {}
|
||||
if (cfg->uri[offset] == 0)
|
||||
offset = 0;
|
||||
else
|
||||
++offset;
|
||||
sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset);
|
||||
if ((cfg->logf = fopen(fname, "w")) == NULL) {
|
||||
fprintf(stderr, "Statistics failed to open log file.\n");
|
||||
return (EINVAL);
|
||||
}
|
||||
/* Use line buffering for the log file. */
|
||||
(void)setvbuf(cfg->logf, NULL, _IOLBF, 0);
|
||||
if (fname != NULL)
|
||||
free(fname);
|
||||
return (0);
|
||||
}
|
||||
|
||||
void print_config(CONFIG *cfg)
|
||||
@@ -673,8 +845,11 @@ void print_config(CONFIG *cfg)
|
||||
printf("\t Table configuration: %s\n", cfg->table_config);
|
||||
printf("\t %s\n", cfg->create ? "Creating" : "Using existing");
|
||||
printf("\t Random seed: %d\n", cfg->rand_seed);
|
||||
if (cfg->create)
|
||||
if (cfg->create) {
|
||||
printf("\t Insert count: %d\n", cfg->icount);
|
||||
printf("\t Number populate threads: %d\n",
|
||||
cfg->populate_threads);
|
||||
}
|
||||
printf("\t key size: %d data size: %d\n", cfg->key_sz, cfg->data_sz);
|
||||
printf("\t Reporting interval: %d\n", cfg->report_interval);
|
||||
printf("\t Read workload period: %d\n", cfg->read_time);
|
||||
@@ -684,17 +859,18 @@ void print_config(CONFIG *cfg)
|
||||
|
||||
void usage(void)
|
||||
{
|
||||
printf("ex_perf_test [-CDLMRSTdehikrsuv]\n");
|
||||
printf("ex_perf_test [-CLMPRSTdehikrsuv]\n");
|
||||
printf("\t-S Use a small default configuration\n");
|
||||
printf("\t-M Use a medium default configuration\n");
|
||||
printf("\t-L Use a large default configuration\n");
|
||||
printf("\t-C <string> additional connection configuration\n");
|
||||
printf("\t-P <int> number of populate threads\n");
|
||||
printf("\t-R <int> number of read threads\n");
|
||||
printf("\t-T <string> additional table configuration\n");
|
||||
printf("\t-d <int> data item size\n");
|
||||
printf("\t-e use existing database (skip population phase)\n");
|
||||
printf("\t-h <string> Wired Tiger home must exist, default WT_TEST \n");
|
||||
printf("\t-i <int> number of records to insert in thousands\n");
|
||||
printf("\t-i <int> number of records to insert\n");
|
||||
printf("\t-k <int> key item size\n");
|
||||
printf("\t-r <int> number of seconds to run read phase\n");
|
||||
printf("\t-s <int> seed for random number generator\n");
|
||||
|
||||
@@ -13,7 +13,7 @@ To ask questions or discuss issues related to using WiredTiger, visit our
|
||||
|
||||
View the documentation online:
|
||||
|
||||
- <a href="1.3.6/index.html"><b>WiredTiger 1.3.6 (current release)</b></a>
|
||||
- <a href="1.3.7/index.html"><b>WiredTiger 1.3.7 (current release)</b></a>
|
||||
- <a href="1.2.2/index.html"><b>WiredTiger 1.2.2</b></a>
|
||||
- <a href="1.1.5/index.html"><b>WiredTiger 1.1.5</b></a>
|
||||
|
||||
|
||||
@@ -740,10 +740,9 @@ extern int __wt_lsm_tree_chunk_name( WT_SESSION_IMPL *session,
|
||||
WT_LSM_TREE *lsm_tree,
|
||||
uint32_t id,
|
||||
WT_ITEM *buf);
|
||||
extern int __wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session,
|
||||
extern int __wt_lsm_tree_setup_chunk( WT_SESSION_IMPL *session,
|
||||
WT_LSM_TREE *lsm_tree,
|
||||
WT_LSM_CHUNK *chunk,
|
||||
int create_bloom);
|
||||
WT_LSM_CHUNK *chunk);
|
||||
extern int __wt_lsm_tree_create(WT_SESSION_IMPL *session,
|
||||
const char *uri,
|
||||
int exclusive,
|
||||
@@ -772,7 +771,8 @@ extern int __wt_lsm_tree_worker(WT_SESSION_IMPL *session,
|
||||
const char *[]),
|
||||
const char *cfg[],
|
||||
uint32_t open_flags);
|
||||
extern void *__wt_lsm_worker(void *vargs);
|
||||
extern void *__wt_lsm_merge_worker(void *vargs);
|
||||
extern void *__wt_lsm_bloom_worker(void *arg);
|
||||
extern void *__wt_lsm_checkpoint_worker(void *arg);
|
||||
extern int __wt_lsm_copy_chunks(WT_SESSION_IMPL *session,
|
||||
WT_LSM_TREE *lsm_tree,
|
||||
|
||||
@@ -92,6 +92,9 @@ struct __wt_lsm_tree {
|
||||
WT_SESSION_IMPL *ckpt_session; /* For checkpoint worker */
|
||||
pthread_t ckpt_tid; /* LSM checkpoint worker thread */
|
||||
|
||||
WT_SESSION_IMPL *bloom_session; /* For bloom worker */
|
||||
pthread_t bloom_tid; /* LSM bloom worker thread */
|
||||
|
||||
WT_LSM_CHUNK **chunk; /* Array of active LSM chunks */
|
||||
size_t chunk_alloc; /* Space allocated for chunks */
|
||||
int nchunks; /* Number of active chunks */
|
||||
|
||||
@@ -1162,9 +1162,9 @@ struct __wt_connection {
|
||||
* handler is installed that writes error messages to stderr
|
||||
* @configstart{wiredtiger_open, see dist/api_data.py}
|
||||
* @config{buffer_alignment, in-memory alignment (in bytes) for buffers used for
|
||||
* I/O. By default\, a platform-specific alignment value is used (512 bytes on
|
||||
* Linux systems\, zero elsewhere).,an integer between -1 and 1MB; default \c
|
||||
* -1.}
|
||||
* I/O. The default value of -1 indicates that a platform-specific alignment
|
||||
* value should be used (512 bytes on Linux systems\, zero elsewhere).,an
|
||||
* integer between -1 and 1MB; default \c -1.}
|
||||
* @config{cache_size, maximum heap memory to allocate for the cache.,an integer
|
||||
* between 1MB and 10TB; default \c 100MB.}
|
||||
* @config{create, create the database if it does not exist.,a boolean flag;
|
||||
|
||||
@@ -70,7 +70,7 @@ __wt_lsm_merge(
|
||||
WT_CURSOR *src, *dest;
|
||||
WT_DECL_ITEM(bbuf);
|
||||
WT_DECL_RET;
|
||||
WT_ITEM key, value;
|
||||
WT_ITEM buf, key, value;
|
||||
WT_LSM_CHUNK *chunk;
|
||||
const char *cur_cfg[] =
|
||||
API_CONF_DEFAULTS(session, open_cursor, "bulk,raw");
|
||||
@@ -208,13 +208,19 @@ __wt_lsm_merge(
|
||||
WT_ERR(__wt_clsm_init_merge(src, start_chunk, start_id, nchunks));
|
||||
|
||||
WT_WITH_SCHEMA_LOCK(session, ret = __wt_lsm_tree_setup_chunk(
|
||||
session, lsm_tree, chunk, create_bloom));
|
||||
session, lsm_tree, chunk));
|
||||
WT_ERR(ret);
|
||||
if (create_bloom)
|
||||
if (create_bloom) {
|
||||
WT_CLEAR(buf);
|
||||
WT_ERR(__wt_lsm_tree_bloom_name(
|
||||
session, lsm_tree, chunk->id, &buf));
|
||||
chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL);
|
||||
|
||||
WT_ERR(__wt_bloom_create(session, chunk->bloom_uri,
|
||||
lsm_tree->bloom_config,
|
||||
record_count, lsm_tree->bloom_bit_count,
|
||||
lsm_tree->bloom_hash_count, &bloom));
|
||||
}
|
||||
|
||||
WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cur_cfg, &dest));
|
||||
|
||||
|
||||
@@ -78,6 +78,8 @@ __lsm_tree_close(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
|
||||
WT_TRET(__wt_thread_join(
|
||||
lsm_tree->worker_tids[i]));
|
||||
WT_TRET(__wt_thread_join(lsm_tree->ckpt_tid));
|
||||
if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_NEWEST))
|
||||
WT_TRET(__wt_thread_join(lsm_tree->bloom_tid));
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -101,6 +103,21 @@ __lsm_tree_close(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
|
||||
*/
|
||||
__wt_free(NULL, s->hazard);
|
||||
}
|
||||
|
||||
if (lsm_tree->bloom_session != NULL) {
|
||||
F_SET(lsm_tree->bloom_session,
|
||||
F_ISSET(session, WT_SESSION_SCHEMA_LOCKED));
|
||||
|
||||
wt_session = &lsm_tree->bloom_session->iface;
|
||||
WT_TRET(wt_session->close(wt_session, NULL));
|
||||
|
||||
/*
|
||||
* This is safe after the close because session handles are
|
||||
* not freed, but are managed by the connection.
|
||||
*/
|
||||
__wt_free(NULL, lsm_tree->bloom_session->hazard);
|
||||
}
|
||||
|
||||
if (lsm_tree->ckpt_session != NULL) {
|
||||
F_SET(lsm_tree->ckpt_session,
|
||||
F_ISSET(session, WT_SESSION_SCHEMA_LOCKED));
|
||||
@@ -167,8 +184,8 @@ __wt_lsm_tree_chunk_name(
|
||||
* Initialize a chunk of an LSM tree.
|
||||
*/
|
||||
int
|
||||
__wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session,
|
||||
WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk, int create_bloom)
|
||||
__wt_lsm_tree_setup_chunk(
|
||||
WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk)
|
||||
{
|
||||
WT_DECL_RET;
|
||||
WT_ITEM buf;
|
||||
@@ -180,7 +197,7 @@ __wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session,
|
||||
|
||||
/*
|
||||
* Drop the chunk first - there may be some content hanging over from
|
||||
* an aborted merge.
|
||||
* an aborted merge or checkpoint.
|
||||
*
|
||||
* Don't do this for the very first chunk: we are called during
|
||||
* WT_SESSION::create, and doing a drop inside there does interesting
|
||||
@@ -190,11 +207,6 @@ __wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session,
|
||||
if (chunk->id > 1)
|
||||
WT_ERR(__wt_schema_drop(session, chunk->uri, cfg));
|
||||
WT_ERR(__wt_schema_create(session, chunk->uri, lsm_tree->file_config));
|
||||
if (create_bloom) {
|
||||
WT_ERR(__wt_lsm_tree_bloom_name(
|
||||
session, lsm_tree, chunk->id, &buf));
|
||||
chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL);
|
||||
}
|
||||
|
||||
err: return (ret);
|
||||
}
|
||||
@@ -232,9 +244,17 @@ __lsm_tree_start_worker(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
|
||||
WT_RET(__wt_calloc_def(session, 1, &wargs));
|
||||
wargs->lsm_tree = lsm_tree;
|
||||
wargs->id = i;
|
||||
WT_RET(__wt_thread_create(
|
||||
&lsm_tree->worker_tids[i], __wt_lsm_worker, wargs));
|
||||
WT_RET(__wt_thread_create(&lsm_tree->worker_tids[i],
|
||||
__wt_lsm_merge_worker, wargs));
|
||||
}
|
||||
if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_NEWEST)) {
|
||||
WT_RET(wt_conn->open_session(wt_conn, NULL, NULL, &wt_session));
|
||||
lsm_tree->bloom_session = (WT_SESSION_IMPL *)wt_session;
|
||||
F_SET(lsm_tree->bloom_session, WT_SESSION_INTERNAL);
|
||||
|
||||
WT_RET(__wt_thread_create(
|
||||
&lsm_tree->bloom_tid, __wt_lsm_bloom_worker, lsm_tree));
|
||||
}
|
||||
WT_RET(__wt_thread_create(
|
||||
&lsm_tree->ckpt_tid, __wt_lsm_checkpoint_worker, lsm_tree));
|
||||
|
||||
@@ -506,8 +526,7 @@ __wt_lsm_tree_switch(
|
||||
WT_ERR(__wt_calloc_def(session, 1, &chunk));
|
||||
chunk->id = new_id;
|
||||
lsm_tree->chunk[lsm_tree->nchunks++] = chunk;
|
||||
WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk,
|
||||
FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_NEWEST) ? 1 : 0));
|
||||
WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk));
|
||||
|
||||
++lsm_tree->dsk_gen;
|
||||
WT_ERR(__wt_lsm_meta_write(session, lsm_tree));
|
||||
@@ -661,7 +680,7 @@ __wt_lsm_tree_truncate(
|
||||
/* Create the new chunk. */
|
||||
WT_ERR(__wt_calloc_def(session, 1, &chunk));
|
||||
chunk->id = WT_ATOMIC_ADD(lsm_tree->last, 1);
|
||||
WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk, 0));
|
||||
WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk));
|
||||
|
||||
/* Mark all chunks old. */
|
||||
WT_ERR(__wt_lsm_merge_update_tree(
|
||||
|
||||
@@ -12,12 +12,12 @@ static int __lsm_bloom_create(
|
||||
static int __lsm_free_chunks(WT_SESSION_IMPL *, WT_LSM_TREE *);
|
||||
|
||||
/*
|
||||
* __wt_lsm_worker --
|
||||
* The worker thread for an LSM tree, responsible for writing in-memory
|
||||
* trees to disk and merging on-disk trees.
|
||||
* __wt_lsm_merge_worker --
|
||||
* The merge worker thread for an LSM tree, responsible for merging
|
||||
* on-disk trees.
|
||||
*/
|
||||
void *
|
||||
__wt_lsm_worker(void *vargs)
|
||||
__wt_lsm_merge_worker(void *vargs)
|
||||
{
|
||||
WT_LSM_WORKER_ARGS *args;
|
||||
WT_LSM_TREE *lsm_tree;
|
||||
@@ -64,10 +64,80 @@ __wt_lsm_worker(void *vargs)
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* __wt_lsm_bloom_worker --
|
||||
* A worker thread for an LSM tree, responsible for creating Bloom filters
|
||||
* for the newest on-disk chunks.
|
||||
*/
|
||||
void *
|
||||
__wt_lsm_bloom_worker(void *arg)
|
||||
{
|
||||
WT_DECL_RET;
|
||||
WT_LSM_CHUNK *chunk;
|
||||
WT_LSM_TREE *lsm_tree;
|
||||
WT_LSM_WORKER_COOKIE cookie;
|
||||
WT_SESSION_IMPL *session;
|
||||
int i, j;
|
||||
|
||||
lsm_tree = arg;
|
||||
session = lsm_tree->bloom_session;
|
||||
|
||||
WT_CLEAR(cookie);
|
||||
|
||||
for (;;) {
|
||||
WT_ERR(__wt_lsm_copy_chunks(session, lsm_tree, &cookie));
|
||||
|
||||
/* Write checkpoints in all completed files. */
|
||||
for (i = 0, j = 0; i < cookie.nchunks; i++) {
|
||||
if (!F_ISSET(lsm_tree, WT_LSM_TREE_WORKING))
|
||||
goto err;
|
||||
|
||||
chunk = cookie.chunk_array[i];
|
||||
/* Stop if a thread is still active in the chunk. */
|
||||
if (chunk->ncursor != 0 ||
|
||||
!F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
|
||||
break;
|
||||
|
||||
if (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) ||
|
||||
F_ISSET(chunk, WT_LSM_CHUNK_MERGING) ||
|
||||
chunk->generation > 0 ||
|
||||
chunk->count == 0)
|
||||
continue;
|
||||
|
||||
if ((ret = __lsm_bloom_create(
|
||||
session, lsm_tree, chunk)) != 0) {
|
||||
(void)__wt_err(
|
||||
session, ret, "bloom creation failed");
|
||||
break;
|
||||
}
|
||||
|
||||
++j;
|
||||
__wt_spin_lock(session, &lsm_tree->lock);
|
||||
++lsm_tree->dsk_gen;
|
||||
ret = __wt_lsm_meta_write(session, lsm_tree);
|
||||
__wt_spin_unlock(session, &lsm_tree->lock);
|
||||
|
||||
if (ret != 0) {
|
||||
(void)__wt_err(session, ret,
|
||||
"LSM bloom worker metadata write failed");
|
||||
break;
|
||||
}
|
||||
|
||||
WT_VERBOSE_ERR(session, lsm,
|
||||
"LSM worker created bloom filter for %d.", i);
|
||||
}
|
||||
if (j == 0)
|
||||
__wt_sleep(0, 100000);
|
||||
}
|
||||
|
||||
err: __wt_free(session, cookie.chunk_array);
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* __wt_lsm_checkpoint_worker --
|
||||
* A worker thread for an LSM tree, responsible for checkpointing chunks
|
||||
* once they become read only.
|
||||
* A worker thread for an LSM tree, responsible for flushing new chunks to
|
||||
* disk.
|
||||
*/
|
||||
void *
|
||||
__wt_lsm_checkpoint_worker(void *arg)
|
||||
@@ -85,26 +155,19 @@ __wt_lsm_checkpoint_worker(void *arg)
|
||||
|
||||
WT_CLEAR(cookie);
|
||||
|
||||
while (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) {
|
||||
for (;;) {
|
||||
WT_ERR(__wt_lsm_copy_chunks(session, lsm_tree, &cookie));
|
||||
|
||||
/* Write checkpoints in all completed files. */
|
||||
for (i = 0, j = 0; i < cookie.nchunks; i++) {
|
||||
for (i = 0, j = 0; i < cookie.nchunks - 1; i++) {
|
||||
if (!F_ISSET(lsm_tree, WT_LSM_TREE_WORKING))
|
||||
goto err;
|
||||
|
||||
chunk = cookie.chunk_array[i];
|
||||
/* Stop if a thread is still active in the chunk. */
|
||||
if (chunk->ncursor != 0 ||
|
||||
(i == cookie.nchunks - 1 &&
|
||||
!F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)))
|
||||
if (chunk->ncursor != 0)
|
||||
break;
|
||||
|
||||
if (!F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) &&
|
||||
(ret = __lsm_bloom_create(
|
||||
session, lsm_tree, chunk)) != 0) {
|
||||
(void)__wt_err(
|
||||
session, ret, "bloom creation failed");
|
||||
break;
|
||||
}
|
||||
|
||||
if (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
|
||||
continue;
|
||||
|
||||
@@ -113,27 +176,36 @@ __wt_lsm_checkpoint_worker(void *arg)
|
||||
* __wt_checkpoint thinks we're closing the file.
|
||||
*/
|
||||
WT_WITH_SCHEMA_LOCK(session,
|
||||
ret = __wt_schema_worker(session, chunk->uri,
|
||||
__wt_checkpoint, cfg, 0));
|
||||
if (ret == 0) {
|
||||
++j;
|
||||
__wt_spin_lock(session, &lsm_tree->lock);
|
||||
F_SET(chunk, WT_LSM_CHUNK_ONDISK);
|
||||
++lsm_tree->dsk_gen;
|
||||
__wt_spin_unlock(session, &lsm_tree->lock);
|
||||
WT_VERBOSE_ERR(session, lsm,
|
||||
"LSM worker checkpointed %d.", i);
|
||||
} else {
|
||||
(void)__wt_err(
|
||||
session, ret, "LSM checkpoint failed");
|
||||
ret = __wt_schema_worker(session,
|
||||
chunk->uri, __wt_checkpoint, cfg, 0));
|
||||
|
||||
if (ret != 0) {
|
||||
(void)__wt_err(session, ret,
|
||||
"LSM checkpoint failed");
|
||||
break;
|
||||
}
|
||||
|
||||
++j;
|
||||
__wt_spin_lock(session, &lsm_tree->lock);
|
||||
F_SET(chunk, WT_LSM_CHUNK_ONDISK);
|
||||
++lsm_tree->dsk_gen;
|
||||
ret = __wt_lsm_meta_write(session, lsm_tree);
|
||||
__wt_spin_unlock(session, &lsm_tree->lock);
|
||||
|
||||
if (ret != 0) {
|
||||
(void)__wt_err(session, ret,
|
||||
"LSM checkpoint metadata write failed");
|
||||
break;
|
||||
}
|
||||
|
||||
WT_VERBOSE_ERR(session, lsm,
|
||||
"LSM worker checkpointed %d.", i);
|
||||
}
|
||||
if (j == 0)
|
||||
__wt_sleep(0, 1000);
|
||||
__wt_sleep(0, 10000);
|
||||
}
|
||||
err: __wt_free(session, cookie.chunk_array);
|
||||
|
||||
err: __wt_free(session, cookie.chunk_array);
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
@@ -193,13 +265,10 @@ __lsm_bloom_create(WT_SESSION_IMPL *session,
|
||||
WT_CURSOR *src;
|
||||
WT_DECL_RET;
|
||||
WT_ITEM buf, key;
|
||||
WT_SESSION *wt_session;
|
||||
const char *cur_cfg[] = API_CONF_DEFAULTS(session, open_cursor, "raw");
|
||||
uint64_t insert_count;
|
||||
|
||||
if (!FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_NEWEST) ||
|
||||
chunk->count == 0)
|
||||
return (0);
|
||||
|
||||
/*
|
||||
* Normally, the Bloom URI is populated when the chunk struct is
|
||||
* allocated. After an open, however, it may not have been.
|
||||
@@ -212,6 +281,13 @@ __lsm_bloom_create(WT_SESSION_IMPL *session,
|
||||
chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Drop the bloom filter first - there may be some content hanging over
|
||||
* from an aborted merge or checkpoint.
|
||||
*/
|
||||
wt_session = &session->iface;
|
||||
WT_RET(wt_session->drop(wt_session, chunk->bloom_uri, "force"));
|
||||
|
||||
bloom = NULL;
|
||||
|
||||
WT_RET(__wt_bloom_create(session, chunk->bloom_uri,
|
||||
|
||||
Reference in New Issue
Block a user