From 2a06960117fa020b94783c54785fa6e3b8b6ee5c Mon Sep 17 00:00:00 2001 From: Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> Date: Wed, 28 Aug 2019 17:33:51 +0300 Subject: [PATCH] Variable Granularity support for data collection (#6430) * Variable Granularity support for data collection in the dbengine. * Variable Granularity support for data collection in the daemon. * Added tests to validate the data being queried after having been collected by changing data collection interval * Fix memory corruption * Updated database engine documentation about data collection frequency behaviour --- backends/backends.c | 3 +- daemon/unit_test.c | 354 +++++++++--- database/README.md | 2 + database/engine/README.md | 2 + database/engine/pagecache.c | 284 ++++++++-- database/engine/pagecache.h | 33 +- database/engine/rrdengine.c | 3 + database/engine/rrdengineapi.c | 298 ++++++++-- database/engine/rrdengineapi.h | 11 +- database/engine/rrdenginelib.h | 2 + database/rrd.h | 7 +- database/rrddim.c | 3 +- web/api/queries/average/average.c | 9 +- web/api/queries/query.c | 876 +++++++++++++++++++++++++----- web/api/queries/rrdr.h | 11 +- 15 files changed, 1604 insertions(+), 294 deletions(-) diff --git a/backends/backends.c b/backends/backends.c index 24f84b63c867f5..120c6e7033b7f1 100644 --- a/backends/backends.c +++ b/backends/backends.c @@ -131,7 +131,8 @@ calculated_number backend_calculate_value_from_stored_data( } */ for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) { - n = rd->state->query_ops.next_metric(&handle); + time_t curr_t; + n = rd->state->query_ops.next_metric(&handle, &curr_t); if(unlikely(!does_storage_number_exist(n))) { // not collected diff --git a/daemon/unit_test.c b/daemon/unit_test.c index f9b58ce6b823b2..1c84022c00dbed 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1581,34 +1581,12 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number if(unlikely(v > rd->collected_value_max)) rd->collected_value_max = v; } -int test_dbengine(void) +static RRDHOST *dbengine_rrdhost_find_or_create(char *name) { - const int CHARTS = 128; - const int DIMS = 16; /* That gives us 2048 metrics */ - const int POINTS = 16384; /* This produces 128MiB of metric data */ - const int QUERY_BATCH = 4096; - uint8_t same; - int i, j, k, c, errors; - RRDHOST *host = NULL; - RRDSET *st[CHARTS]; - RRDDIM *rd[CHARTS][DIMS]; - char name[101]; - time_t time_now; - collected_number last; - struct rrddim_query_handle handle; - calculated_number value, expected; - storage_number n; - - error_log_limit_unlimited(); - fprintf(stderr, "\nRunning DB-engine test\n"); - - default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; - - debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'"); - host = rrdhost_find_or_create( - "unittest-dbengine" - , "unittest-dbengine" - , "unittest-dbengine" + return rrdhost_find_or_create( + name + , name + , name , os_type , netdata_configured_timezone , config_get(CONFIG_SECTION_BACKEND, "host tags", "") @@ -1624,15 +1602,33 @@ int test_dbengine(void) , default_rrdpush_send_charts_matching , NULL ); - if (NULL == host) - return 1; +} + +// costants for test_dbengine +static const int CHARTS = 64; +static const int DIMS = 16; // That gives us 64 * 16 = 1024 metrics +#define REGIONS (3) // 3 regions of update_every +// first region update_every is 2, second is 3, third is 1 +static const int REGION_UPDATE_EVERY[REGIONS] = {2, 3, 1}; +static const int REGION_POINTS[REGIONS] = { + 16384, // This produces 64MiB of metric data for the first region: update_every = 2 + 16384, // This produces 64MiB of metric data for the second region: update_every = 3 + 16384, // This produces 64MiB of metric data for the third region: update_every = 1 +}; +static const int QUERY_BATCH = 4096; + +static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], + int update_every) +{ + int i, j; + char name[101]; for (i = 0 ; i < CHARTS ; ++i) { snprintfz(name, 100, "dbengine-chart-%d", i); // create the chart st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest", - NULL, 1, 1, RRDSET_TYPE_LINE); + NULL, 1, update_every, RRDSET_TYPE_LINE); rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG); rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST); for (j = 0 ; j < DIMS ; ++j) { @@ -1642,50 +1638,103 @@ int test_dbengine(void) } } + // Initialize DB with the very first entries + for (i = 0 ; i < CHARTS ; ++i) { + for (j = 0 ; j < DIMS ; ++j) { + rd[i][j]->last_collected_time.tv_sec = + st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = 2 * API_RELATIVE_TIME_MAX - 1; + rd[i][j]->last_collected_time.tv_usec = + st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0; + } + } + for (i = 0 ; i < CHARTS ; ++i) { + st[i]->usec_since_last_update = USEC_PER_SEC; + + for (j = 0; j < DIMS; ++j) { + rrddim_set_by_pointer_fake_time(rd[i][j], 69, 2 * API_RELATIVE_TIME_MAX); // set first value to 69 + } + rrdset_done(st[i]); + } + // Fluh pages for subsequent real values + for (i = 0 ; i < CHARTS ; ++i) { + for (j = 0; j < DIMS; ++j) { + rrdeng_store_metric_flush_current_page(rd[i][j]); + } + } +} + +// Feeds the database region with test data, returns last timestamp of region +static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], + int current_region, time_t time_start) +{ + time_t time_now; + int i, j, c, update_every; + collected_number next; + + update_every = REGION_UPDATE_EVERY[current_region]; + time_now = time_start + update_every; // feed it with the test data - time_now = 1; - last = 0; for (i = 0 ; i < CHARTS ; ++i) { for (j = 0 ; j < DIMS ; ++j) { rd[i][j]->last_collected_time.tv_sec = st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now; rd[i][j]->last_collected_time.tv_usec = - st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0; + st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0; } } - for(c = 0; c < POINTS ; ++c) { - ++time_now; // time_now = c + 2 + for (c = 0; c < REGION_POINTS[current_region] ; ++c) { + time_now += update_every; // time_now = start + (c + 2) * update_every for (i = 0 ; i < CHARTS ; ++i) { - st[i]->usec_since_last_update = USEC_PER_SEC; + st[i]->usec_since_last_update = USEC_PER_SEC * update_every; for (j = 0; j < DIMS; ++j) { - last = i * DIMS * POINTS + j * POINTS + c; - rrddim_set_by_pointer_fake_time(rd[i][j], last, time_now); + next = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c; + rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now); } rrdset_done(st[i]); } } + return time_now; //time_end +} - // check the result +// Checks the metric data for the given region, returns number of errors +static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], + int current_region, time_t time_start) +{ + uint8_t same; + time_t time_now, time_retrieved; + int i, j, k, c, errors, update_every; + collected_number last; + calculated_number value, expected; + storage_number n; + struct rrddim_query_handle handle; + + update_every = REGION_UPDATE_EVERY[current_region]; errors = 0; - for(c = 0; c < POINTS ; c += QUERY_BATCH) { - time_now = c + 2; + // check the result + for (c = 0; c < REGION_POINTS[current_region] ; c += QUERY_BATCH) { + time_now = time_start + (c + 2) * update_every; for (i = 0 ; i < CHARTS ; ++i) { for (j = 0; j < DIMS; ++j) { - rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH); + rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH * update_every); for (k = 0; k < QUERY_BATCH; ++k) { - last = i * DIMS * POINTS + j * POINTS + c + k; + last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); - n = rd[i][j]->state->query_ops.next_metric(&handle); + n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved); value = unpack_storage_number(n); same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; if(!same) { fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", - st[i]->name, rd[i][j]->name, (unsigned long)time_now + k, expected, value); + st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, expected, value); + errors++; + } + if(time_retrieved != time_now + k * update_every) { + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n", + st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved); errors++; } } @@ -1693,7 +1742,184 @@ int test_dbengine(void) } } } + return errors; +} + +// Check rrdr transformations +static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], + int current_region, time_t time_start, time_t time_end) +{ + uint8_t same; + time_t time_now, time_retrieved; + int i, j, errors, update_every; + long c; + collected_number last; + calculated_number value, expected; + + errors = 0; + update_every = REGION_UPDATE_EVERY[current_region]; + long points = (time_end - time_start) / update_every - 1; + for (i = 0 ; i < CHARTS ; ++i) { + RRDR *r = rrd2rrdr(st[i], points, time_start + update_every, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL); + if (!r) { + fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name); + return ++errors; + } else { + assert(r->st == st[i]); + for (c = 0; c != rrdr_rows(r) ; ++c) { + RRDDIM *d; + time_now = time_start + (c + 2) * update_every; + time_retrieved = r->t[c]; + + // for each dimension + for (j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) { + calculated_number *cn = &r->v[ c * r->d ]; + value = cn[j]; + assert(rd[i][j] == d); + + last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c; + expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); + + same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; + if(!same) { + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " + CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", + st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value); + errors++; + } + if(time_retrieved != time_now) { + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", + st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved); + errors++; + } + } + } + rrdr_free(r); + } + } + return errors; +} + +int test_dbengine(void) +{ + int i, j, errors, update_every, current_region; + RRDHOST *host = NULL; + RRDSET *st[CHARTS]; + RRDDIM *rd[CHARTS][DIMS]; + time_t time_start[REGIONS], time_end[REGIONS]; + + error_log_limit_unlimited(); + fprintf(stderr, "\nRunning DB-engine test\n"); + + default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; + + debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'"); + host = dbengine_rrdhost_find_or_create("unittest-dbengine"); + if (NULL == host) + return 1; + + current_region = 0; // this is the first region of data + update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 2 seconds + test_dbengine_create_charts(host, st, rd, update_every); + + time_start[current_region] = 2 * API_RELATIVE_TIME_MAX; + time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); + + errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); + if (errors) + goto error_out; + + current_region = 1; //this is the second region of data + update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 3 seconds + // Align pages for frequency change + for (i = 0 ; i < CHARTS ; ++i) { + st[i]->update_every = update_every; + for (j = 0; j < DIMS; ++j) { + rrdeng_store_metric_flush_current_page(rd[i][j]); + } + } + + time_start[current_region] = time_end[current_region - 1] + update_every; + if (0 != time_start[current_region] % update_every) // align to update_every + time_start[current_region] += update_every - time_start[current_region] % update_every; + time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); + + errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); + if (errors) + goto error_out; + + current_region = 2; //this is the third region of data + update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 1 seconds + // Align pages for frequency change + for (i = 0 ; i < CHARTS ; ++i) { + st[i]->update_every = update_every; + for (j = 0; j < DIMS; ++j) { + rrdeng_store_metric_flush_current_page(rd[i][j]); + } + } + + time_start[current_region] = time_end[current_region - 1] + update_every; + if (0 != time_start[current_region] % update_every) // align to update_every + time_start[current_region] += update_every - time_start[current_region] % update_every; + time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); + errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); + if (errors) + goto error_out; + + for (current_region = 0 ; current_region < REGIONS ; ++current_region) { + errors = test_dbengine_check_rrdr(st, rd, current_region, time_start[current_region], time_end[current_region]); + if (errors) + goto error_out; + } + + current_region = 1; + update_every = REGION_UPDATE_EVERY[current_region]; // use the maximum update_every = 3 + errors = 0; + long points = (time_end[REGIONS - 1] - time_start[0]) / update_every - 1; // cover all time regions with RRDR + long point_offset = (time_start[current_region] - time_start[0]) / update_every; + for (i = 0 ; i < CHARTS ; ++i) { + RRDR *r = rrd2rrdr(st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL); + if (!r) { + fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name); + ++errors; + } else { + long c; + + assert(r->st == st[i]); + // test current region values only, since they must be left unchanged + for (c = point_offset ; c < point_offset + rrdr_rows(r) / REGIONS / 2 ; ++c) { + RRDDIM *d; + time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every; + time_t time_retrieved = r->t[c]; + + // for each dimension + for(j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) { + calculated_number *cn = &r->v[ c * r->d ]; + calculated_number value = cn[j]; + assert(rd[i][j] == d); + + collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset; + calculated_number expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); + + uint8_t same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; + if(!same) { + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " + CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", + st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value); + errors++; + } + if(time_retrieved != time_now) { + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", + st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved); + errors++; + } + } + } + rrdr_free(r); + } + } +error_out: rrdeng_exit(host->rrdeng_ctx); rrd_wrlock(); rrdhost_delete_charts(host); @@ -1704,43 +1930,25 @@ int test_dbengine(void) void generate_dbengine_dataset(unsigned history_seconds) { - const int DIMS = 128; + const int DSET_DIMS = 128; const uint64_t EXPECTED_COMPRESSION_RATIO = 94; - int j; + int j, update_every = 1; RRDHOST *host = NULL; RRDSET *st; - RRDDIM *rd[DIMS]; + RRDDIM *rd[DSET_DIMS]; char name[101]; time_t time_current, time_present; default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; default_rrdeng_page_cache_mb = 128; - /* Worst case for uncompressible data */ - default_rrdeng_disk_quota_mb = (((uint64_t)DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024); + // Worst case for uncompressible data + default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024); default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100; error_log_limit_unlimited(); debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-dataset'"); - host = rrdhost_find_or_create( - "dbengine-dataset" - , "dbengine-dataset" - , "dbengine-dataset" - , os_type - , netdata_configured_timezone - , config_get(CONFIG_SECTION_BACKEND, "host tags", "") - , program_name - , program_version - , default_rrd_update_every - , default_rrd_history_entries - , RRD_MEMORY_MODE_DBENGINE - , default_health_enabled - , default_rrdpush_enabled - , default_rrdpush_destination - , default_rrdpush_api_key - , default_rrdpush_send_charts_matching - , NULL - ); + host = dbengine_rrdhost_find_or_create("dbengine-dataset"); if (NULL == host) return; @@ -1748,8 +1956,8 @@ void generate_dbengine_dataset(unsigned history_seconds) // create the chart st = rrdset_create(host, "example", "random", "random", "example", NULL, "random", "random", "random", - NULL, 1, 1, RRDSET_TYPE_LINE); - for (j = 0 ; j < DIMS ; ++j) { + NULL, 1, update_every, RRDSET_TYPE_LINE); + for (j = 0 ; j < DSET_DIMS ; ++j) { snprintfz(name, 100, "random%d", j); rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -1758,7 +1966,7 @@ void generate_dbengine_dataset(unsigned history_seconds) time_present = now_realtime_sec(); // feed it with the test data time_current = time_present - history_seconds; - for (j = 0 ; j < DIMS ; ++j) { + for (j = 0 ; j < DSET_DIMS ; ++j) { rd[j]->last_collected_time.tv_sec = st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current; rd[j]->last_collected_time.tv_usec = @@ -1767,7 +1975,7 @@ void generate_dbengine_dataset(unsigned history_seconds) for( ; time_current < time_present; ++time_current) { st->usec_since_last_update = USEC_PER_SEC; - for (j = 0; j < DIMS; ++j) { + for (j = 0; j < DSET_DIMS; ++j) { rrddim_set_by_pointer_fake_time(rd[j], (time_current + j) % 128, time_current); } rrdset_done(st); diff --git a/database/README.md b/database/README.md index 90ae502012368d..2fcb69b679f509 100644 --- a/database/README.md +++ b/database/README.md @@ -47,6 +47,8 @@ Currently Netdata supports 6 memory modes: database. There is some amount of RAM dedicated to data caching and indexing and the rest of the data reside compressed on disk. The number of history entries is not fixed in this case, but depends on the configured disk space and the effective compression ratio of the data stored. + This is the **only mode** that supports changing the data collection update frequency + (`update_every`) **without losing** the previously stored metrics. For more details see [here](engine/). You can select the memory mode by editing `netdata.conf` and setting: diff --git a/database/engine/README.md b/database/engine/README.md index eb6d5a3c541314..7791a549f8279d 100644 --- a/database/engine/README.md +++ b/database/engine/README.md @@ -4,6 +4,8 @@ The Database Engine works like a traditional database. There is some amount of RAM dedicated to data caching and indexing and the rest of the data reside compressed on disk. The number of history entries is not fixed in this case, but depends on the configured disk space and the effective compression ratio of the data stored. +This is the **only mode** that supports changing the data collection update frequency +(`update_every`) **without losing** the previously stored metrics. ## Files diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 124f2448b134d6..1bd4c94361cda9 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -419,6 +419,35 @@ static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); } +/* The caller must hold the page index lock */ +static inline struct rrdeng_page_descr * + find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) +{ + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + return NULL; +} + /* Update metric oldest and latest timestamps efficiently when adding new values */ void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) { @@ -510,70 +539,144 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } -/* - * Searches for a page and triggers disk I/O if necessary and possible. +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return INVALID_TIME; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + return INVALID_TIME; + } + uv_rwlock_rdunlock(&page_index->lock); + return descr->start_time; +} + +/** + * Return page information for the first page before point_in_time that satisfies the filter. + * @param ctx DB context + * @param page_index page index of a metric + * @param point_in_time the pages that are searched must be older than this timestamp + * @param filter decides if the page satisfies the caller's criteria + * @param page_info the result of the search is set in this pointer + */ +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + (void)pg_cache; + assert(NULL != page_index); + + Index = (Word_t)(point_in_time / USEC_PER_SEC); + uv_rwlock_rdlock(&page_index->lock); + do { + PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } while (descr != NULL && !filter(descr)); + if (unlikely(NULL == descr)) { + page_info->page_length = 0; + page_info->start_time = INVALID_TIME; + page_info->end_time = INVALID_TIME; + } else { + page_info->page_length = descr->page_length; + page_info->start_time = descr->start_time; + page_info->end_time = descr->end_time; + } + uv_rwlock_rdunlock(&page_index->lock); +} +/** + * Searches for pages in a time range and triggers disk I/O if necessary and possible. * Does not get a reference. - * Returns page index pointer for given metric UUID. + * @param ctx DB context + * @param id UUID + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap + * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). + * If page_info_arrayp is set to NULL nothing was allocated. + * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. + * @return the number of pages that overlap with the time range [start_time,end_time]. */ -struct pg_cache_page_index * - pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; struct page_cache_descr *pg_cache_descr = NULL; - int i, j, k, count, found; + unsigned i, j, k, preload_count, count, page_info_array_max_size; unsigned long flags; Pvoid_t *PValue; struct pg_cache_page_index *page_index; Word_t Index; uint8_t failed_to_reserve; + assert(NULL != ret_page_indexp); + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); if (likely(NULL != PValue)) { - page_index = *PValue; + *ret_page_indexp = page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (NULL == PValue) { debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - return NULL; + *ret_page_indexp = NULL; + return 0; } uv_rwlock_rdlock(&page_index->lock); - /* Find first page in range */ - found = 0; - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - found = 1; - } - } - if (!found) { - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - found = 1; - } - } - } - if (!found) { + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - return page_index; + *ret_page_indexp = NULL; + return 0; + } else { + Index = (Word_t)(descr->start_time / USEC_PER_SEC); + } + if (page_info_arrayp) { + page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = mallocz(page_info_array_max_size); } - for (count = 0 ; - descr != NULL && is_page_in_time_range(descr, start_time, end_time); + for (count = 0, preload_count = 0 ; + descr != NULL && is_page_in_time_range(descr, start_time, end_time) ; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { /* Iterate all pages in range */ if (unlikely(0 == descr->page_length)) continue; + if (page_info_arrayp) { + if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) { + page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); + } + (*page_info_arrayp)[count].start_time = descr->start_time; + (*page_info_arrayp)[count].end_time = descr->end_time; + (*page_info_arrayp)[count].page_length = descr->page_length; + } + ++count; + rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; flags = pg_cache_descr->flags; @@ -586,8 +689,8 @@ struct pg_cache_page_index * } } if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - preload_array[count++] = descr; - if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) { + preload_array[preload_count++] = descr; + if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) { rrdeng_page_descr_mutex_unlock(ctx, descr); break; } @@ -598,7 +701,7 @@ struct pg_cache_page_index * uv_rwlock_rdunlock(&page_index->lock); failed_to_reserve = 0; - for (i = 0 ; i < count && !failed_to_reserve ; ++i) { + for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) { struct rrdeng_cmd cmd; struct rrdeng_page_descr *next; @@ -614,7 +717,7 @@ struct pg_cache_page_index * cmd.read_extent.page_cache_descr[0] = descr; /* don't use this page again */ preload_array[i] = NULL; - for (j = 0, k = 1 ; j < count ; ++j) { + for (j = 0, k = 1 ; j < preload_count ; ++j) { next = preload_array[j]; if (NULL == next) { continue; @@ -635,7 +738,7 @@ struct pg_cache_page_index * } if (failed_to_reserve) { debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); - for (i = 0 ; i < count ; ++i) { + for (i = 0 ; i < preload_count ; ++i) { descr = preload_array[i]; if (NULL == descr) { continue; @@ -643,11 +746,15 @@ struct pg_cache_page_index * pg_cache_put(ctx, descr); } } - if (!count) { + if (!preload_count) { /* no such page */ debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); } - return page_index; + if (unlikely(0 == count && page_info_arrayp)) { + freez(*page_info_arrayp); + *page_info_arrayp = NULL; + } + return count; } /* @@ -757,6 +864,105 @@ struct rrdeng_page_descr * return descr; } +/* + * Searches for the first page between start_time and end_time and gets a reference. + * start_time and end_time are inclusive. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * +pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr || 0 == descr->page_length) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + struct pg_cache_page_index *create_page_index(uuid_t *id) { struct pg_cache_page_index *page_index; diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index b5670f82a12a80..d464211e9a8974 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -48,9 +48,6 @@ struct page_cache_descr { * number of descriptor users | DESTROY | LOCKED | ALLOCATED | */ struct rrdeng_page_descr { - uint32_t page_length; - usec_t start_time; - usec_t end_time; uuid_t *id; /* never changes */ struct extent_info *extent; @@ -59,8 +56,25 @@ struct rrdeng_page_descr { /* Compare-And-Swap target for page cache descriptor allocation algorithm */ volatile unsigned long pg_cache_descr_state; + + /* page information */ + usec_t start_time; + usec_t end_time; + uint32_t page_length; }; +#define PAGE_INFO_SCRATCH_SZ (8) +struct rrdeng_page_info { + uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */ + + usec_t start_time; + usec_t end_time; + uint32_t page_length; +}; + +/* returns 1 for success, 0 for failure */ +typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *); + #define PAGE_CACHE_MAX_PRELOAD_PAGES (256) /* maps time ranges to pages */ @@ -149,11 +163,20 @@ extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_desc extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, struct rrdeng_page_descr *descr); extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty); -extern struct pg_cache_page_index * - pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time); +extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time, usec_t end_time); +extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info); +extern unsigned + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp); extern struct rrdeng_page_descr * pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, usec_t point_in_time); +extern struct rrdeng_page_descr * + pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time, usec_t end_time); extern struct pg_cache_page_index *create_page_index(uuid_t *id); extern void init_page_cache(struct rrdengine_instance *ctx); extern void free_page_cache(struct rrdengine_instance *ctx); diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 221216bb3d90e0..39d1485781da4d 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -24,6 +24,9 @@ void sanity_check(void) /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + + /* page info scratch space must be able to hold 2 32-bit integers */ + BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } void read_extent_cb(uv_fs_t* req) diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index a87ce6d648f81c..bf373f31c73183 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -218,6 +218,208 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) } } +/* Returns 1 if the data collection interval is well defined, 0 otherwise */ +static int metrics_with_known_interval(struct rrdeng_page_descr *descr) +{ + unsigned page_entries; + + if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time)) + return 0; + page_entries = descr->page_length / sizeof(storage_number); + if (likely(page_entries > 1)) { + return 1; + } + return 0; +} + +static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[0]; +} + +static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) +{ + return (uint32_t *)&page_info->scratch[sizeof(uint32_t)]; +} + +/** + * Calculates the regions of different data collection intervals in a netdata chart in the time range + * [start_time,end_time]. This call takes the netdata chart read lock. + * @param st the netdata chart whose data collection interval boundaries are calculated. + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a + * reference dimension that that have different data collection intervals and overlap with the time range + * [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set + * to NULL nothing was allocated. + * @param max_intervalp is derefenced and set to be the largest data collection interval of all regions. + * @return number of regions with different data collection intervals. + */ +unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp) +{ + struct pg_cache_page_index *page_index; + struct rrdengine_instance *ctx; + unsigned pages_nr; + RRDDIM *rd_iter, *rd; + struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev; + unsigned i, j, page_entries, region_points, page_points, regions, max_interval; + time_t now; + usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page; + struct rrdeng_region_info *region_info_array; + uint8_t is_first_region_initialized; + + ctx = st->rrdhost->rrdeng_ctx; + regions = 1; + *max_intervalp = max_interval = 0; + region_info_array = NULL; + *region_info_arrayp = NULL; + page_info_array = NULL; + + rrdset_rdlock(st); + for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { + /* + * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions + * but it is a best effort approximation with a bias towards older metrics in a chart. It + * matches netdata behaviour in the sense that dimensions are generally aligned in a chart + * and older dimensions contain more information about the time range. It does not work well + * for metrics that have recently stopped being collected. + */ + curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid, + start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); + if (INVALID_TIME != curr_time && curr_time < min_time) { + rd = rd_iter; + min_time = curr_time; + } + } + rrdset_unlock(st); + if (NULL == rd) { + return 1; + } + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + &page_info_array, &page_index); + if (pages_nr) { + /* conservative allocation, will reduce the size later if necessary */ + region_info_array = mallocz(sizeof(*region_info_array) * pages_nr); + } + is_first_region_initialized = 0; + region_points = 0; + + /* pages loop */ + for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) { + old_prev = prev; + prev = curr; + curr = &page_info_array[i]; + *pginfo_to_points(curr) = 0; /* initialize to invalid page */ + *pginfo_to_dt(curr) = 0; /* no known data collection interval yet */ + if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time)) { + info("Ignoring page with invalid timestamp."); + prev = old_prev; + continue; + } + page_entries = curr->page_length / sizeof(storage_number); + assert(0 != page_entries); + if (likely(1 != page_entries)) { + dt = (curr->end_time - curr->start_time) / (page_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + } else { + dt = 0; + } + for (j = 0, page_points = 0 ; j < page_entries ; ++j) { + uint8_t is_metric_out_of_order, is_metric_earlier_than_range; + + is_metric_earlier_than_range = 0; + is_metric_out_of_order = 0; + + current_position_time = curr->start_time + j * dt; + now = current_position_time / USEC_PER_SEC; + if (now > end_time) { /* there will be no more pages in the time range */ + break; + } + if (now < start_time) + is_metric_earlier_than_range = 1; + if (unlikely(current_position_time < max_time)) /* just went back in time */ + is_metric_out_of_order = 1; + if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) { + if (unlikely(is_metric_out_of_order)) + info("Ignoring metric with out of order timestamp."); + continue; /* next entry */ + } + /* here is a valid metric */ + ++page_points; + region_info_array[regions - 1].points = ++region_points; + max_time = current_position_time; + if (1 == page_points) + first_valid_time_in_page = current_position_time; + if (unlikely(!is_first_region_initialized)) { + assert(1 == regions); + /* this is the first region */ + region_info_array[0].start_time = current_position_time; + is_first_region_initialized = 1; + } + } + *pginfo_to_points(curr) = page_points; + if (0 == page_points) { + prev = old_prev; + continue; + } + + if (unlikely(0 == dt)) { /* unknown data collection interval */ + assert(1 == page_points); + + if (likely(NULL != prev)) { /* get interval from previous page */ + *pginfo_to_dt(curr) = *pginfo_to_dt(prev); + } else { /* there is no previous page in the query */ + struct rrdeng_page_info db_page_info; + + /* go to database */ + pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time, + metrics_with_known_interval, &db_page_info); + if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME || + 0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */ + *pginfo_to_dt(curr) = rd->update_every; + } else { + unsigned db_entries; + usec_t db_dt; + + db_entries = db_page_info.page_length / sizeof(storage_number); + db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1); + *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt); + if (unlikely(0 == *pginfo_to_dt(curr))) + *pginfo_to_dt(curr) = 1; + + } + } + } + if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) { + info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32, + *pginfo_to_dt(prev), *pginfo_to_dt(curr)); + region_info_array[regions++ - 1].points -= page_points; + region_info_array[regions - 1].points = region_points = page_points; + region_info_array[regions - 1].start_time = first_valid_time_in_page; + } + if (*pginfo_to_dt(curr) > max_interval) + max_interval = *pginfo_to_dt(curr); + region_info_array[regions - 1].update_every = *pginfo_to_dt(curr); + } + if (page_info_array) + freez(page_info_array); + if (region_info_array) { + if (likely(is_first_region_initialized)) { + /* free unnecessary memory */ + region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions); + *region_info_arrayp = region_info_array; + *max_intervalp = max_interval; + } else { + /* empty result */ + freez(region_info_array); + } + } + return regions; +} + /* * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). @@ -226,80 +428,108 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; + unsigned pages_nr; ctx = rd->rrdset->rrdhost->rrdeng_ctx; rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = &rrdimm_handle->rrdeng; + handle->next_page_time = start_time; handle->now = start_time; - handle->dt = rd->rrdset->update_every; + handle->position = 0; handle->ctx = ctx; handle->descr = NULL; - handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid, - start_time * USEC_PER_SEC, end_time * USEC_PER_SEC); + pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + NULL, &handle->page_index); + if (unlikely(NULL == handle->page_index || 0 == pages_nr)) + /* there are no metrics to load */ + handle->next_page_time = INVALID_TIME; } -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) +/* Returns the metric and sets its timestamp into current_time */ +storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; storage_number *page, ret; - unsigned position; - usec_t point_in_time; + unsigned position, entries; + usec_t next_page_time, current_position_time; handle = &rrdimm_handle->rrdeng; - if (unlikely(INVALID_TIME == handle->now)) { + if (unlikely(INVALID_TIME == handle->next_page_time)) { return SN_EMPTY_SLOT; } ctx = handle->ctx; - point_in_time = handle->now * USEC_PER_SEC; - descr = handle->descr; - - if (unlikely(NULL == handle->page_index)) { - ret = SN_EMPTY_SLOT; - goto out; + if (unlikely(NULL == (descr = handle->descr))) { + /* it's the first call */ + next_page_time = handle->next_page_time * USEC_PER_SEC; } + position = handle->position + 1; + if (unlikely(NULL == descr || - point_in_time < descr->start_time || - point_in_time > descr->end_time)) { + position >= (descr->page_length / sizeof(storage_number)))) { + /* We need to get a new page */ if (descr) { + /* Drop old page's reference */ + handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1; + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { + goto no_more_metrics; + } + next_page_time = handle->next_page_time * USEC_PER_SEC; #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); handle->descr = NULL; } - descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); if (NULL == descr) { - ret = SN_EMPTY_SLOT; - goto out; + goto no_more_metrics; } #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1); #endif handle->descr = descr; - } - if (unlikely(INVALID_TIME == descr->start_time || - INVALID_TIME == descr->end_time)) { - ret = SN_EMPTY_SLOT; - goto out; + if (unlikely(INVALID_TIME == descr->start_time || + INVALID_TIME == descr->end_time)) { + goto no_more_metrics; + } + if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) { + /* we're in the middle of the page somewhere */ + entries = descr->page_length / sizeof(storage_number); + position = ((uint64_t)(next_page_time - descr->start_time)) * entries / + (descr->end_time - descr->start_time + 1); + } else { + position = 0; + } } page = descr->pg_cache_descr->page; - if (unlikely(descr->start_time == descr->end_time)) { - ret = page[0]; - goto out; - } - position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) / - (descr->end_time - descr->start_time + 1); ret = page[position]; + entries = descr->page_length / sizeof(storage_number); + if (entries > 1) { + usec_t dt; -out: - handle->now += handle->dt; - if (unlikely(handle->now > rrdimm_handle->end_time)) { - handle->now = INVALID_TIME; + dt = (descr->end_time - descr->start_time) / (entries - 1); + current_position_time = descr->start_time + position * dt; + } else { + current_position_time = descr->start_time; } + handle->position = position; + handle->now = current_position_time / USEC_PER_SEC; +/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); + The above assertion is an approximation and needs to take update_every into account */ + if (unlikely(handle->now >= rrdimm_handle->end_time)) { + /* next calls will not load any more metrics */ + handle->next_page_time = INVALID_TIME; + } + *current_time = handle->now; return ret; + +no_more_metrics: + handle->next_page_time = INVALID_TIME; + return SN_EMPTY_SLOT; } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) @@ -307,7 +537,7 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) struct rrdeng_query_handle *handle; handle = &rrdimm_handle->rrdeng; - return (INVALID_TIME == handle->now); + return (INVALID_TIME == handle->next_page_time); } /* diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index e52aabcbe64750..9b1ab18742580b 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -15,6 +15,12 @@ extern int default_rrdeng_page_cache_mb; extern int default_rrdeng_disk_quota_mb; +struct rrdeng_region_info { + time_t start_time; + int update_every; + unsigned points; +}; + extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id); @@ -25,9 +31,12 @@ extern void rrdeng_store_metric_init(RRDDIM *rd); extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); extern void rrdeng_store_metric_finalize(RRDDIM *rd); +extern unsigned + rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp); extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time); -extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle); +extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time); extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); extern time_t rrdeng_metric_latest_time(RRDDIM *rd); diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index 36d414e895e714..5685b65a5c5014 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -23,6 +23,8 @@ struct rrdeng_page_descr; #define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) #define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) +#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC) + typedef uintptr_t rrdeng_stats_t; #ifdef __ATOMIC_RELAXED diff --git a/database/rrd.h b/database/rrd.h index 5b09c2dda63a64..39e881252db971 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -273,8 +273,9 @@ struct rrddim_query_handle { struct rrdeng_page_descr *descr; struct rrdengine_instance *ctx; struct pg_cache_page_index *page_index; - time_t now; //TODO: remove now to implement next point iteration - time_t dt; //TODO: remove dt to implement next point iteration + time_t next_page_time; + time_t now; + unsigned position; } rrdeng; // state the database engine uses #endif }; @@ -307,7 +308,7 @@ struct rrddim_volatile { void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time); // run this to load each metric number from the database - storage_number (*next_metric)(struct rrddim_query_handle *handle); + storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time); // run this to test if the series of next_metric() database queries is finished int (*is_finished)(struct rrddim_query_handle *handle); diff --git a/database/rrddim.c b/database/rrddim.c index 09f364b02b83a0..019ca34a198700 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -118,11 +118,12 @@ static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, ti handle->slotted.finished = 0; } -static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle) { +static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) { RRDDIM *rd = handle->rd; long entries = rd->rrdset->entries; long slot = handle->slotted.slot; + (void)current_time; if (unlikely(handle->slotted.slot == handle->slotted.last_slot)) handle->slotted.finished = 1; storage_number n = rd->values[slot++]; diff --git a/web/api/queries/average/average.c b/web/api/queries/average/average.c index c871b8778871ac..2c64358e682a6b 100644 --- a/web/api/queries/average/average.c +++ b/web/api/queries/average/average.c @@ -46,9 +46,12 @@ calculated_number grouping_flush_average(RRDR *r, RRDR_VALUE_FLAGS *rrdr_value_ *rrdr_value_options_ptr |= RRDR_VALUE_EMPTY; } else { - if(unlikely(r->internal.resampling_group != 1)) - value = g->sum / r->internal.resampling_divisor; - else + if(unlikely(r->internal.resampling_group != 1)) { + if (unlikely(r->result_options & RRDR_RESULT_OPTION_VARIABLE_STEP)) + value = g->sum / g->count / r->internal.resampling_divisor; + else + value = g->sum / r->internal.resampling_divisor; + } else value = g->sum / g->count; } diff --git a/web/api/queries/query.c b/web/api/queries/query.c index e90dc8afe7b580..af3bcfe388c558 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -376,7 +376,7 @@ static inline void rrdr_done(RRDR *r, long rrdr_line) { // ---------------------------------------------------------------------------- // fill RRDR for a single dimension -static inline void do_dimension( +static inline void do_dimension_variablestep( RRDR *r , long points_wanted , RRDDIM *rd @@ -384,16 +384,16 @@ static inline void do_dimension( , time_t after_wanted , time_t before_wanted ){ - RRDSET *st = r->st; +// RRDSET *st = r->st; time_t now = after_wanted, - dt = st->update_every, + dt = r->update_every, max_date = 0, min_date = 0; long - group_size = r->group, +// group_size = r->group, points_added = 0, values_in_group = 0, values_in_group_non_zero = 0, @@ -403,102 +403,240 @@ static inline void do_dimension( group_value_flags = RRDR_VALUE_NOTHING; struct rrddim_query_handle handle; - uint8_t initialized_query; calculated_number min = r->min, max = r->max; size_t db_points_read = 0; + time_t db_now = now; + storage_number n_curr, n_prev = SN_EMPTY_SLOT; + calculated_number value; + + for(rd->state->query_ops.init(rd, &handle, now, before_wanted) ; points_added < points_wanted ; now += dt) { + // make sure we return data in the proper time range + if (unlikely(now > before_wanted)) { +#ifdef NETDATA_INTERNAL_CHECKS + r->internal.log = "stopped, because attempted to access the db after 'wanted before'"; +#endif + break; + } + if (unlikely(now < after_wanted)) { +#ifdef NETDATA_INTERNAL_CHECKS + r->internal.log = "skipped, because attempted to access the db before 'wanted after'"; +#endif + continue; + } + + while (now >= db_now && (!rd->state->query_ops.is_finished(&handle) || + does_storage_number_exist(n_prev))) { + value = NAN; + if (does_storage_number_exist(n_prev)) { + // use the previously read database value + n_curr = n_prev; + } else { + // read the value from the database + n_curr = rd->state->query_ops.next_metric(&handle, &db_now); + } + n_prev = SN_EMPTY_SLOT; + // db_now has a different value than above + if (likely(now >= db_now)) { + if (likely(does_storage_number_exist(n_curr))) { + value = unpack_storage_number(n_curr); + if (likely(value != 0.0)) + values_in_group_non_zero++; + + if (unlikely(did_storage_number_reset(n_curr))) + group_value_flags |= RRDR_VALUE_RESET; + } + } else { + // We must postpone processing the value and fill the result with gaps instead + if (likely(does_storage_number_exist(n_curr))) { + n_prev = n_curr; + } + } + // add this value to grouping + r->internal.grouping_add(r, value); + values_in_group++; + db_points_read++; + } + + if (0 == values_in_group) { + // add NAN to grouping + r->internal.grouping_add(r, NAN); + } + + rrdr_line = rrdr_line_init(r, now, rrdr_line); + + if(unlikely(!min_date)) min_date = now; + max_date = now; + + // find the place to store our values + RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_line * r->d + dim_id_in_rrdr]; + + // update the dimension options + if(likely(values_in_group_non_zero)) + r->od[dim_id_in_rrdr] |= RRDR_DIMENSION_NONZERO; + + // store the specific point options + *rrdr_value_options_ptr = group_value_flags; + + // store the value + value = r->internal.grouping_flush(r, rrdr_value_options_ptr); + r->v[rrdr_line * r->d + dim_id_in_rrdr] = value; + + if(likely(points_added || dim_id_in_rrdr)) { + // find the min/max across all dimensions + + if(unlikely(value < min)) min = value; + if(unlikely(value > max)) max = value; + + } + else { + // runs only when dim_id_in_rrdr == 0 && points_added == 0 + // so, on the first point added for the query. + min = max = value; + } + + points_added++; + values_in_group = 0; + group_value_flags = RRDR_VALUE_NOTHING; + values_in_group_non_zero = 0; + } + rd->state->query_ops.finalize(&handle); + + r->internal.db_points_read += db_points_read; + r->internal.result_points_generated += points_added; + + r->min = min; + r->max = max; + r->before = max_date; + r->after = min_date - (r->group - 1) * dt; + rrdr_done(r, rrdr_line); + + #ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(r->rows != points_added)) + error("INTERNAL ERROR: %s.%s added %zu rows, but RRDR says I added %zu.", r->st->name, rd->name, (size_t)points_added, (size_t)r->rows); + #endif +} - for(initialized_query = 0 ; points_added < points_wanted ; now += dt) { +static inline void do_dimension_fixedstep( + RRDR *r + , long points_wanted + , RRDDIM *rd + , long dim_id_in_rrdr + , time_t after_wanted + , time_t before_wanted +){ + RRDSET *st = r->st; + + time_t + now = after_wanted, + dt = r->update_every / r->group, /* usually is st->update_every */ + max_date = 0, + min_date = 0; + long + group_size = r->group, + points_added = 0, + values_in_group = 0, + values_in_group_non_zero = 0, + rrdr_line = -1; + + RRDR_VALUE_FLAGS + group_value_flags = RRDR_VALUE_NOTHING; + + struct rrddim_query_handle handle; + + calculated_number min = r->min, max = r->max; + size_t db_points_read = 0; + time_t db_now = now; + + for(rd->state->query_ops.init(rd, &handle, now, before_wanted) ; points_added < points_wanted ; now += dt) { // make sure we return data in the proper time range if(unlikely(now > before_wanted)) { - #ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_INTERNAL_CHECKS r->internal.log = "stopped, because attempted to access the db after 'wanted before'"; - #endif +#endif break; } if(unlikely(now < after_wanted)) { - #ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_INTERNAL_CHECKS r->internal.log = "skipped, because attempted to access the db before 'wanted after'"; - #endif +#endif continue; } - - if (unlikely(!initialized_query)) { - rd->state->query_ops.init(rd, &handle, now, before_wanted); - initialized_query = 1; - } // read the value from the database //storage_number n = rd->values[slot]; #ifdef NETDATA_INTERNAL_CHECKS - if (rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { -#ifdef ENABLE_DBENGINE - if (now != handle.rrdeng.now) - error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now); -#endif - } else if (rrdset_time2slot(st, now) != (long unsigned)handle.slotted.slot) { + if ((rd->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) && + (rrdset_time2slot(st, now) != (long unsigned)handle.slotted.slot)) { error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)handle.slotted.slot, rrdset_time2slot(st, now)); } #endif - storage_number n = rd->state->query_ops.next_metric(&handle); - calculated_number value = NAN; - if(likely(does_storage_number_exist(n))) { + db_now = now; // this is needed to set db_now in case the next_metric implementation does not set it + storage_number n = rd->state->query_ops.next_metric(&handle, &db_now); + for ( ; now <= db_now ; now += dt) { + calculated_number value = NAN; + if(likely(now >= db_now && does_storage_number_exist(n))) { +#if defined(NETDATA_INTERNAL_CHECKS) && defined(ENABLE_DBENGINE) + if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != handle.rrdeng.now)) { + error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now); + } +#endif + value = unpack_storage_number(n); + if(likely(value != 0.0)) + values_in_group_non_zero++; - value = unpack_storage_number(n); - if(likely(value != 0.0)) - values_in_group_non_zero++; + if(unlikely(did_storage_number_reset(n))) + group_value_flags |= RRDR_VALUE_RESET; - if(unlikely(did_storage_number_reset(n))) - group_value_flags |= RRDR_VALUE_RESET; + } - } + // add this value for grouping + r->internal.grouping_add(r, value); + values_in_group++; + db_points_read++; - // add this value for grouping - r->internal.grouping_add(r, value); - values_in_group++; - db_points_read++; + if(unlikely(values_in_group == group_size)) { + rrdr_line = rrdr_line_init(r, now, rrdr_line); - if(unlikely(values_in_group == group_size)) { - rrdr_line = rrdr_line_init(r, now, rrdr_line); + if(unlikely(!min_date)) min_date = now; + max_date = now; - if(unlikely(!min_date)) min_date = now; - max_date = now; + // find the place to store our values + RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_line * r->d + dim_id_in_rrdr]; - // find the place to store our values - RRDR_VALUE_FLAGS *rrdr_value_options_ptr = &r->o[rrdr_line * r->d + dim_id_in_rrdr]; + // update the dimension options + if(likely(values_in_group_non_zero)) + r->od[dim_id_in_rrdr] |= RRDR_DIMENSION_NONZERO; - // update the dimension options - if(likely(values_in_group_non_zero)) - r->od[dim_id_in_rrdr] |= RRDR_DIMENSION_NONZERO; + // store the specific point options + *rrdr_value_options_ptr = group_value_flags; - // store the specific point options - *rrdr_value_options_ptr = group_value_flags; + // store the value + calculated_number value = r->internal.grouping_flush(r, rrdr_value_options_ptr); + r->v[rrdr_line * r->d + dim_id_in_rrdr] = value; - // store the value - calculated_number value = r->internal.grouping_flush(r, rrdr_value_options_ptr); - r->v[rrdr_line * r->d + dim_id_in_rrdr] = value; + if(likely(points_added || dim_id_in_rrdr)) { + // find the min/max across all dimensions - if(likely(points_added || dim_id_in_rrdr)) { - // find the min/max across all dimensions + if(unlikely(value < min)) min = value; + if(unlikely(value > max)) max = value; - if(unlikely(value < min)) min = value; - if(unlikely(value > max)) max = value; + } + else { + // runs only when dim_id_in_rrdr == 0 && points_added == 0 + // so, on the first point added for the query. + min = max = value; + } + points_added++; + values_in_group = 0; + group_value_flags = RRDR_VALUE_NOTHING; + values_in_group_non_zero = 0; } - else { - // runs only when dim_id_in_rrdr == 0 && points_added == 0 - // so, on the first point added for the query. - min = max = value; - } - - points_added++; - values_in_group = 0; - group_value_flags = RRDR_VALUE_NOTHING; - values_in_group_non_zero = 0; } + now = db_now; } - if (likely(initialized_query)) - rd->state->query_ops.finalize(&handle); + rd->state->query_ops.finalize(&handle); r->internal.db_points_read += db_points_read; r->internal.result_points_generated += points_added; @@ -506,13 +644,13 @@ static inline void do_dimension( r->min = min; r->max = max; r->before = max_date; - r->after = min_date - (r->group - 1) * r->st->update_every; + r->after = min_date - (r->group - 1) * dt; rrdr_done(r, rrdr_line); - #ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_INTERNAL_CHECKS if(unlikely(r->rows != points_added)) error("INTERNAL ERROR: %s.%s added %zu rows, but RRDR says I added %zu.", r->st->name, rd->name, (size_t)points_added, (size_t)r->rows); - #endif +#endif } // ---------------------------------------------------------------------------- @@ -589,22 +727,18 @@ static void rrd2rrdr_log_request_response_metdata(RRDR *r } #endif // NETDATA_INTERNAL_CHECKS -RRDR *rrd2rrdr( - RRDSET *st - , long points_requested - , long long after_requested - , long long before_requested - , RRDR_GROUPING group_method - , long resampling_time_requested - , RRDR_OPTIONS options - , const char *dimensions +// Returns 1 if an absolute period was requested or 0 if it was a relative period +static int rrdr_convert_before_after_to_absolute( + long long *after_requestedp + , long long *before_requestedp + , time_t first_entry_t + , time_t last_entry_t ) { - int aligned = !(options & RRDR_OPTION_NOT_ALIGNED); - int absolute_period_requested = -1; + long long after_requested, before_requested; - time_t first_entry_t = rrdset_first_entry_t(st); - time_t last_entry_t = rrdset_last_entry_t(st); + before_requested = *before_requestedp; + after_requested = *after_requestedp; if(before_requested == 0 && after_requested == 0) { // dump the all the data @@ -614,25 +748,15 @@ RRDR *rrd2rrdr( } // allow relative for before (smaller than API_RELATIVE_TIME_MAX) - if(((before_requested < 0)?-before_requested:before_requested) <= API_RELATIVE_TIME_MAX) { - if(abs(before_requested) % st->update_every) { - // make sure it is multiple of st->update_every - if(before_requested < 0) before_requested = before_requested - st->update_every - before_requested % st->update_every; - else before_requested = before_requested + st->update_every - before_requested % st->update_every; - } + if(abs(before_requested) <= API_RELATIVE_TIME_MAX) { if(before_requested > 0) before_requested = first_entry_t + before_requested; - else before_requested = last_entry_t + before_requested; + else before_requested = last_entry_t + before_requested; //last_entry_t is not really now_t + //TODO: fix before_requested to be relative to now_t absolute_period_requested = 0; } // allow relative for after (smaller than API_RELATIVE_TIME_MAX) - if(((after_requested < 0)?-after_requested:after_requested) <= API_RELATIVE_TIME_MAX) { - if(after_requested == 0) after_requested = -st->update_every; - if(abs(after_requested) % st->update_every) { - // make sure it is multiple of st->update_every - if(after_requested < 0) after_requested = after_requested - st->update_every - after_requested % st->update_every; - else after_requested = after_requested + st->update_every - after_requested % st->update_every; - } + if(abs(after_requested) <= API_RELATIVE_TIME_MAX) { after_requested = before_requested + after_requested; absolute_period_requested = 0; } @@ -654,9 +778,53 @@ RRDR *rrd2rrdr( after_requested = tmp; } + *before_requestedp = before_requested; + *after_requestedp = after_requested; + + return absolute_period_requested; +} + +static RRDR *rrd2rrdr_fixedstep( + RRDSET *st + , long points_requested + , long long after_requested + , long long before_requested + , RRDR_GROUPING group_method + , long resampling_time_requested + , RRDR_OPTIONS options + , const char *dimensions + , int update_every + , time_t first_entry_t + , time_t last_entry_t + , int absolute_period_requested +) { + int aligned = !(options & RRDR_OPTION_NOT_ALIGNED); + + if(!absolute_period_requested) { + if(before_requested % update_every) { + // make sure it is multiple of update_every + if(before_requested > 0) + before_requested = before_requested - update_every + before_requested % update_every; + #ifdef NETDATA_INTERNAL_CHECKS + else + error("INTERNAL ERROR: rrd2rrdr() on %s, negative or zero before_requested", st->name); + #endif + } + if(after_requested % update_every) { + // make sure it is multiple of update_every + if(after_requested < 0) + after_requested = after_requested - update_every + after_requested % update_every; + #ifdef NETDATA_INTERNAL_CHECKS + else + error("INTERNAL ERROR: rrd2rrdr() on %s, negative or zero after_requested", st->name); + #endif + } + if(after_requested == before_requested) after_requested -= update_every; + } + // the duration of the chart time_t duration = before_requested - after_requested; - long available_points = duration / st->update_every; + long available_points = duration / update_every; if(duration <= 0 || available_points <= 0) return rrdr_create(st, 1); @@ -674,7 +842,7 @@ RRDR *rrd2rrdr( // resampling_time_requested enforces a certain grouping multiple calculated_number resampling_divisor = 1.0; long resampling_group = 1; - if(unlikely(resampling_time_requested > st->update_every)) { + if(unlikely(resampling_time_requested > update_every)) { if (unlikely(resampling_time_requested > duration)) { // group_time is above the available duration @@ -684,7 +852,7 @@ RRDR *rrd2rrdr( after_requested = before_requested - resampling_time_requested; duration = before_requested - after_requested; - available_points = duration / st->update_every; + available_points = duration / update_every; group = available_points / points_requested; } @@ -696,16 +864,16 @@ RRDR *rrd2rrdr( if(delta > resampling_time_requested / 10) { after_requested -= resampling_time_requested - delta; duration = before_requested - after_requested; - available_points = duration / st->update_every; + available_points = duration / update_every; group = available_points / points_requested; } } // the points we should group to satisfy gtime - resampling_group = resampling_time_requested / st->update_every; - if(unlikely(resampling_time_requested % st->update_every)) { + resampling_group = resampling_time_requested / update_every; + if(unlikely(resampling_time_requested % update_every)) { #ifdef NETDATA_INTERNAL_CHECKS - info("INTERNAL CHECK: %s: requested gtime %ld secs, is not a multiple of the chart's data collection frequency %d secs", st->id, resampling_time_requested, st->update_every); + info("INTERNAL CHECK: %s: requested gtime %ld secs, is not a multiple of the chart's data collection frequency %d secs", st->id, resampling_time_requested, update_every); #endif resampling_group++; @@ -716,7 +884,7 @@ RRDR *rrd2rrdr( if(unlikely(group % resampling_group)) group += resampling_group - (group % resampling_group); // make sure group is multiple of resampling_group //resampling_divisor = group / resampling_group; - resampling_divisor = (calculated_number)(group * st->update_every) / (calculated_number)resampling_time_requested; + resampling_divisor = (calculated_number)(group * update_every) / (calculated_number)resampling_time_requested; } // now that we have group, @@ -724,8 +892,8 @@ RRDR *rrd2rrdr( if(aligned) { // alignement has been requested, so align the values - before_requested -= (before_requested % group); - after_requested -= (after_requested % group); + before_requested -= before_requested % (group * update_every); + after_requested -= after_requested % (group * update_every); } // we align the request on requested_before @@ -735,28 +903,28 @@ RRDR *rrd2rrdr( error("INTERNAL ERROR: rrd2rrdr() on %s, before_wanted is after db max", st->name); #endif - before_wanted = last_entry_t - (last_entry_t % ( ((aligned)?group:1) * st->update_every )); + before_wanted = last_entry_t - (last_entry_t % ( ((aligned)?group:1) * update_every )); } //size_t before_slot = rrdset_time2slot(st, before_wanted); // we need to estimate the number of points, for having // an integer number of values per point - long points_wanted = (before_wanted - after_requested) / (st->update_every * group); + long points_wanted = (before_wanted - after_requested) / (update_every * group); - time_t after_wanted = before_wanted - (points_wanted * group * st->update_every) + st->update_every; + time_t after_wanted = before_wanted - (points_wanted * group * update_every) + update_every; if(unlikely(after_wanted < first_entry_t)) { // hm... we go to the past, calculate again points_wanted using all the db from before_wanted to the beginning points_wanted = (before_wanted - first_entry_t) / group; // recalculate after wanted with the new number of points - after_wanted = before_wanted - (points_wanted * group * st->update_every) + st->update_every; + after_wanted = before_wanted - (points_wanted * group * update_every) + update_every; if(unlikely(after_wanted < first_entry_t)) { #ifdef NETDATA_INTERNAL_CHECKS error("INTERNAL ERROR: rrd2rrdr() on %s, after_wanted is before db min", st->name); #endif - after_wanted = first_entry_t - (first_entry_t % ( ((aligned)?group:1) * st->update_every )) + ( ((aligned)?group:1) * st->update_every ); + after_wanted = first_entry_t - (first_entry_t % ( ((aligned)?group:1) * update_every )) + ( ((aligned)?group:1) * update_every ); } } //size_t after_slot = rrdset_time2slot(st, after_wanted); @@ -772,7 +940,7 @@ RRDR *rrd2rrdr( } // recalculate points_wanted using the final time-frame - points_wanted = (before_wanted - after_wanted) / st->update_every / group + 1; + points_wanted = (before_wanted - after_wanted) / update_every / group + 1; if(unlikely(points_wanted < 0)) { #ifdef NETDATA_INTERNAL_CHECKS error("INTERNAL ERROR: rrd2rrdr() on %s, points_wanted is %ld", st->name, points_wanted); @@ -803,8 +971,8 @@ RRDR *rrd2rrdr( error("INTERNAL CHECK: after_slot is invalid %zu, expected 0 to %ld", after_slot, st->entries - 1); */ - if(points_wanted > (before_wanted - after_wanted) / group / st->update_every + 1) - error("INTERNAL CHECK: points_wanted %ld is more than points %ld", points_wanted, (before_wanted - after_wanted) / group / st->update_every + 1); + if(points_wanted > (before_wanted - after_wanted) / group / update_every + 1) + error("INTERNAL CHECK: points_wanted %ld is more than points %ld", points_wanted, (before_wanted - after_wanted) / group / update_every + 1); if(group < resampling_group) error("INTERNAL CHECK: group %ld is less than the desired group points %ld", group, resampling_group); @@ -844,7 +1012,7 @@ RRDR *rrd2rrdr( // initialize RRDR r->group = group; - r->update_every = (int)group * st->update_every; + r->update_every = (int)group * update_every; r->before = before_wanted; r->after = after_wanted; r->internal.points_wanted = points_wanted; @@ -913,7 +1081,7 @@ RRDR *rrd2rrdr( // reset the grouping for the new dimension r->internal.grouping_reset(r); - do_dimension( + do_dimension_fixedstep( r , points_wanted , rd @@ -961,30 +1129,426 @@ RRDR *rrd2rrdr( } #ifdef NETDATA_INTERNAL_CHECKS + if (dimensions_used) { + if(r->internal.log) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ r->internal.log); + + if(r->rows != points_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'points' is not wanted 'points'"); + + if(aligned && (r->before % group) != 0) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "'before' is not aligned but alignment is required"); + + // 'after' should not be aligned, since we start inside the first group + //if(aligned && (r->after % group) != 0) + // rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required"); + + if(r->before != before_requested) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "chart is not aligned to requested 'before'"); + + if(r->before != before_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'before' is not wanted 'before'"); + + // reported 'after' varies, depending on group + if(r->after != after_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'after' is not wanted 'after'"); + } + #endif + + // free all resources used by the grouping method + r->internal.grouping_free(r); + + // when all the dimensions are zero, we should return all of them + if(unlikely(options & RRDR_OPTION_NONZERO && !dimensions_nonzero)) { + // all the dimensions are zero + // mark them as NONZERO to send them all + for(rd = st->dimensions, c = 0 ; rd && c < dimensions_count ; rd = rd->next, c++) { + if(unlikely(r->od[c] & RRDR_DIMENSION_HIDDEN)) continue; + r->od[c] |= RRDR_DIMENSION_NONZERO; + } + } + + rrdr_query_completed(r->internal.db_points_read, r->internal.result_points_generated); + return r; +} + +#ifdef ENABLE_DBENGINE +static RRDR *rrd2rrdr_variablestep( + RRDSET *st + , long points_requested + , long long after_requested + , long long before_requested + , RRDR_GROUPING group_method + , long resampling_time_requested + , RRDR_OPTIONS options + , const char *dimensions + , int update_every + , time_t first_entry_t + , time_t last_entry_t + , int absolute_period_requested + , struct rrdeng_region_info *region_info_array +) { + int aligned = !(options & RRDR_OPTION_NOT_ALIGNED); + + if(!absolute_period_requested) { + if(before_requested % update_every) { + // make sure it is multiple of update_every + if(before_requested > 0) + before_requested = before_requested - before_requested % update_every; + #ifdef NETDATA_INTERNAL_CHECKS + else + error("INTERNAL ERROR: rrd2rrdr() on %s, negative or zero before_requested", st->name); + #endif + } + if(after_requested % update_every) { + // make sure it is multiple of update_every + if(after_requested < 0) + after_requested = after_requested - after_requested % update_every; + #ifdef NETDATA_INTERNAL_CHECKS + else + error("INTERNAL ERROR: rrd2rrdr() on %s, negative or zero after_requested", st->name); + #endif + } + if(after_requested == before_requested) after_requested -= update_every; + } + + // the duration of the chart + time_t duration = before_requested - after_requested; + long available_points = duration / update_every; + + if(duration <= 0 || available_points <= 0) { + freez(region_info_array); + return rrdr_create(st, 1); + } + + // check the number of wanted points in the result + if(unlikely(points_requested < 0)) points_requested = -points_requested; + if(unlikely(points_requested > available_points)) points_requested = available_points; + if(unlikely(points_requested == 0)) points_requested = available_points; + + // calculate the desired grouping of source data points + long group = available_points / points_requested; + if(unlikely(group <= 0)) group = 1; + if(unlikely(available_points % points_requested > points_requested / 2)) group++; // rounding to the closest integer + + // resampling_time_requested enforces a certain grouping multiple + calculated_number resampling_divisor = 1.0; + long resampling_group = 1; + if(unlikely(resampling_time_requested > update_every)) { + if (unlikely(resampling_time_requested > duration)) { + // group_time is above the available duration + + #ifdef NETDATA_INTERNAL_CHECKS + info("INTERNAL CHECK: %s: requested gtime %ld secs, is greater than the desired duration %ld secs", st->id, resampling_time_requested, duration); + #endif + + after_requested = before_requested - resampling_time_requested; + duration = before_requested - after_requested; + available_points = duration / update_every; + group = available_points / points_requested; + } + + // if the duration is not aligned to resampling time + // extend the duration to the past, to avoid a gap at the chart + // only when the missing duration is above 1/10th of a point + if(duration % resampling_time_requested) { + time_t delta = duration % resampling_time_requested; + if(delta > resampling_time_requested / 10) { + after_requested -= resampling_time_requested - delta; + duration = before_requested - after_requested; + available_points = duration / update_every; + group = available_points / points_requested; + } + } + + // the points we should group to satisfy gtime + resampling_group = resampling_time_requested / update_every; + if(unlikely(resampling_time_requested % update_every)) { + #ifdef NETDATA_INTERNAL_CHECKS + info("INTERNAL CHECK: %s: requested gtime %ld secs, is not a multiple of the chart's data collection frequency %d secs", st->id, resampling_time_requested, update_every); + #endif + + resampling_group++; + } + + // adapt group according to resampling_group + if(unlikely(group < resampling_group)) group = resampling_group; // do not allow grouping below the desired one + if(unlikely(group % resampling_group)) group += resampling_group - (group % resampling_group); // make sure group is multiple of resampling_group + + //resampling_divisor = group / resampling_group; + resampling_divisor = (calculated_number)(group * update_every) / (calculated_number)resampling_time_requested; + } + + // now that we have group, + // align the requested timeframe to fit it. + + if(aligned) { + // alignement has been requested, so align the values + before_requested -= before_requested % (group * update_every); + after_requested -= after_requested % (group * update_every); + } + + // we align the request on requested_before + time_t before_wanted = before_requested; + if(likely(before_wanted > last_entry_t)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: rrd2rrdr() on %s, before_wanted is after db max", st->name); + #endif + + before_wanted = last_entry_t - (last_entry_t % ( ((aligned)?group:1) * update_every )); + } + //size_t before_slot = rrdset_time2slot(st, before_wanted); + + // we need to estimate the number of points, for having + // an integer number of values per point + long points_wanted = (before_wanted - after_requested) / (update_every * group); + + time_t after_wanted = before_wanted - (points_wanted * group * update_every) + update_every; + if(unlikely(after_wanted < first_entry_t)) { + // hm... we go to the past, calculate again points_wanted using all the db from before_wanted to the beginning + points_wanted = (before_wanted - first_entry_t) / group; + + // recalculate after wanted with the new number of points + after_wanted = before_wanted - (points_wanted * group * update_every) + update_every; + + if(unlikely(after_wanted < first_entry_t)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: rrd2rrdr() on %s, after_wanted is before db min", st->name); + #endif + + after_wanted = first_entry_t - (first_entry_t % ( ((aligned)?group:1) * update_every )) + ( ((aligned)?group:1) * update_every ); + } + } + //size_t after_slot = rrdset_time2slot(st, after_wanted); + + // check if they are reversed + if(unlikely(after_wanted > before_wanted)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: rrd2rrdr() on %s, reversed wanted after/before", st->name); + #endif + time_t tmp = before_wanted; + before_wanted = after_wanted; + after_wanted = tmp; + } - if(r->internal.log) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ r->internal.log); + // recalculate points_wanted using the final time-frame + points_wanted = (before_wanted - after_wanted) / update_every / group + 1; + if(unlikely(points_wanted < 0)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: rrd2rrdr() on %s, points_wanted is %ld", st->name, points_wanted); + #endif + points_wanted = 0; + } + +#ifdef NETDATA_INTERNAL_CHECKS + duration = before_wanted - after_wanted; - if(r->rows != points_wanted) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'points' is not wanted 'points'"); + if(after_wanted < first_entry_t) + error("INTERNAL CHECK: after_wanted %u is too small, minimum %u", (uint32_t)after_wanted, (uint32_t)first_entry_t); - if(aligned && (r->before % group) != 0) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "'before' is not aligned but alignment is required"); + if(after_wanted > last_entry_t) + error("INTERNAL CHECK: after_wanted %u is too big, maximum %u", (uint32_t)after_wanted, (uint32_t)last_entry_t); - // 'after' should not be aligned, since we start inside the first group - //if(aligned && (r->after % group) != 0) - // rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required"); + if(before_wanted < first_entry_t) + error("INTERNAL CHECK: before_wanted %u is too small, minimum %u", (uint32_t)before_wanted, (uint32_t)first_entry_t); - if(r->before != before_requested) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "chart is not aligned to requested 'before'"); + if(before_wanted > last_entry_t) + error("INTERNAL CHECK: before_wanted %u is too big, maximum %u", (uint32_t)before_wanted, (uint32_t)last_entry_t); - if(r->before != before_wanted) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'before' is not wanted 'before'"); +/* + if(before_slot >= (size_t)st->entries) + error("INTERNAL CHECK: before_slot is invalid %zu, expected 0 to %ld", before_slot, st->entries - 1); - // reported 'after' varies, depending on group - if(r->after != after_wanted) - rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'after' is not wanted 'after'"); + if(after_slot >= (size_t)st->entries) + error("INTERNAL CHECK: after_slot is invalid %zu, expected 0 to %ld", after_slot, st->entries - 1); +*/ + if(points_wanted > (before_wanted - after_wanted) / group / update_every + 1) + error("INTERNAL CHECK: points_wanted %ld is more than points %ld", points_wanted, (before_wanted - after_wanted) / group / update_every + 1); + + if(group < resampling_group) + error("INTERNAL CHECK: group %ld is less than the desired group points %ld", group, resampling_group); + + if(group > resampling_group && group % resampling_group) + error("INTERNAL CHECK: group %ld is not a multiple of the desired group points %ld", group, resampling_group); +#endif + + // ------------------------------------------------------------------------- + // initialize our result set + // this also locks the chart for us + + RRDR *r = rrdr_create(st, points_wanted); + if(unlikely(!r)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL CHECK: Cannot create RRDR for %s, after=%u, before=%u, duration=%u, points=%ld", st->id, (uint32_t)after_wanted, (uint32_t)before_wanted, (uint32_t)duration, points_wanted); + #endif + freez(region_info_array); + return NULL; + } + + if(unlikely(!r->d || !points_wanted)) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL CHECK: Returning empty RRDR (no dimensions in RRDSET) for %s, after=%u, before=%u, duration=%zu, points=%ld", st->id, (uint32_t)after_wanted, (uint32_t)before_wanted, (size_t)duration, points_wanted); + #endif + freez(region_info_array); + return r; + } + + r->result_options |= RRDR_RESULT_OPTION_VARIABLE_STEP; + if(unlikely(absolute_period_requested == 1)) + r->result_options |= RRDR_RESULT_OPTION_ABSOLUTE; + else + r->result_options |= RRDR_RESULT_OPTION_RELATIVE; + + // find how many dimensions we have + long dimensions_count = r->d; + + // ------------------------------------------------------------------------- + // initialize RRDR + + r->group = group; + r->update_every = (int)group * update_every; + r->before = before_wanted; + r->after = after_wanted; + r->internal.points_wanted = points_wanted; + r->internal.resampling_group = resampling_group; + r->internal.resampling_divisor = resampling_divisor; + + + // ------------------------------------------------------------------------- + // assign the processor functions + + { + int i, found = 0; + for(i = 0; !found && api_v1_data_groups[i].name ;i++) { + if(api_v1_data_groups[i].value == group_method) { + r->internal.grouping_create= api_v1_data_groups[i].create; + r->internal.grouping_reset = api_v1_data_groups[i].reset; + r->internal.grouping_free = api_v1_data_groups[i].free; + r->internal.grouping_add = api_v1_data_groups[i].add; + r->internal.grouping_flush = api_v1_data_groups[i].flush; + found = 1; + } + } + if(!found) { + errno = 0; + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: grouping method %u not found for chart '%s'. Using 'average'", (unsigned int)group_method, r->st->name); + #endif + r->internal.grouping_create= grouping_create_average; + r->internal.grouping_reset = grouping_reset_average; + r->internal.grouping_free = grouping_free_average; + r->internal.grouping_add = grouping_add_average; + r->internal.grouping_flush = grouping_flush_average; + } + } + + // allocate any memory required by the grouping method + r->internal.grouping_data = r->internal.grouping_create(r); + + + // ------------------------------------------------------------------------- + // disable the not-wanted dimensions + + rrdset_check_rdlock(st); + + if(dimensions) + rrdr_disable_not_selected_dimensions(r, options, dimensions); + + + // ------------------------------------------------------------------------- + // do the work for each dimension + + time_t max_after = 0, min_before = 0; + long max_rows = 0; + + RRDDIM *rd; + long c, dimensions_used = 0, dimensions_nonzero = 0; + for(rd = st->dimensions, c = 0 ; rd && c < dimensions_count ; rd = rd->next, c++) { + + // if we need a percentage, we need to calculate all dimensions + if(unlikely(!(options & RRDR_OPTION_PERCENTAGE) && (r->od[c] & RRDR_DIMENSION_HIDDEN))) { + if(unlikely(r->od[c] & RRDR_DIMENSION_SELECTED)) r->od[c] &= ~RRDR_DIMENSION_SELECTED; + continue; + } + r->od[c] |= RRDR_DIMENSION_SELECTED; + + // reset the grouping for the new dimension + r->internal.grouping_reset(r); + + do_dimension_variablestep( + r + , points_wanted + , rd + , c + , after_wanted + , before_wanted + ); + + if(r->od[c] & RRDR_DIMENSION_NONZERO) + dimensions_nonzero++; + + // verify all dimensions are aligned + if(unlikely(!dimensions_used)) { + min_before = r->before; + max_after = r->after; + max_rows = r->rows; + } + else { + if(r->after != max_after) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: 'after' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu", + st->name, (size_t)max_after, rd->name, (size_t)r->after); + #endif + r->after = (r->after > max_after) ? r->after : max_after; + } + + if(r->before != min_before) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: 'before' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu", + st->name, (size_t)min_before, rd->name, (size_t)r->before); + #endif + r->before = (r->before < min_before) ? r->before : min_before; + } + + if(r->rows != max_rows) { + #ifdef NETDATA_INTERNAL_CHECKS + error("INTERNAL ERROR: 'rows' mismatch between dimensions for chart '%s': max is %zu, dimension '%s' has %zu", + st->name, (size_t)max_rows, rd->name, (size_t)r->rows); + #endif + r->rows = (r->rows > max_rows) ? r->rows : max_rows; + } + } + + dimensions_used++; + } + + #ifdef NETDATA_INTERNAL_CHECKS + + if (dimensions_used) { + if(r->internal.log) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ r->internal.log); + + if(r->rows != points_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'points' is not wanted 'points'"); + + if(aligned && (r->before % group) != 0) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "'before' is not aligned but alignment is required"); + + // 'after' should not be aligned, since we start inside the first group + //if(aligned && (r->after % group) != 0) + // rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required"); + + if(r->before != before_requested) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "chart is not aligned to requested 'before'"); + + if(r->before != before_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'before' is not wanted 'before'"); + + // reported 'after' varies, depending on group + if(r->after != after_wanted) + rrd2rrdr_log_request_response_metdata(r, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, /*after_slot, before_slot,*/ "got 'after' is not wanted 'after'"); + } #endif // free all resources used by the grouping method @@ -1001,5 +1565,57 @@ RRDR *rrd2rrdr( } rrdr_query_completed(r->internal.db_points_read, r->internal.result_points_generated); + freez(region_info_array); return r; } +#endif //#ifdef ENABLE_DBENGINE + +RRDR *rrd2rrdr( + RRDSET *st + , long points_requested + , long long after_requested + , long long before_requested + , RRDR_GROUPING group_method + , long resampling_time_requested + , RRDR_OPTIONS options + , const char *dimensions +) { + int rrd_update_every; + int absolute_period_requested; + time_t first_entry_t = rrdset_first_entry_t(st); + time_t last_entry_t = rrdset_last_entry_t(st); + + absolute_period_requested = rrdr_convert_before_after_to_absolute(&after_requested, &before_requested, + first_entry_t, last_entry_t); + +#ifdef ENABLE_DBENGINE + if ((st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)) { + struct rrdeng_region_info *region_info_array; + unsigned regions, max_interval; + + /* This call takes the chart read-lock */ + regions = rrdeng_variable_step_boundaries(st, after_requested, before_requested, + ®ion_info_array, &max_interval); + if (1 == regions) { + if (region_info_array) + rrd_update_every = region_info_array[0].update_every; + else + rrd_update_every = st->update_every; + if (region_info_array) + freez(region_info_array); + return rrd2rrdr_fixedstep(st, points_requested, after_requested, before_requested, group_method, + resampling_time_requested, options, dimensions, rrd_update_every, + first_entry_t, last_entry_t, absolute_period_requested); + } else { + rrd_update_every = (uint16_t)max_interval; + return rrd2rrdr_variablestep(st, points_requested, after_requested, before_requested, group_method, + resampling_time_requested, options, dimensions, rrd_update_every, + first_entry_t, last_entry_t, absolute_period_requested, region_info_array); + } + } +#endif + rrd_update_every = st->update_every; + return rrd2rrdr_fixedstep(st, points_requested, after_requested, before_requested, group_method, + resampling_time_requested, options, dimensions, + rrd_update_every, first_entry_t, last_entry_t, absolute_period_requested); +} \ No newline at end of file diff --git a/web/api/queries/rrdr.h b/web/api/queries/rrdr.h index 6473ae74578d52..6a031adf8977a9 100644 --- a/web/api/queries/rrdr.h +++ b/web/api/queries/rrdr.h @@ -6,7 +6,7 @@ #include "libnetdata/libnetdata.h" typedef enum rrdr_options { - RRDR_OPTION_NONZERO = 0x00000001, // don't output dimensions will just zero values + RRDR_OPTION_NONZERO = 0x00000001, // don't output dimensions with just zero values RRDR_OPTION_REVERSED = 0x00000002, // output the rows in reverse order (oldest to newest) RRDR_OPTION_ABSOLUTE = 0x00000004, // values positive, for DATASOURCE_SSV before summing RRDR_OPTION_MIN2MAX = 0x00000008, // when adding dimensions, use max - min, instead of sum @@ -18,7 +18,7 @@ typedef enum rrdr_options { RRDR_OPTION_JSON_WRAP = 0x00000200, // wrap the response in a JSON header with info about the result RRDR_OPTION_LABEL_QUOTES = 0x00000400, // in CSV output, wrap header labels in double quotes RRDR_OPTION_PERCENTAGE = 0x00000800, // give values as percentage of total - RRDR_OPTION_NOT_ALIGNED = 0x00001000, // do not align charts for persistant timeframes + RRDR_OPTION_NOT_ALIGNED = 0x00001000, // do not align charts for persistent timeframes RRDR_OPTION_DISPLAY_ABS = 0x00002000, // for badges, display the absolute value, but calculate colors with sign RRDR_OPTION_MATCH_IDS = 0x00004000, // when filtering dimensions, match only IDs RRDR_OPTION_MATCH_NAMES = 0x00008000, // when filtering dimensions, match only names @@ -40,8 +40,11 @@ typedef enum rrdr_dimension_flag { // RRDR result options typedef enum rrdr_result_flags { - RRDR_RESULT_OPTION_ABSOLUTE = 0x00000001, // the query uses absolute time-frames (can be cached by browsers and proxies) - RRDR_RESULT_OPTION_RELATIVE = 0x00000002, // the query uses relative time-frames (should not to be cached by browsers and proxies) + RRDR_RESULT_OPTION_ABSOLUTE = 0x00000001, // the query uses absolute time-frames + // (can be cached by browsers and proxies) + RRDR_RESULT_OPTION_RELATIVE = 0x00000002, // the query uses relative time-frames + // (should not to be cached by browsers and proxies) + RRDR_RESULT_OPTION_VARIABLE_STEP = 0x00000004, // the query uses variable-step time-frames } RRDR_RESULT_FLAGS; typedef struct rrdresult {