Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Add ThreadContext user info in top queries #12529

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Query insights] Add remote address info in top queries ([#12529](https://github.com/opensearch-project/OpenSearch/pull/12529))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -49,6 +56,8 @@ public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase {

private final int TOTAL_NUMBER_OF_NODES = 2;
private final int TOTAL_SEARCH_REQUESTS = 5;
private final String remoteAddress = "1.2.3.4";
private final int remotePort = 1234;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -143,7 +152,7 @@ public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionExc
/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException, UnknownHostException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand All @@ -168,10 +177,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
SearchRequestBuilder requestBuilder = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
.setQuery(QueryBuilders.matchAllQuery());
requestBuilder.request().remoteAddress(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort));
SearchResponse searchResponse = requestBuilder.get();
assertEquals(searchResponse.getFailedShards(), 0);
}
// Sleep to wait for queue drained to top queries store
Expand All @@ -181,6 +191,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());
for (TopQueries nodeRecords : response.getNodes()) {
for (SearchQueryRecord record : nodeRecords.getTopQueriesRecord()) {
Assert.assertEquals(remoteAddress + ":" + remotePort, record.getAttributes().get(Attribute.REMOTE_ADDRESS));
}
}

internalCluster().stopAllNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.REMOTE_ADDRESS, String.valueOf(searchRequestContext.getRequestRemoteAddress()));
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
NODE_ID;
NODE_ID,
/**
* The remote address of this request
*/
REMOTE_ADDRESS;

/**
* Read an Attribute from a StreamInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,31 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -47,23 +55,26 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final String remoteAddress = "1.2.3.4";
private final int remotePort = 1234;
private ClusterService clusterService;

@Before
public void setup() {
Settings.Builder settingsBuilder = Settings.builder();
Settings settings = settingsBuilder.build();
public void setup() throws UnknownHostException {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
when(searchRequestContext.getRequestRemoteAddress()).thenReturn(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort));
}

public void testOnRequestEnd() throws InterruptedException {
Long timestamp = System.currentTimeMillis() - 100L;
public void testOnRequestEnd() {
long timestamp = System.currentTimeMillis() - 100L;
SearchType searchType = SearchType.QUERY_THEN_FETCH;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Expand Down Expand Up @@ -92,6 +103,15 @@ public void testOnRequestEnd() throws InterruptedException {
queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);

verify(queryInsightsService, times(1)).addRecord(any());
ArgumentCaptor<SearchQueryRecord> argumentCaptor = ArgumentCaptor.forClass(SearchQueryRecord.class);
verify(queryInsightsService).addRecord(argumentCaptor.capture());
assertEquals(timestamp, argumentCaptor.getValue().getTimestamp());
Map<Attribute, Object> attrs = argumentCaptor.getValue().getAttributes();
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), attrs.get(Attribute.SEARCH_TYPE));
assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS));
assertEquals(indices, attrs.get(Attribute.INDICES));
assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP));
assertEquals(remoteAddress + ":" + remotePort, attrs.get(Attribute.REMOTE_ADDRESS));
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.transport.TransportAddress;

import java.util.EnumMap;
import java.util.HashMap;
Expand Down Expand Up @@ -51,6 +52,10 @@ public Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

public TransportAddress getRequestRemoteAddress() {
return searchRequest.remoteAddress();
}

SearchResponse.PhaseTook getPhaseTook() {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.Booleans;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -223,6 +224,9 @@ public static void parseSearchRequest(
}

searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));

// set remote address for searchRequest
searchRequest.remoteAddress(new TransportAddress(request.getHttpChannel().getRemoteAddress()));
Comment on lines +227 to +229
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's safe to hijack remoteAddress() like this.

It's intended to be set by InboundHandler on transport requests. Setting it on a REST request may have unintended consequences.

Also, if the request comes through a load balancer or a proxy, this value will almost certainly be the load balancer or proxy's address.

}

/**
Expand Down
Loading