diff --git a/server/lib/lucene-core-9.7.0.jar b/server/lib/lucene-core-9.7.0.jar
deleted file mode 100644
index 6965bb977efca..0000000000000
Binary files a/server/lib/lucene-core-9.7.0.jar and /dev/null differ
diff --git a/server/src/main/java/org/opensearch/action/search/SearchStarTreeAction.java b/server/src/main/java/org/opensearch/action/search/SearchStarTreeAction.java
new file mode 100644
index 0000000000000..4fe3ff1a97a42
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/search/SearchStarTreeAction.java
@@ -0,0 +1,2 @@
+package org.opensearch.action.search;public class SearchStarTreeAction {
+}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchStarTreeResponse.java b/server/src/main/java/org/opensearch/action/search/SearchStarTreeResponse.java
new file mode 100644
index 0000000000000..764fc9616d041
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/search/SearchStarTreeResponse.java
@@ -0,0 +1,2 @@
+package org.opensearch.action.search;public class SearchStarTreeResponse {
+}
diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchStarTreeAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchStarTreeAction.java
new file mode 100644
index 0000000000000..baa113997f243
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/search/TransportSearchStarTreeAction.java
@@ -0,0 +1,133 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.search;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.action.StepListener;
+import org.opensearch.action.support.ActionFilters;
+import org.opensearch.action.support.HandledTransportAction;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.inject.Inject;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.search.SearchPhaseResult;
+import org.opensearch.search.internal.ShardSearchContextId;
+import org.opensearch.tasks.Task;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Transport action for creating PIT reader context
+ */
+public class TransportCreatePitAction extends HandledTransportAction<CreatePitRequest, CreatePitResponse> {
+
+    public static final String CREATE_PIT_ACTION = "create_pit";
+    private final TransportService transportService;
+    private final SearchTransportService searchTransportService;
+    private final ClusterService clusterService;
+    private final TransportSearchAction transportSearchAction;
+    private final NamedWriteableRegistry namedWriteableRegistry;
+    private final CreatePitController createPitController;
+
+    @Inject
+    public TransportCreatePitAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        SearchTransportService searchTransportService,
+        ClusterService clusterService,
+        TransportSearchAction transportSearchAction,
+        NamedWriteableRegistry namedWriteableRegistry,
+        CreatePitController createPitController
+    ) {
+        super(CreatePitAction.NAME, transportService, actionFilters, in -> new CreatePitRequest(in));
+        this.transportService = transportService;
+        this.searchTransportService = searchTransportService;
+        this.clusterService = clusterService;
+        this.transportSearchAction = transportSearchAction;
+        this.namedWriteableRegistry = namedWriteableRegistry;
+        this.createPitController = createPitController;
+    }
+
+    @Override
+    protected void doExecute(Task task, CreatePitRequest request, ActionListener<CreatePitResponse> listener) {
+        final StepListener<SearchResponse> createPitListener = new StepListener<>();
+        final ActionListener<SearchResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
+            logger.error(
+                () -> new ParameterizedMessage(
+                    "PIT creation failed while updating PIT ID for indices [{}]",
+                    Arrays.toString(request.indices())
+                )
+            );
+            listener.onFailure(e);
+        });
+        createPitController.executeCreatePit(request, task, createPitListener, updatePitIdListener);
+    }
+
+    /**
+     * Request to create pit reader context with keep alive
+     */
+    public static class CreateReaderContextRequest extends TransportRequest {
+        private final ShardId shardId;
+        private final TimeValue keepAlive;
+
+        public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) {
+            this.shardId = shardId;
+            this.keepAlive = keepAlive;
+        }
+
+        public ShardId getShardId() {
+            return shardId;
+        }
+
+        public TimeValue getKeepAlive() {
+            return keepAlive;
+        }
+
+        public CreateReaderContextRequest(StreamInput in) throws IOException {
+            super(in);
+            this.shardId = new ShardId(in);
+            this.keepAlive = in.readTimeValue();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            shardId.writeTo(out);
+            out.writeTimeValue(keepAlive);
+        }
+    }
+
+    /**
+     * Create pit reader context response which holds the contextId
+     */
+    public static class CreateReaderContextResponse extends SearchPhaseResult {
+        public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) {
+            this.contextId = shardSearchContextId;
+        }
+
+        public CreateReaderContextResponse(StreamInput in) throws IOException {
+            super(in);
+            contextId = new ShardSearchContextId(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            contextId.writeTo(out);
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/common/lucene/Lucene.java b/server/src/main/java/org/opensearch/common/lucene/Lucene.java
index c61b1bcc676a6..75743229db887 100644
--- a/server/src/main/java/org/opensearch/common/lucene/Lucene.java
+++ b/server/src/main/java/org/opensearch/common/lucene/Lucene.java
@@ -1077,6 +1077,12 @@ public SortedDocValues getSortedDocValues(String field) {
             return null;
         }
 
+        @Override
+        public Object getAggregatedDocValues()
+            throws IOException {
+            return null;
+        }
+
         public SortedNumericDocValues getSortedNumericDocValues(String field) {
             return null;
         }
diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java
index 5f843d315030b..ec3ec563bcf54 100644
--- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java
+++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java
@@ -59,11 +59,11 @@
 /** Base class for star tree builder */
 public abstract class BaseSingleTreeBuilder {
     public static final int STAR_IN_DOC_VALUES_INDEX = -1;
-    final static int SECOND = 1000;
-    final static int MINUTE = 60 * SECOND;
-    final static int HOUR = 60 * 60 * SECOND;
-    final static int DAY = 24 * HOUR;
-    final static int YEAR = 365 * DAY;
+    public final static int SECOND = 1000;
+    public final static int MINUTE = 60 * SECOND;
+    public final static int HOUR = 60 * 60 * SECOND;
+    public final static int DAY = 24 * HOUR;
+    public final static int YEAR = 365 * DAY;
     private static final Logger logger = LogManager.getLogger(BaseSingleTreeBuilder.class);
     final int _numDimensions;
     final String[] _dimensionsSplitOrder;
@@ -91,10 +91,11 @@ public abstract class BaseSingleTreeBuilder {
         indexOutput = state.directory.createOutput(docFileName, state.context);
         CodecUtil.writeIndexHeader(indexOutput, "STARTreeCodec", 0, state.segmentInfo.getId(), state.segmentSuffix);
         dimensionsSplitOrder = new ArrayList<>();
-        // dimensionsSplitOrder.add("hour");
+        dimensionsSplitOrder.add("minute");
+        dimensionsSplitOrder.add("hour");
         dimensionsSplitOrder.add("day");
         dimensionsSplitOrder.add("month");
-        dimensionsSplitOrder.add("year");
+        //dimensionsSplitOrder.add("year");
         dimensionsSplitOrder.add("status");
         _numDimensions = dimensionsSplitOrder.size();
         _dimensionsSplitOrder = new String[_numDimensions];
@@ -560,6 +561,8 @@ private long getTimeStampVal(final String fieldName, final long val) {
             return val / HOUR;
         case "day":
             return val / DAY;
+        case "month":
+            return val / (DAY * 30L); // TODO: approximate month as 30 days
         case "year":
             return val / YEAR;
         default:
diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java
index 758e718155651..d6b2ef3a74c77 100644
--- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java
+++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java
@@ -99,11 +99,11 @@ public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProdu
         }
         if (field.name.equalsIgnoreCase("@timestamp")) {
             //logger.info("Adding timestamp fields");
-            //dimensionReaders.put("minute_dim", valuesProducer.getSortedNumeric(field));
+            dimensionReaders.put("minute_dim", valuesProducer.getSortedNumeric(field));
             dimensionReaders.put("hour_dim", valuesProducer.getSortedNumeric(field));
             dimensionReaders.put("day_dim", valuesProducer.getSortedNumeric(field));
             dimensionReaders.put("month_dim", valuesProducer.getSortedNumeric(field));
-            dimensionReaders.put("year_dim", valuesProducer.getSortedNumeric(field));
+            //dimensionReaders.put("year_dim", valuesProducer.getSortedNumeric(field));
             //dimensionsSplitOrder.add("minute");
         } else {
diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeQuery.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeQuery.java
index cff1114043d0e..2e0376cbe95c5 100644
--- a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeQuery.java
+++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeQuery.java
@@ -38,10 +38,10 @@
 /** Query class for querying star tree data structure */
 public class StarTreeQuery extends Query implements Accountable {
 
-    Map>> compositePredicateMap;
+    Map>> compositePredicateMap;
     Set groupByColumns;
 
-    public StarTreeQuery(Map>> compositePredicateMap, Set groupByColumns) {
+    public StarTreeQuery(Map>> compositePredicateMap, Set groupByColumns) {
         this.compositePredicateMap = compositePredicateMap;
         this.groupByColumns = groupByColumns;
     }
@@ -78,14 +78,13 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo
 
             @Override
             public Scorer scorer(LeafReaderContext context) throws IOException {
-                StarTreeAggregatedValues val = null;
+                Object obj = context.reader().getAggregatedDocValues();
                 DocIdSetIterator result = null;
-//                Object obj = context.reader().getAggregatedDocValues();
-//                if (obj != null) {
-//                    val = (StarTreeAggregatedValues) obj;
-//                    StarTreeFilter filter = new StarTreeFilter(val, compositePredicateMap, groupByColumns);
-//                    result = filter.getStarTreeResult();
-//                }
+                if (obj != null) {
+                    StarTreeAggregatedValues val = (StarTreeAggregatedValues) obj;
+                    StarTreeFilter filter = new StarTreeFilter(val, compositePredicateMap, groupByColumns);
+                    result = filter.getStarTreeResult();
+                }
                 return new ConstantScoreScorer(this, score(), scoreMode, result);
             }
diff --git a/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java b/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java
index 6aacb6c1cbedf..753ec7c7d7c53 100644
--- a/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java
+++ b/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java
@@ -159,6 +159,12 @@ public SortedDocValues getSortedDocValues(String field) {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public Object getAggregatedDocValues()
+        throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public SortedNumericDocValues getSortedNumericDocValues(String field) {
         throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchStarTreeAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchStarTreeAction.java
new file mode 100644
index 0000000000000..c8fa0e08a9941
--- /dev/null
+++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchStarTreeAction.java
@@ -0,0 +1,2 @@
+package org.opensearch.rest.action.search;public class RestSearchStarTreeAction {
+}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
index 4b8e882cd69bc..6bbf2a6e1569f 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
@@ -31,11 +31,16 @@
 
 package org.opensearch.search.aggregations.metrics;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.search.ScoreMode;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.util.BigArrays;
 import org.opensearch.common.util.DoubleArray;
+import org.opensearch.index.codec.freshstartree.codec.StarTreeAggregatedValues;
 import org.opensearch.index.fielddata.SortedNumericDoubleValues;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
@@ -48,6 +53,8 @@
 import java.io.IOException;
 import java.util.Map;
 
+import org.opensearch.transport.TransportService;
+
 
 /**
  * Aggregate all docs into a single sum value
@@ -62,6 +69,11 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue {
     private DoubleArray sums;
     private DoubleArray compensations;
 
+    private AtomicInteger bucket2;
+
+    private static final Logger logger = LogManager.getLogger(SumAggregator.class);
+
+
     SumAggregator(
         String name,
         ValuesSourceConfig valuesSourceConfig,
@@ -77,6 +89,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue {
             sums = context.bigArrays().newDoubleArray(1, true);
             compensations = context.bigArrays().newDoubleArray(1, true);
         }
+        bucket2 = new AtomicInteger(0);
     }
 
     @Override
@@ -91,14 +104,57 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
         }
         final BigArrays bigArrays = context.bigArrays();
         final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
+        StarTreeAggregatedValues aggrVals = (StarTreeAggregatedValues) ctx.reader().getAggregatedDocValues();
         final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        //int bucket = 0;
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long bucket) throws IOException {
+                //long bucket = bucket2.get();
                 sums = bigArrays.grow(sums, bucket + 1);
                 compensations = bigArrays.grow(compensations, bucket + 1);
+                NumericDocValues dv = aggrVals.metricValues.get("status_sum");
+                NumericDocValues hourValueDim = aggrVals.dimensionValues.get("hour");
+                NumericDocValues dayValueDim = aggrVals.dimensionValues.get("day");
+                NumericDocValues statusValueDim = aggrVals.dimensionValues.get("status");
+                NumericDocValues minuteValueDim = aggrVals.dimensionValues.get("minute");
+                NumericDocValues monthValueDim = aggrVals.dimensionValues.get("month");
+
+                if (dv.advanceExact(doc)) {
+                    final int valuesCount = values.docValueCount();
+                    // Compute the sum of double values with Kahan summation algorithm which is more
+                    // accurate than naive summation.
+                    double sum = sums.get(bucket);
+                    double compensation = compensations.get(bucket);
+                    kahanSummation.reset(sum, compensation);
+                    double value = dv.longValue();
+                    hourValueDim.advanceExact(doc);
+                    long hour = hourValueDim.longValue();
 
-                if (values.advanceExact(doc)) {
+                    dayValueDim.advanceExact(doc);
+                    long day = dayValueDim.longValue();
+
+                    statusValueDim.advanceExact(doc);
+                    long status = statusValueDim.longValue();
+
+                    minuteValueDim.advanceExact(doc);
+                    long minute = minuteValueDim.longValue();
+
+
+                    monthValueDim.advanceExact(doc);
+                    long month = monthValueDim.longValue();
+
+                    logger.info("Day : {} , hour : {} , status : {}, month : {} , minute: {}", day, hour, status, month, minute);
+
+                    kahanSummation.add(value);
+
+                    compensations.set(bucket, kahanSummation.delta());
+                    sums.set(bucket, kahanSummation.value());
+                    //bucket2.set((int) (bucket + 1l));
+                    //bucket++;
+                }
+                else
+                if (values.advanceExact(doc)) {
                     final int valuesCount = values.docValueCount();
                     // Compute the sum of double values with Kahan summation algorithm which is more
                     // accurate than naive summation.
diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java
index fa767f69d1ac6..4eaeded73fdf1 100644
--- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java
+++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java
@@ -32,6 +32,11 @@
 
 package org.opensearch.search.query;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexReader;
@@ -52,6 +57,7 @@
 import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
 import org.opensearch.common.util.concurrent.QueueResizingOpenSearchThreadPoolExecutor;
 import org.opensearch.core.tasks.TaskCancelledException;
+import org.opensearch.index.codec.freshstartree.query.StarTreeQuery;
 import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.SearchContextSourcePrinter;
@@ -190,6 +196,14 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
         final ContextIndexSearcher searcher = searchContext.searcher();
         final IndexReader reader = searcher.getIndexReader();
         QuerySearchResult queryResult = searchContext.queryResult();
+        Set<String> groupByCols = new HashSet<>();
+        //groupByCols.add("day");
+        groupByCols.add("status");
+        Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>();
+        List<Predicate<Long>> predicates = new ArrayList<>();
+        predicates.add(status -> status == 200);
+        predicateMap.put("status", predicates);
+        Query q = new StarTreeQuery(new HashMap<>(), groupByCols);
         queryResult.searchTimedOut(false);
         try {
             queryResult.from(searchContext.from());
@@ -280,10 +294,12 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
             boolean shouldRescore = queryPhaseSearcher.searchWith(
                 searchContext,
                 searcher,
-                query,
+                q,
+                //query,
                 collectors,
                 hasFilterCollector,
                 timeoutSet
+
             );
 
             ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
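Illustration only, not part of the patch: the QueryPhase hunk above builds a predicateMap but then hands StarTreeQuery an empty map, so the status == 200 predicate is currently unused. The sketch below shows how the collected predicates could be passed through instead. It assumes the StarTreeQuery constructor takes Map<String, List<Predicate<Long>>> and Set<String>; the stripped generics in the diff do not confirm that, and the class name StarTreeQueryWiringSketch is made up.

// Hypothetical sketch; generic parameters of StarTreeQuery are assumed, not confirmed by the patch.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import org.apache.lucene.search.Query;
import org.opensearch.index.codec.freshstartree.query.StarTreeQuery;

class StarTreeQueryWiringSketch {
    static Query statusGroupByQuery() {
        // Group by the "status" dimension, as the QueryPhase hunk does.
        Set<String> groupByCols = new HashSet<>();
        groupByCols.add("status");

        // Keep only star-tree entries whose "status" dimension equals 200.
        List<Predicate<Long>> predicates = new ArrayList<>();
        predicates.add(status -> status == 200);
        Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>();
        predicateMap.put("status", predicates);

        // The patch passes new HashMap<>() here, which leaves the predicates unused.
        return new StarTreeQuery(predicateMap, groupByCols);
    }
}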
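Also for illustration, not part of the patch: the new "month" case in BaseSingleTreeBuilder#getTimeStampVal needs a long multiplier, which is why the hunk above uses DAY * 30L rather than the TODO-marked val/DAY * 30. A self-contained sketch of the arithmetic using the same constants; the class name MonthBucketSketch is made up.

public class MonthBucketSketch {
    // Same epoch-millisecond constants as BaseSingleTreeBuilder.
    static final int SECOND = 1000;
    static final int MINUTE = 60 * SECOND;
    static final int HOUR = 60 * 60 * SECOND;
    static final int DAY = 24 * HOUR;

    public static void main(String[] args) {
        long ts = 1_700_000_000_000L; // an arbitrary epoch-millis timestamp

        // DAY * 30 overflows int (2_592_000_000 > Integer.MAX_VALUE) ...
        System.out.println(DAY * 30);         // -1702967296 (int overflow)
        // ... and val / DAY * 30 is a day index scaled by 30, not a month index.
        System.out.println(ts / DAY * 30);    // 590250
        // Widening the divisor to long gives an approximate 30-day month bucket.
        System.out.println(ts / (DAY * 30L)); // 655
    }
}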