Skip to content

Commit

Permalink
Adding concurrent search versions of query count and time metrics (op…
Browse files Browse the repository at this point in the history
…ensearch-project#9622)

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 1, 2023
1 parent fada3f0 commit 4a4b5ee
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Core crypto library to perform encryption and decryption of source content ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466))
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,105 @@
---
"Help":
- skip:
version: " - 2.3.99"
version: " - 2.9.99"
reason: concurrent search stats were added in 2.10.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.10.0 - "

- match:
$body: |
/^ index .+ \n
shard .+ \n
prirep .+ \n
state .+ \n
docs .+ \n
store .+ \n
ip .+ \n
id .+ \n
node .+ \n
sync_id .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
recoverysource.type .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n
query_cache.memory_size .+ \n
query_cache.evictions .+ \n
flush.total .+ \n
flush.total_time .+ \n
get.current .+ \n
get.time .+ \n
get.total .+ \n
get.exists_time .+ \n
get.exists_total .+ \n
get.missing_time .+ \n
get.missing_total .+ \n
indexing.delete_current .+ \n
indexing.delete_time .+ \n
indexing.delete_total .+ \n
indexing.index_current .+ \n
indexing.index_time .+ \n
indexing.index_total .+ \n
indexing.index_failed .+ \n
merges.current .+ \n
merges.current_docs .+ \n
merges.current_size .+ \n
merges.total .+ \n
merges.total_docs .+ \n
merges.total_size .+ \n
merges.total_time .+ \n
refresh.total .+ \n
refresh.time .+ \n
refresh.external_total .+ \n
refresh.external_time .+ \n
refresh.listeners .+ \n
search.fetch_current .+ \n
search.fetch_time .+ \n
search.fetch_total .+ \n
search.open_contexts .+ \n
search.query_current .+ \n
search.query_time .+ \n
search.query_total .+ \n
search.concurrent_query_current .+ \n
search.concurrent_query_time .+ \n
search.concurrent_query_total .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
search.point_in_time_current .+ \n
search.point_in_time_time .+ \n
search.point_in_time_total .+ \n
segments.count .+ \n
segments.memory .+ \n
segments.index_writer_memory .+ \n
segments.version_map_memory .+ \n
segments.fixed_bitset_memory .+ \n
seq_no.max .+ \n
seq_no.local_checkpoint .+ \n
seq_no.global_checkpoint .+ \n
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
$/
---
"Help between 2.4.0 - 2.9.99":
- skip:
version: " - 2.3.99 , 2.9.99 - "
reason: point in time stats were added in 2.4.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.4.0 - "
version: "2.4.0 - 2.10.0"

- match:
$body: |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.stats;

import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptPlugin;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 0)
public class ConcurrentSearchStatsIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ScriptedDelayedPlugin.class, InternalSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
// Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), "1ms")
.put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
.build();
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), false)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

public void testConcurrentQueryCount() throws Exception {
int NUM_SHARDS = randomIntBetween(1, 5);
createIndex(
"test1",
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
createIndex(
"test2",
Settings.builder()
.put(indexSettings())
.put("search.concurrent_segment_search.enabled", false)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

indexRandom(
false,
true,
client().prepareIndex("test1").setId("1").setSource("foo", "bar"),
client().prepareIndex("test1").setId("2").setSource("foo", "baz"),
client().prepareIndex("test2").setId("1").setSource("foo", "bar"),
client().prepareIndex("test2").setId("2").setSource("foo", "baz")
);

refresh();

// Search with custom plugin to ensure that queryTime is significant
client().prepareSearch("_all")
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();
client().prepareSearch("test1")
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();
client().prepareSearch("test2")
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();

IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
IndicesStatsResponse stats = builder.execute().actionGet();

assertEquals(4 * NUM_SHARDS, stats.getTotal().search.getTotal().getQueryCount());
assertEquals(2 * NUM_SHARDS, stats.getTotal().search.getTotal().getConcurrentQueryCount());
assertThat(stats.getTotal().search.getTotal().getQueryTimeInMillis(), greaterThan(0L));
assertThat(stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(), greaterThan(0L));
assertThat(
stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(),
lessThan(stats.getTotal().search.getTotal().getQueryTimeInMillis())
);
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

@Override
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.singletonMap(SCRIPT_NAME, params -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public static class Stats implements Writeable, ToXContentFragment {
private long queryTimeInMillis;
private long queryCurrent;

private long concurrentQueryCount;
private long concurrentQueryTimeInMillis;
private long concurrentQueryCurrent;

private long fetchCount;
private long fetchTimeInMillis;
private long fetchCurrent;
Expand All @@ -91,6 +95,9 @@ public Stats(
long queryCount,
long queryTimeInMillis,
long queryCurrent,
long concurrentQueryCount,
long concurrentQueryTimeInMillis,
long concurrentQueryCurrent,
long fetchCount,
long fetchTimeInMillis,
long fetchCurrent,
Expand All @@ -108,6 +115,10 @@ public Stats(
this.queryTimeInMillis = queryTimeInMillis;
this.queryCurrent = queryCurrent;

this.concurrentQueryCount = concurrentQueryCount;
this.concurrentQueryTimeInMillis = concurrentQueryTimeInMillis;
this.concurrentQueryCurrent = concurrentQueryCurrent;

this.fetchCount = fetchCount;
this.fetchTimeInMillis = fetchTimeInMillis;
this.fetchCurrent = fetchCurrent;
Expand Down Expand Up @@ -147,13 +158,23 @@ private Stats(StreamInput in) throws IOException {
pitTimeInMillis = in.readVLong();
pitCurrent = in.readVLong();
}

if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
concurrentQueryCount = in.readVLong();
concurrentQueryTimeInMillis = in.readVLong();
concurrentQueryCurrent = in.readVLong();
}
}

public void add(Stats stats) {
queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;
queryCurrent += stats.queryCurrent;

concurrentQueryCount += stats.concurrentQueryCount;
concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis;
concurrentQueryCurrent += stats.concurrentQueryCurrent;

fetchCount += stats.fetchCount;
fetchTimeInMillis += stats.fetchTimeInMillis;
fetchCurrent += stats.fetchCurrent;
Expand All @@ -175,6 +196,9 @@ public void addForClosingShard(Stats stats) {
queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;

concurrentQueryCount += stats.concurrentQueryCount;
concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis;

fetchCount += stats.fetchCount;
fetchTimeInMillis += stats.fetchTimeInMillis;

Expand Down Expand Up @@ -207,6 +231,22 @@ public long getQueryCurrent() {
return queryCurrent;
}

public long getConcurrentQueryCount() {
return concurrentQueryCount;
}

public TimeValue getConcurrentQueryTime() {
return new TimeValue(concurrentQueryTimeInMillis);
}

public long getConcurrentQueryTimeInMillis() {
return concurrentQueryTimeInMillis;
}

public long getConcurrentQueryCurrent() {
return concurrentQueryCurrent;
}

public long getFetchCount() {
return fetchCount;
}
Expand Down Expand Up @@ -298,6 +338,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(pitTimeInMillis);
out.writeVLong(pitCurrent);
}

if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeVLong(concurrentQueryCount);
out.writeVLong(concurrentQueryTimeInMillis);
out.writeVLong(concurrentQueryCurrent);
}
}

@Override
Expand All @@ -306,6 +352,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime());
builder.field(Fields.QUERY_CURRENT, queryCurrent);

builder.field(Fields.CONCURRENT_QUERY_TOTAL, concurrentQueryCount);
builder.humanReadableField(Fields.CONCURRENT_QUERY_TIME_IN_MILLIS, Fields.CONCURRENT_QUERY_TIME, getConcurrentQueryTime());
builder.field(Fields.CONCURRENT_QUERY_CURRENT, concurrentQueryCurrent);

builder.field(Fields.FETCH_TOTAL, fetchCount);
builder.humanReadableField(Fields.FETCH_TIME_IN_MILLIS, Fields.FETCH_TIME, getFetchTime());
builder.field(Fields.FETCH_CURRENT, fetchCurrent);
Expand Down Expand Up @@ -430,6 +480,10 @@ static final class Fields {
static final String QUERY_TIME = "query_time";
static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis";
static final String QUERY_CURRENT = "query_current";
static final String CONCURRENT_QUERY_TOTAL = "concurrent_query_total";
static final String CONCURRENT_QUERY_TIME = "concurrent_query_time";
static final String CONCURRENT_QUERY_TIME_IN_MILLIS = "concurrent_query_time_in_millis";
static final String CONCURRENT_QUERY_CURRENT = "concurrent_query_current";
static final String FETCH_TOTAL = "fetch_total";
static final String FETCH_TIME = "fetch_time";
static final String FETCH_TIME_IN_MILLIS = "fetch_time_in_millis";
Expand Down
Loading

0 comments on commit 4a4b5ee

Please sign in to comment.