diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java index 5a85031226f75..7cf7106588c19 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java @@ -22,6 +22,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.DocsWithFieldSet; import org.apache.lucene.index.EmptyDocValuesProducer; @@ -34,6 +35,8 @@ import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Counter; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedLongValues; @@ -42,6 +45,7 @@ import org.opensearch.index.codec.freshstartree.aggregator.AggregationFunctionType; import org.opensearch.index.codec.freshstartree.aggregator.ValueAggregator; import org.opensearch.index.codec.freshstartree.aggregator.ValueAggregatorFactory; +import org.opensearch.index.codec.freshstartree.codec.SortedNumericDocValuesWriter; import org.opensearch.index.codec.freshstartree.codec.StarTreeAggregatedValues; import org.opensearch.index.codec.freshstartree.node.StarTreeNode; import org.opensearch.index.codec.freshstartree.util.BufferedAggregatedDocValues; @@ -151,7 +155,7 @@ public abstract class BaseSingleTreeBuilder { } // TODO : Removing hardcoding - _maxLeafRecords = 10000; // builderConfig.getMaxLeafRecords(); + _maxLeafRecords = 1000; // 
builderConfig.getMaxLeafRecords(); } private void constructStarTree(StarTreeBuilderUtils.TreeNode node, int startDocId, int endDocId) throws IOException { @@ -257,12 +261,118 @@ public void build(Iterator recordIterator, boolean isMerge) throws IOExc logger.info("Finished creating aggregated documents, got aggregated records : {}", numAggregatedRecords); // Create doc values indices in disk - createDocValuesIndices(_docValuesConsumer); + createSortedDocValuesIndices(_docValuesConsumer); // Serialize and save in disk StarTreeBuilderUtils.serializeTree(indexOutput, _rootNode, _dimensionsSplitOrder, _numNodes); } + private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer) throws IOException { + List dimWriterList = new ArrayList<>(); + List metricWriterList = new ArrayList<>(); + FieldInfo[] dimFieldInfoArr = new FieldInfo[_dimensionReaders.length]; + FieldInfo[] metricFieldInfoArr = new FieldInfo[_metricReaders.length]; + int fieldNum = 0; + for (int i = 0; i < _dimensionReaders.length; i++) { + final FieldInfo fi = new FieldInfo( + _dimensionsSplitOrder[i] + "_dim", + fieldNum, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false + ); + dimFieldInfoArr[i] = fi; + final SortedNumericDocValuesWriter w = new SortedNumericDocValuesWriter(fi, Counter.newCounter()); + dimWriterList.add(w); + fieldNum++; + } + for (int i = 0; i < _metricReaders.length; i++) { + FieldInfo fi = new FieldInfo( + _metrics[i] + "_metric", + fieldNum, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false + ); + final SortedNumericDocValuesWriter w = new SortedNumericDocValuesWriter(fi, Counter.newCounter()); + 
metricWriterList.add(w); + metricFieldInfoArr[i] = fi; + fieldNum++; + } + + for (int docId = 0; docId < _numDocs; docId++) { + Record record = getStarTreeRecord(docId); + for (int i = 0; i < record._dimensions.length; i++) { + long val = record._dimensions[i]; + dimWriterList.get(i).addValue(docId, val); + } + for (int i = 0; i < record._metrics.length; i++) { + switch (_valueAggregators[i].getAggregatedValueType()) { + case LONG: + long val = (long) record._metrics[i]; + metricWriterList.get(i).addValue(docId, val); + break; + // TODO: support this + case DOUBLE: + // double doubleval = (double) record._metrics[i]; + // break; + case FLOAT: + case INT: + default: + throw new IllegalStateException("Unsupported value type"); + } + } + } + + for (int i = 0; i < _dimensionReaders.length; i++) { + final int finalI = i; + DocValuesProducer a1 = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + + return dimWriterList.get(finalI).getSortedNumericDocValues(); + } + }; + docValuesConsumer.addSortedNumericField(dimFieldInfoArr[i], a1); + } + + for (int i = 0; i < _metricReaders.length; i++) { + final int finalI = i; + DocValuesProducer a1 = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + + return metricWriterList.get(finalI).getSortedNumericDocValues(); + } + }; + docValuesConsumer.addSortedNumericField(metricFieldInfoArr[i], a1); + } + + + } + private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws IOException { PackedLongValues.Builder[] pendingDimArr = new PackedLongValues.Builder[_dimensionReaders.length]; PackedLongValues.Builder[] pendingMetricArr = new PackedLongValues.Builder[_metricReaders.length]; @@ -280,7 +390,7 @@ private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws false, true, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - 
DocValuesType.NUMERIC, + DocValuesType.SORTED_NUMERIC, -1, Collections.emptyMap(), 0, @@ -303,7 +413,7 @@ private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws false, true, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - DocValuesType.NUMERIC, + DocValuesType.SORTED_NUMERIC, -1, Collections.emptyMap(), 0, @@ -347,24 +457,24 @@ private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws final int finalI = i; DocValuesProducer a1 = new EmptyDocValuesProducer() { @Override - public NumericDocValues getNumeric(FieldInfo field) throws IOException { + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { - return new BufferedAggregatedDocValues(pendingDimArr[finalI].build(), docsWithField.iterator()); + return DocValues.singleton(new BufferedAggregatedDocValues(pendingDimArr[finalI].build(), docsWithField.iterator())); } }; - docValuesConsumer.addNumericField(dimFieldInfoArr[i], a1); + docValuesConsumer.addSortedNumericField(dimFieldInfoArr[i], a1); } for (int i = 0; i < _metricReaders.length; i++) { final int finalI = i; DocValuesProducer a1 = new EmptyDocValuesProducer() { @Override - public NumericDocValues getNumeric(FieldInfo field) throws IOException { + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { - return new BufferedAggregatedDocValues(pendingMetricArr[finalI].build(), docsWithField.iterator()); + return DocValues.singleton(new BufferedAggregatedDocValues(pendingMetricArr[finalI].build(), docsWithField.iterator())); } }; - docValuesConsumer.addNumericField(metricFieldInfoArr[i], a1); + docValuesConsumer.addSortedNumericField(metricFieldInfoArr[i], a1); } } diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OffHeapBufferedSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OffHeapBufferedSingleTreeBuilder.java index c95778584bd97..af142ee7cbea8 100644 --- 
a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OffHeapBufferedSingleTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OffHeapBufferedSingleTreeBuilder.java @@ -129,12 +129,14 @@ private Iterator mergeRecords(List aggrList) t while (!endOfDoc) { long[] dims = new long[starTree.dimensionValues.size()]; int i = 0; - for (Map.Entry dimValue : starTree.dimensionValues.entrySet()) { - endOfDoc = dimValue.getValue().nextDoc() == DocIdSetIterator.NO_MORE_DOCS || dimValue.getValue().longValue() == -1; + for (Map.Entry dimValue : starTree.dimensionValues.entrySet()) { + int doc = dimValue.getValue().nextDoc(); + long val = doc == DocIdSetIterator.NO_MORE_DOCS ? -1 : dimValue.getValue().nextValue(); /* nextValue() is only valid while positioned on a document */ + + endOfDoc = doc == DocIdSetIterator.NO_MORE_DOCS || val == -1; if (endOfDoc) { break; } - long val = dimValue.getValue().longValue(); dims[i] = val; i++; } @@ -143,9 +145,9 @@ private Iterator mergeRecords(List aggrList) t } i = 0; Object[] metrics = new Object[starTree.metricValues.size()]; - for (Map.Entry metricValue : starTree.metricValues.entrySet()) { + for (Map.Entry metricValue : starTree.metricValues.entrySet()) { metricValue.getValue().nextDoc(); - metrics[i] = metricValue.getValue().longValue(); + metrics[i] = metricValue.getValue().nextValue(); i++; } Record record = new Record(dims, metrics); diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OnHeapSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OnHeapSingleTreeBuilder.java index 6b193957342c9..97bfdd5207816 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OnHeapSingleTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/OnHeapSingleTreeBuilder.java @@ -58,12 +58,13 @@ private Iterator mergeRecords(List aggrList) t while (!endOfDoc) { long[] dims = new long[starTree.dimensionValues.size()]; int i = 0; - for (Map.Entry dimValue : 
starTree.dimensionValues.entrySet()) { - endOfDoc = dimValue.getValue().nextDoc() == DocIdSetIterator.NO_MORE_DOCS || dimValue.getValue().longValue() == -1; + for (Map.Entry dimValue : starTree.dimensionValues.entrySet()) { + int doc = dimValue.getValue().nextDoc(); + long val = doc == DocIdSetIterator.NO_MORE_DOCS ? -1 : dimValue.getValue().nextValue(); /* nextValue() is only valid while positioned on a document */ + endOfDoc = doc == DocIdSetIterator.NO_MORE_DOCS || val == -1; if (endOfDoc) { break; } - long val = dimValue.getValue().longValue(); dims[i] = val; i++; } @@ -72,9 +73,9 @@ private Iterator mergeRecords(List aggrList) t } i = 0; Object[] metrics = new Object[starTree.metricValues.size()]; - for (Map.Entry metricValue : starTree.metricValues.entrySet()) { + for (Map.Entry metricValue : starTree.metricValues.entrySet()) { metricValue.getValue().nextDoc(); - metrics[i] = metricValue.getValue().longValue(); + metrics[i] = metricValue.getValue().nextValue(); i++; } BaseSingleTreeBuilder.Record record = new BaseSingleTreeBuilder.Record(dims, metrics); diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java index 190673fae7595..3a1eb8eafd533 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java @@ -174,7 +174,7 @@ public FieldInfo[] getFieldInfoArr() { false, true, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - DocValuesType.NUMERIC, + DocValuesType.SORTED_NUMERIC, -1, Collections.emptyMap(), 0, @@ -195,7 +195,7 @@ public FieldInfo[] getFieldInfoArr() { false, true, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - DocValuesType.NUMERIC, + DocValuesType.SORTED_NUMERIC, -1, Collections.emptyMap(), 0, diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/SortedNumericDocValuesWriter.java 
b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/SortedNumericDocValuesWriter.java new file mode 100644 index 0000000000000..ce2dd81125f46 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/SortedNumericDocValuesWriter.java @@ -0,0 +1,340 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.freshstartree.codec; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.EmptyDocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PackedLongValues; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + + +public class SortedNumericDocValuesWriter { + private final PackedLongValues.Builder pending; // stream of all values + private PackedLongValues.Builder pendingCounts; // count of values per doc + private final DocsWithFieldSet docsWithField; + private final Counter iwBytesUsed; + private long bytesUsed; // this only tracks differences in 'pending' and 'pendingCounts' + private final FieldInfo fieldInfo; + private int currentDoc = -1; + private long[] currentValues = new long[8]; + private int currentUpto = 
0; + + private PackedLongValues finalValues; + private PackedLongValues finalValuesCount; + + public SortedNumericDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) { + this.fieldInfo = fieldInfo; + this.iwBytesUsed = iwBytesUsed; + pending = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT); + docsWithField = new DocsWithFieldSet(); + bytesUsed = + pending.ramBytesUsed() + + docsWithField.ramBytesUsed() + + RamUsageEstimator.sizeOf(currentValues); + iwBytesUsed.addAndGet(bytesUsed); + } + + public void addValue(int docID, long value) { + assert docID >= currentDoc; + if (docID != currentDoc) { + finishCurrentDoc(); + currentDoc = docID; + } + + addOneValue(value); + updateBytesUsed(); + } + + // finalize currentDoc: this sorts the values in the current doc + private void finishCurrentDoc() { + if (currentDoc == -1) { + return; + } + if (currentUpto > 1) { + Arrays.sort(currentValues, 0, currentUpto); + } + for (int i = 0; i < currentUpto; i++) { + pending.add(currentValues[i]); + } + // record the number of values for this doc + if (pendingCounts != null) { + pendingCounts.add(currentUpto); + } else if (currentUpto != 1) { + pendingCounts = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT); + for (int i = 0; i < docsWithField.cardinality(); ++i) { + pendingCounts.add(1); + } + pendingCounts.add(currentUpto); + } + currentUpto = 0; + + docsWithField.add(currentDoc); + } + + private void addOneValue(long value) { + if (currentUpto == currentValues.length) { + currentValues = ArrayUtil.grow(currentValues, currentValues.length + 1); + } + + currentValues[currentUpto] = value; + currentUpto++; + } + + private void updateBytesUsed() { + final long newBytesUsed = + pending.ramBytesUsed() + + (pendingCounts == null ? 
0 : pendingCounts.ramBytesUsed()) + + docsWithField.ramBytesUsed() + + RamUsageEstimator.sizeOf(currentValues); + iwBytesUsed.addAndGet(newBytesUsed - bytesUsed); + bytesUsed = newBytesUsed; + } + + static final class LongValues { + final long[] offsets; + final PackedLongValues values; + + LongValues( + int maxDoc, + Sorter.DocMap sortMap, + SortedNumericDocValues oldValues, + float acceptableOverheadRatio) + throws IOException { + offsets = new long[maxDoc]; + PackedLongValues.Builder valuesBuiler = + PackedLongValues.packedBuilder(acceptableOverheadRatio); + int docID; + long offsetIndex = 1; // 0 means the doc has no values + while ((docID = oldValues.nextDoc()) != NO_MORE_DOCS) { + int newDocID = sortMap.oldToNew(docID); + int numValues = oldValues.docValueCount(); + valuesBuiler.add(numValues); + offsets[newDocID] = offsetIndex++; + for (int i = 0; i < numValues; i++) { + valuesBuiler.add(oldValues.nextValue()); + offsetIndex++; + } + } + values = valuesBuiler.build(); + } + } + + private SortedNumericDocValues getValues( + PackedLongValues values, PackedLongValues valueCounts, DocsWithFieldSet docsWithField) { + if (valueCounts == null) { + return DocValues.singleton(new BufferedNumericDocValues(values, docsWithField.iterator())); + } else { + return new BufferedSortedNumericDocValues(values, valueCounts, docsWithField.iterator()); + } + + } + + public SortedNumericDocValues getSortedNumericDocValues() + throws IOException { + final PackedLongValues values; + final PackedLongValues valueCounts; + if (finalValues == null) { + finishCurrentDoc(); + values = pending.build(); + valueCounts = pendingCounts == null ? 
null : pendingCounts.build(); + finalValues = values; + finalValuesCount = valueCounts; + } else { + values = finalValues; + valueCounts = finalValuesCount; + } + + final SortedNumericDocValues buf = getValues(values, valueCounts, docsWithField); + return buf; + } + + private static class BufferedSortedNumericDocValues extends SortedNumericDocValues { + final PackedLongValues.Iterator valuesIter; + final PackedLongValues.Iterator valueCountsIter; + final DocIdSetIterator docsWithField; + private int valueCount; + private int valueUpto; + + BufferedSortedNumericDocValues( + PackedLongValues values, PackedLongValues valueCounts, DocIdSetIterator docsWithField) { + valuesIter = values.iterator(); + valueCountsIter = valueCounts.iterator(); + this.docsWithField = docsWithField; + } + + @Override + public int docID() { + return docsWithField.docID(); + } + + @Override + public int nextDoc() throws IOException { + for (int i = valueUpto; i < valueCount; ++i) { + valuesIter.next(); + } + + int docID = docsWithField.nextDoc(); + if (docID != NO_MORE_DOCS) { + valueCount = Math.toIntExact(valueCountsIter.next()); + valueUpto = 0; + } + return docID; + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docValueCount() { + return valueCount; + } + + @Override + public long nextValue() { + valueUpto++; + return valuesIter.next(); + } + + @Override + public long cost() { + return docsWithField.cost(); + } + } + + static class SortingSortedNumericDocValues extends SortedNumericDocValues { + private final SortedNumericDocValues in; + private final LongValues values; + private int docID = -1; + private long upto; + private int numValues = -1; + + SortingSortedNumericDocValues(SortedNumericDocValues in, LongValues values) { + this.in = in; + this.values = values; + } + + @Override + 
public int docID() { + return docID; + } + + @Override + public int nextDoc() { + do { + docID++; + if (docID >= values.offsets.length) { + return docID = NO_MORE_DOCS; + } + } while (values.offsets[docID] <= 0); + upto = values.offsets[docID]; + numValues = Math.toIntExact(values.values.get(upto - 1)); + return docID; + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException("use nextDoc instead"); + } + + @Override + public boolean advanceExact(int target) throws IOException { + docID = target; + upto = values.offsets[docID]; + if (values.offsets[docID] > 0) { + numValues = Math.toIntExact(values.values.get(upto - 1)); + return true; + } + return false; + } + + @Override + public long nextValue() { + return values.values.get(upto++); + } + + @Override + public long cost() { + return in.cost(); + } + + @Override + public int docValueCount() { + return numValues; + } + } + + public static class BufferedNumericDocValues extends NumericDocValues { + final PackedLongValues.Iterator iter; + final DocIdSetIterator docsWithField; + private long value; + + /** Values and doc with fields */ + public BufferedNumericDocValues(PackedLongValues values, DocIdSetIterator docsWithFields) { + this.iter = values.iterator(); + this.docsWithField = docsWithFields; + } + + @Override + public int docID() { + return docsWithField.docID(); + } + + @Override + public int nextDoc() throws IOException { + int docID = docsWithField.nextDoc(); + if (docID != NO_MORE_DOCS) { + value = iter.next(); + } + return docID; + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return docsWithField.cost(); + } + + @Override + public long longValue() { + return value; + } + } +} diff --git 
a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeAggregatedValues.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeAggregatedValues.java index 2922e67255246..ec971b90ec624 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeAggregatedValues.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeAggregatedValues.java @@ -17,6 +17,7 @@ package org.opensearch.index.codec.freshstartree.codec; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.opensearch.index.codec.freshstartree.node.StarTree; import java.util.Map; @@ -26,14 +27,14 @@ /** Star tree aggregated values holder for reader / query */ public class StarTreeAggregatedValues { public StarTree _starTree; - public Map dimensionValues; + public Map dimensionValues; - public Map metricValues; + public Map metricValues; public StarTreeAggregatedValues( StarTree starTree, - Map dimensionValues, - Map metricValues + Map dimensionValues, + Map metricValues ) { this._starTree = starTree; this.dimensionValues = dimensionValues; diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java index 1c26151f30cdd..8494188c1771e 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java @@ -47,9 +47,9 @@ public class StarTreeDocValuesReader extends DocValuesProducer { StarTree starTree; - Map dimensionValues; + Map dimensionValues; - Map metricValues; + Map metricValues; public static final String DATA_CODEC = "Lucene90DocValuesData"; public static final String META_CODEC = "Lucene90DocValuesMetadata"; @@ -73,11 +73,11 @@ public 
NumericDocValues getNumeric(FieldInfo field) throws IOException { public StarTreeAggregatedValues getAggregatedDocValues() throws IOException { List dimensionsSplitOrder = starTree.getDimensionNames(); for (int i = 0; i < dimensionsSplitOrder.size(); i++) { - dimensionValues.put(dimensionsSplitOrder.get(i), valuesProducer.getNumeric(dimensionsSplitOrder.get(i) + "_dim")); + dimensionValues.put(dimensionsSplitOrder.get(i), valuesProducer.getSortedNumeric(dimensionsSplitOrder.get(i) + "_dim")); } metricValues = new HashMap<>(); - metricValues.put("elb_status_sum", valuesProducer.getNumeric("elb_status_sum_metric")); - metricValues.put("target_status_sum", valuesProducer.getNumeric("target_status_sum_metric")); + metricValues.put("elb_status_sum", valuesProducer.getSortedNumeric("elb_status_sum_metric")); + metricValues.put("target_status_sum", valuesProducer.getSortedNumeric("target_status_sum_metric")); //metricValues.put("elb_status_count", valuesProducer.getNumeric("elb_status_count_metric")); //metricValues.put("status_count", valuesProducer.getNumeric("status_count_metric")); return new StarTreeAggregatedValues(starTree, dimensionValues, metricValues); diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java index 2bef664ebff51..120b196ff8ef5 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java @@ -67,7 +67,7 @@ static class StarTreeResult { DocIdSetBuilder docsWithField; DocIdSetBuilder.BulkAdder adder; - Map dimValueMap; + Map dimValueMap; int docNum; public StarTreeFilter( StarTreeAggregatedValues starTreeAggrStructure, @@ -108,7 +108,7 @@ public DocIdSetIterator getStarTreeResult() throws IOException { // TODO : set to max value of doc values DocIdSetBuilder builder = new 
DocIdSetBuilder(starTreeResult.numOfMatchedDocs); List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); - SortedNumericDocValues ndv = DocValues.singleton(this.dimValueMap.get(remainingPredicateColumn)); + SortedNumericDocValues ndv = this.dimValueMap.get(remainingPredicateColumn); long ndvStartTime1 = System.nanoTime(); while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { docCount++; @@ -184,8 +184,9 @@ private StarTreeResult traverseStarTree() throws IOException { // If all predicate columns and group-by columns are matched, we can use aggregated document if (remainingPredicateColumns.isEmpty() && remainingGroupByColumns.isEmpty()) { adder = docsWithField.grow(1); - docNum++; - adder.add(starTreeNode.getAggregatedDocId()); + int docId = starTreeNode.getAggregatedDocId(); + adder.add(docId); + docNum = docId > docNum ? docId : docNum; continue; } @@ -197,8 +198,8 @@ private StarTreeResult traverseStarTree() throws IOException { if (starTreeNode.isLeaf()) { for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { adder = docsWithField.grow(1); - docNum++; adder.add((int) i); + docNum = (int)i > docNum ? 
(int)i : docNum; } continue; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java index ee836fc993ea4..9258c786200af 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -176,23 +177,23 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket public void collect(int doc, long bucket) throws IOException { StarTreeAggregatedValues aggrVals = (StarTreeAggregatedValues) ctx.reader().getAggregatedDocValues(); - Map fieldColToDocValuesMap = new HashMap<>(); + Map fieldColToDocValuesMap = new HashMap<>(); // TODO : validations for (String field : fieldCols) { fieldColToDocValuesMap.put(field, aggrVals.dimensionValues.get(field)); } // Another hardcoding - NumericDocValues dv = aggrVals.metricValues.get(metrics.get(0)); + SortedNumericDocValues dv = aggrVals.metricValues.get(metrics.get(0)); if (dv.advanceExact(doc)) { String key = getKey(fieldColToDocValuesMap, doc); if (indexMap.containsKey(key)) { - sumMap.put(key, sumMap.getOrDefault(key, 0l) + dv.longValue()); + sumMap.put(key, sumMap.getOrDefault(key, 0l) + dv.nextValue()); } else { indexMap.put(key, indexMap.size()); - sumMap.put(key, dv.longValue()); + sumMap.put(key, dv.nextValue()); } collectBucket(sub, doc, subBucketOrdinal(bucket, indexMap.get(key))); } @@ -201,11 +202,11 @@ public void collect(int doc, long 
bucket) throws IOException { } - private String getKey(Map fieldColsMap, int doc) throws IOException { + private String getKey(Map fieldColsMap, int doc) throws IOException { StringJoiner sj = new StringJoiner("-"); - for (Map.Entry fieldEntry : fieldColsMap.entrySet()) { + for (Map.Entry fieldEntry : fieldColsMap.entrySet()) { fieldEntry.getValue().advanceExact(doc); - long val = fieldEntry.getValue().longValue(); + long val = fieldEntry.getValue().nextValue(); //System.out.println("Key field : " + fieldEntry.getKey() + " Value : " + val); sj.add("" + val); }