Skip to content

Commit

Permalink
Support scripting for composite aggs in concurrent segment search
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed Aug 1, 2024
1 parent 67a2e4c commit f106737
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 4 deletions.
1 change: 1 addition & 0 deletions modules/lang-painless/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin

apply plugin: 'opensearch.validate-rest-spec'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'An easy, safe and fast scripting language for OpenSearch'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.painless;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.InternalComposite;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class SimplePainlessIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public SimplePainlessIT(Settings nodeSettings) {
super(nodeSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }
);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(PainlessModulePlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), "4")
.build();
}

@Override
public void setupSuiteScopeCluster() throws Exception {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
.field("dynamic", "false")
.startObject("_meta")
.field("schema_version", 5)
.endObject()
.startObject("properties")
.startObject("entity")
.field("type", "nested")
.endObject()
.endObject()
.endObject();

assertAcked(
prepareCreate("test").setMapping(xContentBuilder)
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);

client().prepareIndex("test")
.setId("a")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("b")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("c")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.6.3.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("d")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"2.6.4.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureSearchable("test");
}

public void testTermsValuesSource() throws Exception {
AggregationBuilder agg = AggregationBuilders.composite(
"multi_buckets",
Collections.singletonList(
new TermsValuesSourceBuilder("keyword-field").script(
new Script(
ScriptType.INLINE,
"painless",
"String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;",
Collections.emptyMap()
)
)
)
);
SearchResponse response = client().prepareSearch("test").setQuery(matchAllQuery()).addAggregation(agg).get();

assertSearchResponse(response);
assertEquals(2, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().size());
assertEquals(
"field-1",
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getKey().get("keyword-field")
);
assertEquals(1, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getDocCount());
assertEquals(
"field-2",
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getKey().get("keyword-field")
);
assertEquals(3, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getDocCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

/**
Expand Down Expand Up @@ -81,7 +80,6 @@ protected Aggregator createInternal(

@Override
protected boolean supportsConcurrentSegmentSearch() {
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,14 @@ public final SearchLookup forkAndTrackFieldReferences(String field) {
return new SearchLookup(this, newFieldChain);
}

// SourceLookup is not thread safe so we create a new instance for each leaf to support concurrent segment search
public LeafSearchLookup getLeafSearchLookup(LeafReaderContext context) {
return new LeafSearchLookup(context, docMap.getLeafDocLookup(context), sourceLookup, fieldsLookup.getLeafFieldsLookup(context));
return new LeafSearchLookup(
context,
docMap.getLeafDocLookup(context),
new SourceLookup(),
fieldsLookup.getLeafFieldsLookup(context)
);
}

public DocLookup doc() {
Expand Down

0 comments on commit f106737

Please sign in to comment.