diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d1f6ee02027..5c13c9a1d2718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) - The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) +- [Query insights] Add remote address info in top queries ([#12529](https://github.com/opensearch-project/OpenSearch/pull/12529)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java index 04e715444f50a..93e03143a9c17 100644 --- a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -14,19 +14,26 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Assert; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -49,6 +56,8 @@ public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { private final int TOTAL_NUMBER_OF_NODES = 2; private final int TOTAL_SEARCH_REQUESTS = 5; + private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; @Override protected Collection> nodePlugins() { @@ -143,7 +152,7 @@ public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionExc /** * Test get top queries when feature enabled */ - public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException, UnknownHostException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -168,10 +177,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { } // making search requests to get top queries for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { - SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + SearchRequestBuilder requestBuilder = internalCluster().client(randomFrom(nodes)) .prepareSearch() - .setQuery(QueryBuilders.matchAllQuery()) - .get(); + .setQuery(QueryBuilders.matchAllQuery()); + requestBuilder.request().remoteAddress(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); + SearchResponse searchResponse = requestBuilder.get(); assertEquals(searchResponse.getFailedShards(), 0); } // Sleep to wait for queue drained to top queries store @@ -181,6 +191,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { Assert.assertEquals(0, response.failures().size()); Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + for (TopQueries nodeRecords : response.getNodes()) { + for (SearchQueryRecord record : nodeRecords.getTopQueriesRecord()) { + Assert.assertEquals(remoteAddress + ":" + remotePort, record.getAttributes().get(Attribute.REMOTE_ADDRESS)); + } + } internalCluster().stopAllNodes(); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 9ec8673147c38..672e32fefce2b 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -138,6 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + attributes.put(Attribute.REMOTE_ADDRESS, String.valueOf(searchRequestContext.getRequestRemoteAddress())); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index c1d17edf9ff14..3a29415618b97 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -43,7 +43,11 @@ public enum Attribute { /** * The node id for this request */ - NODE_ID; + NODE_ID, + /** + * The remote address of this request + */ + REMOTE_ADDRESS; /** * Read an Attribute from a StreamInput diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index f340950017a5c..591313440c221 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -15,9 +15,12 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.ValueType; @@ -25,13 +28,18 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; +import org.mockito.ArgumentCaptor; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -47,12 +55,14 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private final Settings.Builder settingsBuilder = Settings.builder(); + private final Settings settings = settingsBuilder.build(); + private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; private ClusterService clusterService; @Before - public void setup() { - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.build(); + public void setup() throws UnknownHostException { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); @@ -60,10 +70,11 @@ public void setup() { clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + when(searchRequestContext.getRequestRemoteAddress()).thenReturn(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); } - public void testOnRequestEnd() throws InterruptedException { - Long timestamp = System.currentTimeMillis() - 100L; + public void testOnRequestEnd() { + long timestamp = System.currentTimeMillis() - 100L; SearchType searchType = SearchType.QUERY_THEN_FETCH; SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -92,6 +103,15 @@ public void testOnRequestEnd() throws InterruptedException { queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); verify(queryInsightsService, times(1)).addRecord(any()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(SearchQueryRecord.class); + verify(queryInsightsService).addRecord(argumentCaptor.capture()); + assertEquals(timestamp, argumentCaptor.getValue().getTimestamp()); + Map attrs = argumentCaptor.getValue().getAttributes(); + assertEquals(searchType.toString().toLowerCase(Locale.ROOT), attrs.get(Attribute.SEARCH_TYPE)); + assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); + assertEquals(indices, attrs.get(Attribute.INDICES)); + assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); + assertEquals(remoteAddress + ":" + remotePort, attrs.get(Attribute.REMOTE_ADDRESS)); } public void testConcurrentOnRequestEnd() throws InterruptedException { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index b8bbde65ca6bc..4a89c548f97ca 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.transport.TransportAddress; import java.util.EnumMap; import java.util.HashMap; @@ -51,6 +52,10 @@ public Map phaseTookMap() { return phaseTookMap; } + public TransportAddress getRequestRemoteAddress() { + return searchRequest.remoteAddress(); + } + SearchResponse.PhaseTook getPhaseTook() { if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) { return new SearchResponse.PhaseTook(phaseTookMap); diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 80dc34c4d5d68..f96fec932694f 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -42,6 +42,7 @@ import org.opensearch.common.Booleans; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.QueryBuilder; import org.opensearch.rest.BaseRestHandler; @@ -223,6 +224,9 @@ public static void parseSearchRequest( } searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); + + // set remote address for searchRequest + searchRequest.remoteAddress(new TransportAddress(request.getHttpChannel().getRemoteAddress())); } /**