Skip to content

Commit

Permalink
Performance optimizations during flush flow
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Oct 17, 2024
1 parent 0c3e3c0 commit 731320f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
Expand Down Expand Up @@ -71,7 +73,11 @@ private void setDocSizeInBytes(int numBytes) {
*/
protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = writeDimensions(starTreeDocument, output);
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
if (isAggregatedDoc == false) {
numBytes += writeMetrics(starTreeDocument, output);
} else {
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
}
setDocSizeInBytes(numBytes);
return numBytes;
}
Expand All @@ -89,6 +95,20 @@ protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput out
return numBytes;
}

/**
* Write star tree document metrics to file. Here we only write the metric field values. [ we avoid writing duplicate
* values for each of the stats ]
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
}

/**
* Write star tree document metrics to file
*/
Expand Down Expand Up @@ -132,7 +152,11 @@ protected StarTreeDocument readStarTreeDocument(RandomAccessInput input, long of
offset = readDimensions(dimensions, input, offset);

Object[] metrics = new Object[numMetrics];
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
if (isAggregatedDoc == false) {
offset = readMetrics(input, offset, metrics);
} else {
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
}
assert (offset - initialOffset) == docSizeInBytes;
return new StarTreeDocument(dimensions, metrics);
}
Expand All @@ -154,10 +178,32 @@ protected long readDimensions(Long[] dimensions, RandomAccessInput input, long o
return offset;
}

/**
* Read metrics based on metric field values. Then we reuse the metric field values to each of the metric stats.
*/
private long readMetrics(RandomAccessInput input, long offset, Object[] metrics) throws IOException {
Object[] fieldMetrics = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
fieldMetrics[i] = input.readLong(offset);
offset += Long.BYTES;
}
offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, fieldMetrics, index -> null);
int fieldIndex = 0;
int numMetrics = 0;
for (Metric metric : starTreeField.getMetrics()) {
for (MetricStat stat : metric.getBaseMetrics()) {
metrics[numMetrics] = fieldMetrics[fieldIndex];
numMetrics++;
}
fieldIndex++;
}
return offset;
}

/**
* Read star tree metrics from file
*/
protected long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
private long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
throws IOException {
for (int i = 0; i < numMetrics; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma

/**
* Generates the configuration required to perform aggregation for all the metrics on a field
* Each metric field is associated with a metric reader
*
* @return list of MetricAggregatorInfo
*/
Expand All @@ -191,24 +192,20 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
// _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metric.getField().equals(DocCountFieldMapper.NAME)) {
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metric.getBaseMetrics().isEmpty()) continue;
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReaders.add(metricReader);
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo))
);
}
metricReaders.add(metricReader);
}
return metricReaders;
}
Expand Down Expand Up @@ -572,21 +569,29 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
*/
private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricsReaders) throws IOException {
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
int metricIndex = 0;
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
Metric metric = starTreeField.getMetrics().get(i);
if (metric.getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricsReaders.get(i);
if (metricReader != null) {
try {
metricStatReader.nextEntry(currentDocId);
metricReader.nextEntry(currentDocId);
Object metricValue = metricReader.value(currentDocId);

for (MetricStat metricStat : metric.getBaseMetrics()) {
metrics[metricIndex] = metricValue;
metricIndex++;
}
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
metrics[i] = metricStatReader.value(currentDocId);
} else {
throw new IllegalStateException("metric readers are empty");
throw new IllegalStateException("metric reader is empty for metric: " + metric.getField());
}
}
return metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
}
try {
for (int i = 0; i < totalSegmentDocs; i++) {
StarTreeDocument document = getSegmentStarTreeDocument(i, dimensionReaders, metricReaders);
StarTreeDocument document = getSegmentStarTreeDocumentWithMetricFieldValues(i, dimensionReaders, metricReaders);
segmentDocumentFileManager.writeStarTreeDocument(document, false);
}
} catch (IOException ex) {
Expand All @@ -128,6 +128,46 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
return sortAndReduceDocuments(sortedDocIds, totalSegmentDocs, false);
}

/**
* Returns the star-tree document from the segment based on the current doc id
*/
StarTreeDocument getSegmentStarTreeDocumentWithMetricFieldValues(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
) throws IOException {
Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders);
Object[] metricValues = getStarTreeMetricFieldValuesFromSegment(currentDocId, metricReaders);
return new StarTreeDocument(dimensions, metricValues);
}

/**
* Returns the metric field values for the star-tree document from the segment based on the current doc id
*/
private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricReaders)
throws IOException {
Object[] metricValues = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
if (starTreeField.getMetrics().get(i).getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricReaders.get(i);
if (metricReader != null) {
try {
metricReader.nextEntry(currentDocId);
metricValues[i] = metricReader.value(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
} else {
throw new IllegalStateException("metric reader is empty");
}
}
return metricValues;
}

/**
* Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly
* aggregated star-tree documents
Expand Down

0 comments on commit 731320f

Please sign in to comment.