Files
mongo/test/format/ops.c
Keith Bostic a2fc5559a2 Simplify the operations loop -- basically, we want to do N loops of
operations so we test opening and closing the objects, but if there
are no operations to do, we just need to verify the bulk-load and
we're done.

Don't drop core & test recovery until the last of the N loops, we
were doing it on the first.
2015-03-04 08:58:50 -05:00

1181 lines
31 KiB
C

/*-
* Public Domain 2014-2015 MongoDB, Inc.
* Public Domain 2008-2014 WiredTiger, Inc.
*
* This is free and unencumbered software released into the public domain.
*
* Anyone is free to copy, modify, publish, use, compile, sell, or
* distribute this software, either in source code form or as a compiled
* binary, for any purpose, commercial or non-commercial, and by any
* means.
*
* In jurisdictions that recognize copyright laws, the author or authors
* of this software dedicate any and all copyright interest in the
* software to the public domain. We make this dedication for the benefit
* of the public at large and to the detriment of our heirs and
* successors. We intend this dedication to be an overt act of
* relinquishment in perpetuity of all present and future rights to this
* software under copyright law.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#include "format.h"
static int col_insert(WT_CURSOR *, WT_ITEM *, WT_ITEM *, uint64_t *);
static int col_remove(WT_CURSOR *, WT_ITEM *, uint64_t, int *);
static int col_update(WT_CURSOR *, WT_ITEM *, WT_ITEM *, uint64_t);
static int nextprev(WT_CURSOR *, int, int *);
static int notfound_chk(const char *, int, int, uint64_t);
static void *ops(void *);
static void print_item(const char *, WT_ITEM *);
static int read_row(WT_CURSOR *, WT_ITEM *, uint64_t);
static int row_insert(WT_CURSOR *, WT_ITEM *, WT_ITEM *, uint64_t);
static int row_remove(WT_CURSOR *, WT_ITEM *, uint64_t, int *);
static int row_update(WT_CURSOR *, WT_ITEM *, WT_ITEM *, uint64_t);
static void table_append_init(void);
/*
* wts_ops --
* Perform a number of operations in a set of threads.
*/
void
wts_ops(int lastrun)
{
TINFO *tinfo, total;
WT_CONNECTION *conn;
WT_SESSION *session;
pthread_t backup_tid, compact_tid;
int64_t fourths, thread_ops;
uint32_t i;
int ret, running;
conn = g.wts_conn;
session = NULL; /* -Wconditional-uninitialized */
memset(&backup_tid, 0, sizeof(backup_tid));
memset(&compact_tid, 0, sizeof(compact_tid));
/*
* We support replay of threaded runs, but don't log random numbers
* after threaded operations start, there's no point.
*/
if (!SINGLETHREADED)
g.rand_log_stop = 1;
/*
* There are two mechanisms to specify the length of the run, a number
* of operations and a timer, when either expire the run terminates.
* Each thread does an equal share of the total operations (and make
* sure that it's not 0).
*
* Calculate how many fourth-of-a-second sleeps until any timer expires.
*/
if (g.c_ops == 0)
thread_ops = -1;
else {
if (g.c_ops < g.c_threads)
g.c_ops = g.c_threads;
thread_ops = g.c_ops / g.c_threads;
}
if (g.c_timer == 0)
fourths = -1;
else
fourths = (g.c_timer * 4 * 60) / FORMAT_OPERATION_REPS;
/* Initialize the table extension code. */
table_append_init();
/* Open a session. */
if (g.logging != 0) {
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0)
die(ret, "connection.open_session");
(void)g.wt_api->msg_printf(g.wt_api, session,
"=============== thread ops start ===============");
}
/* Create thread structure; start the worker threads. */
if ((tinfo = calloc((size_t)g.c_threads, sizeof(*tinfo))) == NULL)
die(errno, "calloc");
for (i = 0; i < g.c_threads; ++i) {
tinfo[i].id = (int)i + 1;
tinfo[i].state = TINFO_RUNNING;
if ((ret =
pthread_create(&tinfo[i].tid, NULL, ops, &tinfo[i])) != 0)
die(ret, "pthread_create");
}
/* If a multi-threaded run, start backup and compaction threads. */
if (g.c_backups &&
(ret = pthread_create(&backup_tid, NULL, backup, NULL)) != 0)
die(ret, "pthread_create: backup");
if (g.c_compact &&
(ret = pthread_create(&compact_tid, NULL, compact, NULL)) != 0)
die(ret, "pthread_create: compaction");
/* Spin on the threads, calculating the totals. */
for (;;) {
/* Clear out the totals each pass. */
memset(&total, 0, sizeof(total));
for (i = 0, running = 0; i < g.c_threads; ++i) {
total.commit += tinfo[i].commit;
total.deadlock += tinfo[i].deadlock;
total.insert += tinfo[i].insert;
total.remove += tinfo[i].remove;
total.rollback += tinfo[i].rollback;
total.search += tinfo[i].search;
total.update += tinfo[i].update;
switch (tinfo[i].state) {
case TINFO_RUNNING:
running = 1;
break;
case TINFO_COMPLETE:
tinfo[i].state = TINFO_JOINED;
(void)pthread_join(tinfo[i].tid, NULL);
break;
case TINFO_JOINED:
break;
}
/*
* If the timer has expired or this thread has completed
* its operations, notify the thread it should quit.
*/
if (fourths == 0 ||
(thread_ops != -1 &&
tinfo[i].ops >= (uint64_t)thread_ops)) {
/*
* On the last execution, optionally drop core
* for recovery testing.
*/
if (lastrun && g.c_abort) {
static char *core = NULL;
*core = 0;
}
tinfo[i].quit = 1;
}
}
track("ops", 0ULL, &total);
if (!running)
break;
(void)usleep(250000); /* 1/4th of a second */
if (fourths != -1)
--fourths;
}
free(tinfo);
/* Wait for the backup, compaction thread. */
g.workers_finished = 1;
if (g.c_backups)
(void)pthread_join(backup_tid, NULL);
if (g.c_compact)
(void)pthread_join(compact_tid, NULL);
if (g.logging != 0) {
(void)g.wt_api->msg_printf(g.wt_api, session,
"=============== thread ops stop ===============");
if ((ret = session->close(session, NULL)) != 0)
die(ret, "session.close");
}
}
/*
* ops_session_config --
* Return the current session configuration.
*/
static const char *
ops_session_config(void)
{
u_int v;
/*
* The only current session configuration is the isolation level.
*/
if ((v = g.c_isolation_flag) == ISOLATION_RANDOM)
v = MMRAND(2, 4);
switch (v) {
case ISOLATION_READ_UNCOMMITTED:
return ("isolation=read-uncommitted");
case ISOLATION_READ_COMMITTED:
return ("isolation=read-committed");
case ISOLATION_SNAPSHOT:
default:
return ("isolation=snapshot");
}
}
static void *
ops(void *arg)
{
TINFO *tinfo;
WT_CONNECTION *conn;
WT_CURSOR *cursor, *cursor_insert;
WT_SESSION *session;
WT_ITEM key, value;
uint64_t keyno, ckpt_op, session_op;
uint32_t op;
uint8_t *keybuf, *valbuf;
u_int np;
int ckpt_available, dir, insert, intxn, notfound, readonly, ret;
char *ckpt_config, ckpt_name[64];
tinfo = arg;
conn = g.wts_conn;
keybuf = valbuf = NULL;
readonly = 0; /* -Wconditional-uninitialized */
/* Set up the default key and value buffers. */
key_gen_setup(&keybuf);
val_gen_setup(&valbuf);
/* Set the first operation where we'll create sessions and cursors. */
session_op = 0;
session = NULL;
cursor = cursor_insert = NULL;
/* Set the first operation where we'll perform checkpoint operations. */
ckpt_op = g.c_checkpoints ? MMRAND(100, 10000) : 0;
ckpt_available = 0;
for (intxn = 0; !tinfo->quit; ++tinfo->ops) {
/*
* We can't checkpoint or swap sessions/cursors while in a
* transaction, resolve any running transaction.
*/
if (intxn &&
(tinfo->ops == ckpt_op || tinfo->ops == session_op)) {
if ((ret = session->commit_transaction(
session, NULL)) != 0)
die(ret, "session.commit_transaction");
++tinfo->commit;
intxn = 0;
}
/* Open up a new session and cursors. */
if (tinfo->ops == session_op ||
session == NULL || cursor == NULL) {
if (session != NULL &&
(ret = session->close(session, NULL)) != 0)
die(ret, "session.close");
if ((ret = conn->open_session(
conn, NULL, ops_session_config(), &session)) != 0)
die(ret, "connection.open_session");
/*
* 10% of the time, perform some read-only operations
* from a checkpoint.
*
* Skip that if we single-threaded and doing checks
* against a Berkeley DB database, because that won't
* work because the Berkeley DB database records won't
* match the checkpoint. Also skip if we are using
* LSM, because it doesn't support reads from
* checkpoints.
*/
if (!SINGLETHREADED && !DATASOURCE("lsm") &&
ckpt_available && MMRAND(1, 10) == 1) {
if ((ret = session->open_cursor(session,
g.uri, NULL, ckpt_name, &cursor)) != 0)
die(ret, "session.open_cursor");
/* Pick the next session/cursor close/open. */
session_op += 250;
/* Checkpoints are read-only. */
readonly = 1;
} else {
/*
* Open two cursors: one for overwriting and one
* for append (if it's a column-store).
*
* The reason is when testing with existing
* records, we don't track if a record was
* deleted or not, which means we must use
* cursor->insert with overwriting configured.
* But, in column-store files where we're
* testing with new, appended records, we don't
* want to have to specify the record number,
* which requires an append configuration.
*/
if ((ret = session->open_cursor(session, g.uri,
NULL, "overwrite", &cursor)) != 0)
die(ret, "session.open_cursor");
if ((g.type == FIX || g.type == VAR) &&
(ret = session->open_cursor(session, g.uri,
NULL, "append", &cursor_insert)) != 0)
die(ret, "session.open_cursor");
/* Pick the next session/cursor close/open. */
session_op += 100 * MMRAND(1, 50);
/* Updates supported. */
readonly = 0;
}
}
/* Checkpoint the database. */
if (tinfo->ops == ckpt_op && g.c_checkpoints) {
/*
* LSM and data-sources don't support named checkpoints,
* and we can't drop a named checkpoint while there's a
* cursor open on it, otherwise 20% of the time name the
* checkpoint.
*/
if (DATASOURCE("helium") || DATASOURCE("kvsbdb") ||
DATASOURCE("lsm") || readonly || MMRAND(1, 5) == 1)
ckpt_config = NULL;
else {
(void)snprintf(ckpt_name, sizeof(ckpt_name),
"name=thread-%d", tinfo->id);
ckpt_config = ckpt_name;
}
/* Named checkpoints lock out backups */
if (ckpt_config != NULL &&
(ret = pthread_rwlock_wrlock(&g.backup_lock)) != 0)
die(ret,
"pthread_rwlock_wrlock: backup lock");
if ((ret =
session->checkpoint(session, ckpt_config)) != 0)
die(ret, "session.checkpoint%s%s",
ckpt_config == NULL ? "" : ": ",
ckpt_config == NULL ? "" : ckpt_config);
if (ckpt_config != NULL &&
(ret = pthread_rwlock_unlock(&g.backup_lock)) != 0)
die(ret,
"pthread_rwlock_wrlock: backup lock");
/* Rephrase the checkpoint name for cursor open. */
if (ckpt_config == NULL)
strcpy(ckpt_name,
"checkpoint=WiredTigerCheckpoint");
else
(void)snprintf(ckpt_name, sizeof(ckpt_name),
"checkpoint=thread-%d", tinfo->id);
ckpt_available = 1;
/* Pick the next checkpoint operation. */
ckpt_op += 1000 * MMRAND(5, 20);
}
/*
* If we're not single-threaded and we're not in a transaction,
* start a transaction 20% of the time.
*/
if (!SINGLETHREADED && !intxn && MMRAND(1, 10) >= 8) {
if ((ret =
session->begin_transaction(session, NULL)) != 0)
die(ret, "session.begin_transaction");
intxn = 1;
}
insert = notfound = 0;
keyno = MMRAND(1, g.rows);
key.data = keybuf;
value.data = valbuf;
/*
* Perform some number of operations: the percentage of deletes,
* inserts and writes are specified, reads are the rest. The
* percentages don't have to add up to 100, a high percentage
* of deletes will mean fewer inserts and writes. Modifications
* are always followed by a read to confirm it worked.
*/
op = readonly ? UINT32_MAX : (uint32_t)(rng() % 100);
if (op < g.c_delete_pct) {
++tinfo->remove;
switch (g.type) {
case ROW:
/*
* If deleting a non-existent record, the cursor
* won't be positioned, and so can't do a next.
*/
if (row_remove(cursor, &key, keyno, &notfound))
goto deadlock;
break;
case FIX:
case VAR:
if (col_remove(cursor, &key, keyno, &notfound))
goto deadlock;
break;
}
} else if (op < g.c_delete_pct + g.c_insert_pct) {
++tinfo->insert;
switch (g.type) {
case ROW:
if (row_insert(cursor, &key, &value, keyno))
goto deadlock;
insert = 1;
break;
case FIX:
case VAR:
/*
* We can only append so many new records, if
* we've reached that limit, update a record
* instead of doing an insert.
*/
if (g.append_cnt >= g.append_max)
goto skip_insert;
/* Insert, then reset the insert cursor. */
if (col_insert(
cursor_insert, &key, &value, &keyno))
goto deadlock;
if ((ret =
cursor_insert->reset(cursor_insert)) != 0)
die(ret, "cursor.reset");
insert = 1;
break;
}
} else if (
op < g.c_delete_pct + g.c_insert_pct + g.c_write_pct) {
++tinfo->update;
switch (g.type) {
case ROW:
if (row_update(cursor, &key, &value, keyno))
goto deadlock;
break;
case FIX:
case VAR:
skip_insert: if (col_update(cursor, &key, &value, keyno))
goto deadlock;
break;
}
} else {
++tinfo->search;
if (read_row(cursor, &key, keyno))
if (intxn)
goto deadlock;
continue;
}
/*
* The cursor is positioned if we did any operation other than
* insert, do a small number of next/prev cursor operations in
* a random direction.
*/
if (!insert) {
dir = (int)MMRAND(0, 1);
for (np = 0; np < MMRAND(1, 8); ++np) {
if (notfound)
break;
if (nextprev(cursor, dir, &notfound))
goto deadlock;
}
}
/* Read to confirm the operation. */
++tinfo->search;
if (read_row(cursor, &key, keyno))
goto deadlock;
/* Reset the cursor: there is no reason to keep pages pinned. */
if (cursor != NULL && (ret = cursor->reset(cursor)) != 0)
die(ret, "cursor.reset");
/*
* If we're in the transaction, commit 40% of the time and
* rollback 10% of the time.
*/
if (intxn)
switch (MMRAND(1, 10)) {
case 1: case 2: case 3: case 4: /* 40% */
if ((ret = session->commit_transaction(
session, NULL)) != 0)
die(ret, "session.commit_transaction");
++tinfo->commit;
intxn = 0;
break;
case 5: /* 10% */
if (0) {
deadlock: ++tinfo->deadlock;
}
if ((ret = session->rollback_transaction(
session, NULL)) != 0)
die(ret,
"session.rollback_transaction");
++tinfo->rollback;
intxn = 0;
break;
default:
break;
}
}
if (session != NULL && (ret = session->close(session, NULL)) != 0)
die(ret, "session.close");
free(keybuf);
free(valbuf);
tinfo->state = TINFO_COMPLETE;
return (NULL);
}
/*
* wts_read_scan --
* Read and verify all elements in a file.
*/
void
wts_read_scan(void)
{
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_ITEM key;
WT_SESSION *session;
uint64_t cnt, last_cnt;
uint8_t *keybuf;
int ret;
conn = g.wts_conn;
/* Set up the default key buffer. */
key_gen_setup(&keybuf);
/* Open a session and cursor pair. */
if ((ret = conn->open_session(
conn, NULL, ops_session_config(), &session)) != 0)
die(ret, "connection.open_session");
if ((ret = session->open_cursor(
session, g.uri, NULL, NULL, &cursor)) != 0)
die(ret, "session.open_cursor");
/* Check a random subset of the records using the key. */
for (last_cnt = cnt = 0; cnt < g.key_cnt;) {
cnt += rng() % 17 + 1;
if (cnt > g.rows)
cnt = g.rows;
if (cnt - last_cnt > 1000) {
track("read row scan", cnt, NULL);
last_cnt = cnt;
}
key.data = keybuf;
if ((ret = read_row(cursor, &key, cnt)) != 0)
die(ret, "read_scan");
}
if ((ret = session->close(session, NULL)) != 0)
die(ret, "session.close");
free(keybuf);
}
/*
* read_row --
* Read and verify a single element in a row- or column-store file.
*/
static int
read_row(WT_CURSOR *cursor, WT_ITEM *key, uint64_t keyno)
{
WT_ITEM bdb_value, value;
WT_SESSION *session;
int notfound, ret;
uint8_t bitfield;
session = cursor->session;
/* Log the operation */
if (g.logging == LOG_OPS)
(void)g.wt_api->msg_printf(g.wt_api,
session, "%-10s%" PRIu64, "read", keyno);
/* Retrieve the key/value pair by key. */
switch (g.type) {
case FIX:
case VAR:
cursor->set_key(cursor, keyno);
break;
case ROW:
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
cursor->set_key(cursor, key);
break;
}
if ((ret = cursor->search(cursor)) == 0) {
if (g.type == FIX) {
ret = cursor->get_value(cursor, &bitfield);
value.data = &bitfield;
value.size = 1;
} else {
ret = cursor->get_value(cursor, &value);
}
}
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "read_row: read row %" PRIu64, keyno);
/*
* In fixed length stores, zero values at the end of the key space are
* returned as not found. Treat this the same as a zero value in the
* key space, to match BDB's behavior.
*/
if (ret == WT_NOTFOUND && g.type == FIX) {
bitfield = 0;
value.data = &bitfield;
value.size = 1;
ret = 0;
}
if (!SINGLETHREADED)
return (0);
/* Retrieve the BDB value. */
bdb_read(keyno, &bdb_value.data, &bdb_value.size, &notfound);
/* Check for not-found status. */
if (notfound_chk("read_row", ret, notfound, keyno))
return (0);
/* Compare the two. */
if (value.size != bdb_value.size ||
memcmp(value.data, bdb_value.data, value.size) != 0) {
fprintf(stderr,
"read_row: read row value mismatch %" PRIu64 ":\n", keyno);
print_item("bdb", &bdb_value);
print_item(" wt", &value);
die(0, NULL);
}
return (0);
}
/*
* nextprev --
* Read and verify the next/prev element in a row- or column-store file.
*/
static int
nextprev(WT_CURSOR *cursor, int next, int *notfoundp)
{
WT_ITEM key, value, bdb_key, bdb_value;
WT_SESSION *session;
uint64_t keyno;
int notfound, ret;
uint8_t bitfield;
const char *which;
char *p;
session = cursor->session;
which = next ? "next" : "prev";
keyno = 0;
ret = next ? cursor->next(cursor) : cursor->prev(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret == 0)
switch (g.type) {
case FIX:
if ((ret = cursor->get_key(cursor, &keyno)) == 0 &&
(ret = cursor->get_value(cursor, &bitfield)) == 0) {
value.data = &bitfield;
value.size = 1;
}
break;
case ROW:
if ((ret = cursor->get_key(cursor, &key)) == 0)
ret = cursor->get_value(cursor, &value);
break;
case VAR:
if ((ret = cursor->get_key(cursor, &keyno)) == 0)
ret = cursor->get_value(cursor, &value);
break;
}
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "%s", which);
*notfoundp = (ret == WT_NOTFOUND);
if (!SINGLETHREADED)
return (0);
/* Retrieve the BDB value. */
bdb_np(next, &bdb_key.data, &bdb_key.size,
&bdb_value.data, &bdb_value.size, &notfound);
if (notfound_chk(
next ? "nextprev(next)" : "nextprev(prev)", ret, notfound, keyno))
return (0);
/* Compare the two. */
if (g.type == ROW) {
if (key.size != bdb_key.size ||
memcmp(key.data, bdb_key.data, key.size) != 0) {
fprintf(stderr, "nextprev: %s key mismatch:\n", which);
print_item("bdb-key", &bdb_key);
print_item(" wt-key", &key);
die(0, NULL);
}
} else {
if (keyno != (uint64_t)atoll(bdb_key.data)) {
if ((p = strchr((char *)bdb_key.data, '.')) != NULL)
*p = '\0';
fprintf(stderr,
"nextprev: %s key mismatch: %.*s != %" PRIu64 "\n",
which,
(int)bdb_key.size, (char *)bdb_key.data, keyno);
die(0, NULL);
}
}
if (value.size != bdb_value.size ||
memcmp(value.data, bdb_value.data, value.size) != 0) {
fprintf(stderr, "nextprev: %s value mismatch:\n", which);
print_item("bdb-value", &bdb_value);
print_item(" wt-value", &value);
die(0, NULL);
}
if (g.logging == LOG_OPS)
switch (g.type) {
case FIX:
(void)g.wt_api->msg_printf(g.wt_api,
session, "%-10s%" PRIu64 " {0x%02x}", which,
keyno, ((char *)value.data)[0]);
break;
case ROW:
(void)g.wt_api->msg_printf(
g.wt_api, session, "%-10s{%.*s/%.*s}", which,
(int)key.size, (char *)key.data,
(int)value.size, (char *)value.data);
break;
case VAR:
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s%" PRIu64 " {%.*s}", which,
keyno, (int)value.size, (char *)value.data);
break;
}
return (0);
}
/*
* row_update --
* Update a row in a row-store file.
*/
static int
row_update(
WT_CURSOR *cursor, WT_ITEM *key, WT_ITEM *value, uint64_t keyno)
{
WT_SESSION *session;
int notfound, ret;
session = cursor->session;
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
value_gen((uint8_t *)value->data, &value->size, keyno);
/* Log the operation */
if (g.logging == LOG_OPS)
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s{%.*s}\n%-10s{%.*s}",
"putK", (int)key->size, (char *)key->data,
"putV", (int)value->size, (char *)value->data);
cursor->set_key(cursor, key);
cursor->set_value(cursor, value);
ret = cursor->update(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "row_update: update row %" PRIu64 " by key", keyno);
if (!SINGLETHREADED)
return (0);
bdb_update(key->data, key->size, value->data, value->size, &notfound);
(void)notfound_chk("row_update", ret, notfound, keyno);
return (0);
}
/*
* col_update --
* Update a row in a column-store file.
*/
static int
col_update(WT_CURSOR *cursor, WT_ITEM *key, WT_ITEM *value, uint64_t keyno)
{
WT_SESSION *session;
int notfound, ret;
session = cursor->session;
value_gen((uint8_t *)value->data, &value->size, keyno);
/* Log the operation */
if (g.logging == LOG_OPS) {
if (g.type == FIX)
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s%" PRIu64 " {0x%02" PRIx8 "}",
"update", keyno,
((uint8_t *)value->data)[0]);
else
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s%" PRIu64 " {%.*s}",
"update", keyno,
(int)value->size, (char *)value->data);
}
cursor->set_key(cursor, keyno);
if (g.type == FIX)
cursor->set_value(cursor, *(uint8_t *)value->data);
else
cursor->set_value(cursor, value);
ret = cursor->update(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "col_update: %" PRIu64, keyno);
if (!SINGLETHREADED)
return (0);
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
bdb_update(key->data, key->size, value->data, value->size, &notfound);
(void)notfound_chk("col_update", ret, notfound, keyno);
return (0);
}
/*
* table_append_init --
* Re-initialize the appended records list.
*/
static void
table_append_init(void)
{
/* Append up to 10 records per thread before waiting on resolution. */
g.append_max = (size_t)g.c_threads * 10;
g.append_cnt = 0;
if (g.append != NULL) {
free(g.append);
g.append = NULL;
}
if ((g.append = calloc(g.append_max, sizeof(uint64_t))) == NULL)
die(errno, "calloc");
}
/*
* table_append --
* Resolve the appended records.
*/
static void
table_append(uint64_t keyno)
{
uint64_t *p, *ep;
int done, ret;
ep = g.append + g.append_max;
/*
* We don't want to ignore records we append, which requires we update
* the "last row" as we insert new records. Threads allocating record
* numbers can race with other threads, so the thread allocating record
* N may return after the thread allocating N + 1. We can't update a
* record before it's been inserted, and so we can't leave gaps when the
* count of records in the table is incremented.
*
* The solution is the append table, which contains an unsorted list of
* appended records. Every time we finish appending a record, process
* the table, trying to update the total records in the object.
*
* First, enter the new key into the append list.
*
* It's technically possible to race: we allocated space for 10 records
* per thread, but the check for the maximum number of records being
* appended doesn't lock. If a thread allocated a new record and went
* to sleep (so the append table fills up), then N threads of control
* used the same g.append_cnt value to decide there was an available
* slot in the append table and both allocated new records, we could run
* out of space in the table. It's unfortunately not even unlikely in
* the case of a large number of threads all inserting as fast as they
* can and a single thread going to sleep for an unexpectedly long time.
* If it happens, sleep and retry until earlier records are resolved
* and we find a slot.
*/
for (done = 0;;) {
if ((ret = pthread_rwlock_wrlock(&g.append_lock)) != 0)
die(ret, "pthread_rwlock_wrlock: append_lock");
/*
* If this is the thread we've been waiting for, and its record
* won't fit, we'd loop infinitely. If there are many append
* operations and a thread goes to sleep for a little too long,
* it can happen.
*/
if (keyno == g.rows + 1) {
g.rows = keyno;
done = 1;
/*
* Clean out the table, incrementing the total count of
* records until we don't find the next key.
*/
for (;;) {
for (p = g.append; p < ep; ++p)
if (*p == g.rows + 1) {
g.rows = *p;
*p = 0;
--g.append_cnt;
break;
}
if (p == ep)
break;
}
} else
/* Enter the key into the table. */
for (p = g.append; p < ep; ++p)
if (*p == 0) {
*p = keyno;
++g.append_cnt;
done = 1;
break;
}
if ((ret = pthread_rwlock_unlock(&g.append_lock)) != 0)
die(ret, "pthread_rwlock_unlock: append_lock");
if (done)
break;
sleep(1);
}
}
/*
* row_insert --
* Insert a row in a row-store file.
*/
static int
row_insert(
WT_CURSOR *cursor, WT_ITEM *key, WT_ITEM *value, uint64_t keyno)
{
WT_SESSION *session;
int notfound, ret;
session = cursor->session;
key_gen((uint8_t *)key->data, &key->size, keyno, 1);
value_gen((uint8_t *)value->data, &value->size, keyno);
/* Log the operation */
if (g.logging == LOG_OPS)
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s{%.*s}\n%-10s{%.*s}",
"insertK", (int)key->size, (char *)key->data,
"insertV", (int)value->size, (char *)value->data);
cursor->set_key(cursor, key);
cursor->set_value(cursor, value);
ret = cursor->insert(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "row_insert: insert row %" PRIu64 " by key", keyno);
if (!SINGLETHREADED)
return (0);
bdb_update(key->data, key->size, value->data, value->size, &notfound);
(void)notfound_chk("row_insert", ret, notfound, keyno);
return (0);
}
/*
* col_insert --
* Insert an element in a column-store file.
*/
static int
col_insert(WT_CURSOR *cursor, WT_ITEM *key, WT_ITEM *value, uint64_t *keynop)
{
WT_SESSION *session;
uint64_t keyno;
int notfound, ret;
session = cursor->session;
value_gen((uint8_t *)value->data, &value->size, g.rows + 1);
if (g.type == FIX)
cursor->set_value(cursor, *(uint8_t *)value->data);
else
cursor->set_value(cursor, value);
if ((ret = cursor->insert(cursor)) != 0) {
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
die(ret, "cursor.insert");
}
if ((ret = cursor->get_key(cursor, &keyno)) != 0)
die(ret, "cursor.get_key");
*keynop = (uint32_t)keyno;
table_append(keyno); /* Extend the object. */
if (g.logging == LOG_OPS) {
if (g.type == FIX)
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s%" PRIu64 " {0x%02" PRIx8 "}",
"insert", keyno,
((uint8_t *)value->data)[0]);
else
(void)g.wt_api->msg_printf(g.wt_api, session,
"%-10s%" PRIu64 " {%.*s}",
"insert", keyno,
(int)value->size, (char *)value->data);
}
if (!SINGLETHREADED)
return (0);
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
bdb_update(key->data, key->size, value->data, value->size, &notfound);
return (0);
}
/*
* row_remove --
* Remove an row from a row-store file.
*/
static int
row_remove(WT_CURSOR *cursor, WT_ITEM *key, uint64_t keyno, int *notfoundp)
{
WT_SESSION *session;
int notfound, ret;
session = cursor->session;
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
/* Log the operation */
if (g.logging == LOG_OPS)
(void)g.wt_api->msg_printf(
g.wt_api, session, "%-10s%" PRIu64, "remove", keyno);
cursor->set_key(cursor, key);
/* We use the cursor in overwrite mode, check for existence. */
if ((ret = cursor->search(cursor)) == 0)
ret = cursor->remove(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "row_remove: remove %" PRIu64 " by key", keyno);
*notfoundp = (ret == WT_NOTFOUND);
if (!SINGLETHREADED)
return (0);
bdb_remove(keyno, &notfound);
(void)notfound_chk("row_remove", ret, notfound, keyno);
return (0);
}
/*
* col_remove --
* Remove a row from a column-store file.
*/
static int
col_remove(WT_CURSOR *cursor, WT_ITEM *key, uint64_t keyno, int *notfoundp)
{
WT_SESSION *session;
int notfound, ret;
session = cursor->session;
/* Log the operation */
if (g.logging == LOG_OPS)
(void)g.wt_api->msg_printf(
g.wt_api, session, "%-10s%" PRIu64, "remove", keyno);
cursor->set_key(cursor, keyno);
/* We use the cursor in overwrite mode, check for existence. */
if ((ret = cursor->search(cursor)) == 0)
ret = cursor->remove(cursor);
if (ret == WT_ROLLBACK)
return (WT_ROLLBACK);
if (ret != 0 && ret != WT_NOTFOUND)
die(ret, "col_remove: remove %" PRIu64 " by key", keyno);
*notfoundp = (ret == WT_NOTFOUND);
if (!SINGLETHREADED)
return (0);
/*
* Deleting a fixed-length item is the same as setting the bits to 0;
* do the same thing for the BDB store.
*/
if (g.type == FIX) {
key_gen((uint8_t *)key->data, &key->size, keyno, 0);
bdb_update(key->data, key->size, "\0", 1, &notfound);
} else
bdb_remove(keyno, &notfound);
(void)notfound_chk("col_remove", ret, notfound, keyno);
return (0);
}
/*
* notfound_chk --
* Compare notfound returns for consistency.
*/
static int
notfound_chk(const char *f, int wt_ret, int bdb_notfound, uint64_t keyno)
{
/* Check for not found status. */
if (bdb_notfound && wt_ret == WT_NOTFOUND)
return (1);
if (bdb_notfound) {
fprintf(stderr, "%s: %s:", g.progname, f);
if (keyno != 0)
fprintf(stderr, " row %" PRIu64 ":", keyno);
fprintf(stderr,
" not found in Berkeley DB, found in WiredTiger\n");
die(0, NULL);
}
if (wt_ret == WT_NOTFOUND) {
fprintf(stderr, "%s: %s:", g.progname, f);
if (keyno != 0)
fprintf(stderr, " row %" PRIu64 ":", keyno);
fprintf(stderr,
" found in Berkeley DB, not found in WiredTiger\n");
die(0, NULL);
}
return (0);
}
/*
* print_item --
* Display a single data/size pair, with a tag.
*/
static void
print_item(const char *tag, WT_ITEM *item)
{
static const char hex[] = "0123456789abcdef";
const uint8_t *data;
size_t size;
int ch;
data = item->data;
size = item->size;
fprintf(stderr, "\t%s {", tag);
if (g.type == FIX)
fprintf(stderr, "0x%02x", data[0]);
else
for (; size > 0; --size, ++data) {
ch = data[0];
if (isprint(ch))
fprintf(stderr, "%c", ch);
else
fprintf(stderr, "%x%x",
hex[(data[0] & 0xf0) >> 4],
hex[data[0] & 0x0f]);
}
fprintf(stderr, "}\n");
}