Skip to content

Commit

Permalink
FT-684 : estimated # of rows in a table could become inaccurate after…
Browse files Browse the repository at this point in the history
… deletes

* Implemented 'logical row count' for new tables or recounted tables.
* This will not replace the existing physical counting mechanism internally.
* Adds a new, persisted counter value to the ft_header and return this new value in toku_ft_stat64 instead of the current value.
* Counting:
  - New value will increment and decrement at high level calls from db_put, db_delete, etc:
    + toku_ft_insert_unique
    + toku_ft_maybe_insert
    + toku_ft_maybe_delete
  - Lower level leaf entry calls will also return delta values up to a point where they can be applied to the counter and are a result of message application to a leaf node.
    + inject_msg_in_locked_node
  - These deltas are needed for cases where the original count adjustment is not correct for the action:
    + An insert that turns into an update due to a duplicate key. (upsert)
    + A delete where the key doesn’t exist. (blind delete)
    + Transaction rollback. (unapplied rollback messages)
    *NOTE* If any of these three situations occur, the count will still report as somewhat inaccurate until the messages have been applied to all leaf entries affected. This is strictly a message delivery issue.
* The new counter will need some starting state:
  - On a new index, the count will be set to 0 and counted correctly forward from there.
  - On an existing/upgraded index:
    + The count value will be set to some marker value (-1). When in this state, no logical row counting will be applied to this value and the row count estimate will return the current physical row count.
    + A new function will be exposed added that will perform a logical recount of all rows. Upon successful completion the logical row count in the ft_header will be set to this recounted value.
    + int recount_rows(DB*, int (*progress_callback)(uint64_t count, uint64_t deleted)void* progress_extra), void* progress_extra);
    + Algorithm will recount all real, logical rows in the tree and update the logical count value in ft_header.
    + progress_callback will be passed the current key count and the number of deleted ules seen since the last call.
    + Returning > 0 from progress_callback will cancel the operation and will not update the ft_header will not be updated if cancelled.
    + recount_rows will return 0 on success or > 0 if cancelled or error.
    + Initial implementation is very simple and uses the existing PerconaFT cursor navigation. This may be inefficient due to needing to take individual dives (and locks) down the tree for each key.
    + Resulting row count will be (more) inaccurate if tree has unapplied messages for any of the following conditions (which can be rectified by running an OPTIMIZE TABLE before running a row recount):
      = INSERT messages that will be converted to UPDATE on apply due to key already existing. (upserts)
      = DELETE messages that get discarded in apply due to missing keys. (blind deletes)
      = Rows that need rollback applied for INSERT or DELETE. (unapplied rollback messages)
* Since the new counter extends the ft_header and bumps the version, there are data format upgrade/downgrade issues that need to be considered.
* A fair bit of code format fixes on any adjacent lines, mostly function signature reformatting to style guidelines and line length restriction to 80.
  • Loading branch information
George O. Lorch III committed Nov 14, 2015
1 parent 5573396 commit 944a0d9
Show file tree
Hide file tree
Showing 33 changed files with 2,241 additions and 742 deletions.
5 changes: 3 additions & 2 deletions buildheader/make_tdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,9 @@ static void print_db_struct (void) {
"int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, uint32_t flags)",
"int (*get_fractal_tree_info64)(DB*,uint64_t*,uint64_t*,uint64_t*,uint64_t*)",
"int (*iterate_fractal_tree_block_map)(DB*,int(*)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*),void*)",
"const char *(*get_dname)(DB *db)",
"int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)",
"const char *(*get_dname)(DB *db)",
"int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)",
"int (*recount_rows)(DB* db, int (*progress_callback)(uint64_t count, uint64_t deleted, void* progress_extra), void* progress_extra)",
NULL};
sort_and_dump_fields("db", true, extra);
}
Expand Down
1 change: 1 addition & 0 deletions ft/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(FT_SOURCES
ft-flusher
ft-hot-flusher
ft-ops
ft-recount-rows
ft-status
ft-test-helpers
ft-verify
Expand Down
6 changes: 4 additions & 2 deletions ft/ft-flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
txn_gc_info *gc_info;

STAT64INFO_S stats_delta;
int64_t logical_rows_delta = 0;
size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();

flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
Expand Down Expand Up @@ -1599,8 +1600,8 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
is_fresh,
gc_info,
flow_deltas,
&stats_delta
);
&stats_delta,
&logical_rows_delta);
remaining_memsize -= memsize_in_buffer;
return 0;
}
Expand All @@ -1613,6 +1614,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
}
toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
if (do_garbage_collection) {
size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
// may be misleading if there's a broadcast message in there
Expand Down
5 changes: 5 additions & 0 deletions ft/ft-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ struct ft_header {
MSN msn_at_start_of_last_completed_optimize;

STAT64INFO_S on_disk_stats;

// This represents the balance of inserts - deletes and should be
// closer to a logical representation of the number of records in an index
uint64_t on_disk_logical_rows;
};
typedef struct ft_header *FT_HEADER;

Expand Down Expand Up @@ -176,6 +180,7 @@ struct ft {

// protected by atomic builtins
STAT64INFO_S in_memory_stats;
uint64_t in_memory_logical_rows;

// transient, not serialized to disk. updated when we do write to
// disk. tells us whether we can do partial eviction (we can't if
Expand Down
11 changes: 8 additions & 3 deletions ft/ft-ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,8 @@ static void inject_message_in_locked_node(
ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids());
paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn);

STAT64INFO_S stats_delta = {0,0};
STAT64INFO_S stats_delta = { 0,0 };
int64_t logical_rows_delta = 0;
toku_ftnode_put_msg(
ft->cmp,
ft->update_fun,
Expand All @@ -1381,11 +1382,12 @@ static void inject_message_in_locked_node(
true,
gc_info,
flow_deltas,
&stats_delta
);
&stats_delta,
&logical_rows_delta);
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
}
toku_ft_adjust_logical_row_count(ft, logical_rows_delta);
//
// assumption is that toku_ftnode_put_msg will
// mark the node as dirty.
Expand Down Expand Up @@ -2169,6 +2171,7 @@ int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool

if (r == 0) {
ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT);
toku_ft_adjust_logical_row_count(ft_h->ft, 1);
}
return r;
}
Expand Down Expand Up @@ -2344,6 +2347,7 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool
if (r != 0) {
toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info);
}
toku_ft_adjust_logical_row_count(ft_h->ft, 1);
}
}

Expand Down Expand Up @@ -2513,6 +2517,7 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali
oldest_referenced_xid_estimate,
txn != nullptr ? !txn->for_recovery : false);
toku_ft_send_delete(ft_h, key, message_xids, &gc_info);
toku_ft_adjust_logical_row_count(ft_h->ft, -1);
}
}

Expand Down
9 changes: 9 additions & 0 deletions ft/ft-ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ extern int toku_ft_debug_mode;
int toku_verify_ft (FT_HANDLE ft_h) __attribute__ ((warn_unused_result));
int toku_verify_ft_with_progress (FT_HANDLE ft_h, int (*progress_callback)(void *extra, float progress), void *extra, int verbose, int keep_going) __attribute__ ((warn_unused_result));

int toku_ft_recount_rows(
FT_HANDLE ft,
int (*progress_callback)(
uint64_t count,
uint64_t deleted,
void* progress_extra),
void* progress_extra);


DICTIONARY_ID toku_ft_get_dictionary_id(FT_HANDLE);

enum ft_flags {
Expand Down
115 changes: 115 additions & 0 deletions ft/ft-recount-rows.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/serialize/block_table.h"
#include "ft/ft.h"
#include "ft/ft-internal.h"
#include "ft/cursor.h"

struct recount_rows_extra_t {
int (*_progress_callback)(
uint64_t count,
uint64_t deleted,
void* progress_extra);
void* _progress_extra;
uint64_t _keys;
bool _cancelled;
};

static int recount_rows_found(
uint32_t UU(keylen),
const void* key,
uint32_t UU(vallen),
const void* UU(val),
void* extra,
bool UU(lock_only)) {

recount_rows_extra_t* rre = (recount_rows_extra_t*)extra;

if (FT_LIKELY(key != nullptr)) {
rre->_keys++;
}
return rre->_cancelled
= rre->_progress_callback(rre->_keys, 0, rre->_progress_extra);
}
static bool recount_rows_interrupt(void* extra, uint64_t deleted_rows) {
recount_rows_extra_t* rre = (recount_rows_extra_t*)extra;

return rre->_cancelled =
rre->_progress_callback(rre->_keys, deleted_rows, rre->_progress_extra);
}
int toku_ft_recount_rows(
FT_HANDLE ft,
int (*progress_callback)(
uint64_t count,
uint64_t deleted,
void* progress_extra),
void* progress_extra) {

int ret = 0;
recount_rows_extra_t rre = {
progress_callback,
progress_extra,
0,
false
};

ft_cursor c;
ret = toku_ft_cursor_create(ft, &c, nullptr, C_READ_ANY, false, false);
if (ret) return ret;

toku_ft_cursor_set_check_interrupt_cb(
&c,
recount_rows_interrupt,
&rre);

ret = toku_ft_cursor_first(&c, recount_rows_found, &rre);
while (FT_LIKELY(ret == 0)) {
ret = toku_ft_cursor_next(&c, recount_rows_found, &rre);
}

toku_ft_cursor_destroy(&c);

if (rre._cancelled == false) {
// update ft count
toku_unsafe_set(&ft->ft->in_memory_logical_rows, rre._keys);
ret = 0;
}

return ret;
}
29 changes: 17 additions & 12 deletions ft/ft-test-helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,26 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_handle, BLOCKNUM blocknum, const
assert(node->height==0);

DBT kdbt, vdbt;
ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen),
FT_INSERT, next_dummymsn(), toku_xids_get_root_xids());
ft_msg msg(
toku_fill_dbt(&kdbt, key, keylen),
toku_fill_dbt(&vdbt, val, vallen),
FT_INSERT,
next_dummymsn(),
toku_xids_get_root_xids());

static size_t zero_flow_deltas[] = { 0, 0 };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ftnode_put_msg(ft_handle->ft->cmp,
ft_handle->ft->update_fun,
node,
-1,
msg,
true,
&gc_info,
zero_flow_deltas,
NULL
);
toku_ftnode_put_msg(
ft_handle->ft->cmp,
ft_handle->ft->update_fun,
node,
-1,
msg,
true,
&gc_info,
zero_flow_deltas,
NULL,
NULL);

toku_verify_or_set_counts(node);

Expand Down
44 changes: 36 additions & 8 deletions ft/ft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
ch->time_of_last_modification = now;
ch->checkpoint_count++;
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
ch->on_disk_logical_rows =
ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;

// write translation and header to disk (or at least to OS internal buffer)
toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
Expand Down Expand Up @@ -383,7 +385,8 @@ ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that
.count_of_optimize_in_progress = 0,
.count_of_optimize_in_progress_read_from_disk = 0,
.msn_at_start_of_last_completed_optimize = ZERO_MSN,
.on_disk_stats = ZEROSTATS
.on_disk_stats = ZEROSTATS,
.on_disk_logical_rows = 0
};
return (FT_HEADER) toku_xmemdup(&h, sizeof h);
}
Expand Down Expand Up @@ -802,7 +805,14 @@ toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
s->fsize = toku_cachefile_size(ft->cf);
// just use the in memory stats from the header
// prevent appearance of negative numbers for numrows, numbytes
int64_t n = ft->in_memory_stats.numrows;
// if the logical count was never properly re-counted on an upgrade,
// return the existing physical count instead.
int64_t n;
if (ft->in_memory_logical_rows == (uint64_t)-1) {
n = ft->in_memory_stats.numrows;
} else {
n = ft->in_memory_logical_rows;
}
if (n < 0) {
n = 0;
}
Expand Down Expand Up @@ -871,20 +881,38 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
return &ft_handle->ft->cmp_descriptor;
}

void
toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
(void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
(void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
}

void
toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
(void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
(void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}

void
toku_ft_remove_reference(FT ft, bool oplsn_valid, LSN oplsn, remove_ft_ref_callback remove_ref, void *extra) {
void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
// In order to make sure that the correct count is returned from
// toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
// modified from anywhere else from here with the exceptions of
// serializing in a header, initializing a new header and analyzing
// an index for a logical_row count.
// The gist is that on an index upgrade, all logical_rows values
// in the ft header are set to -1 until an analyze can reset it to an
// accurate value. Until then, the physical count from in_memory_stats
// must be returned in toku_ft_stat64.
if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
}
}

void toku_ft_remove_reference(
FT ft,
bool oplsn_valid,
LSN oplsn,
remove_ft_ref_callback remove_ref,
void *extra) {

toku_ft_grab_reflock(ft);
if (toku_ft_has_one_reference_unlocked(ft)) {
toku_ft_release_reflock(ft);
Expand Down
6 changes: 5 additions & 1 deletion ft/ft.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle);

typedef struct {
// delta versions in basements could be negative
// These represent the physical leaf entries and do not account
// for pending deletes or other in-flight messages that have not been
// applied to a leaf entry.
int64_t numrows;
int64_t numbytes;
} STAT64INFO_S, *STAT64INFO;
static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0};
static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0 };

void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta);
void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta);
void toku_ft_adjust_logical_row_count(FT ft, int64_t delta);

typedef void (*remove_ft_ref_callback)(FT ft, void *extra);
void toku_ft_remove_reference(FT ft,
Expand Down
Loading

0 comments on commit 944a0d9

Please sign in to comment.