Skip to content

Commit

Permalink
write startreedoc optimizations
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Oct 17, 2024
1 parent 731320f commit 00bab32
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

package org.opensearch.common.util;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A bitset backed by a byte array. This will initialize and set bits in the byte array based on the index.
Expand All @@ -39,18 +38,6 @@ public ByteArrayBackedBitset(RandomAccessInput in, long offset, int length) thro
}
}

/**
 * Constructor which reads {@code length} bytes from Lucene's {@link IndexInput} into the backing byte array.
 *
 * @param in     input to read the bitset bytes from
 * @param length number of bytes to read; also the size of the backing array
 * @throws IOException if the bytes cannot be read from the input
 */
public ByteArrayBackedBitset(IndexInput in, int length) throws IOException {
    byteArray = new byte[length];
    // One bulk read instead of a readByte() call per byte.
    in.readBytes(byteArray, 0, length);
}

/**
* Sets the bit at the given index to 1.
* Each byte can indicate 8 bits, so the index is divided by 8 to get the byte array index.
Expand All @@ -61,10 +48,10 @@ public void set(int index) {
byteArray[byteArrIndex] |= (byte) (1 << (index & 7));
}

public int write(IndexOutput output) throws IOException {
public int write(ByteBuffer output) throws IOException {
int numBytes = 0;
for (Byte bitSet : byteArray) {
output.writeByte(bitSet);
output.put(bitSet);
numBytes += Byte.BYTES;
}
return numBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;

import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
Expand Down Expand Up @@ -69,72 +71,84 @@ private void setDocSizeInBytes(int numBytes) {
}

/**
* Write the star tree document to file associated with dimensions and metrics
* Write the star tree document to a byte buffer
*
* The buffer-based statements size a byte array with calculateDocumentSize, fill it through a
* ByteBuffer wrapper (dimensions first, then metrics), pass the array to the output in a single
* writeBytes call, record the size via setDocSizeInBytes and return the array length.
*
* NOTE(review): this span shows IndexOutput-based statements side by side with ByteBuffer-based
* ones (numBytes is declared twice and there are two return statements); presumably the
* ByteBuffer statements supersede the others - confirm.
*/
protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = writeDimensions(starTreeDocument, output);
int numBytes = calculateDocumentSize(starTreeDocument, isAggregatedDoc);
byte[] bytes = new byte[numBytes];
// NOTE(review): native byte order makes the byte layout depend on the host platform; confirm
// it matches the byte order used by whatever reads these bytes back.
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
writeDimensions(starTreeDocument, buffer);
// Non-aggregated documents go through the flush path; aggregated ones through the typed path.
if (isAggregatedDoc == false) {
numBytes += writeMetrics(starTreeDocument, output);
writeFlushMetrics(starTreeDocument, buffer);
} else {
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
writeMetrics(starTreeDocument, buffer, isAggregatedDoc);
}
// The whole serialized document reaches the output in one call.
output.writeBytes(bytes, bytes.length);
setDocSizeInBytes(numBytes);
return numBytes;
return bytes.length;
}

/**
* Write dimensions to file
* Write dimensions to the byte buffer
*
* Each dimension is written as one long, with 0L standing in for a null entry; the call to
* StarTreeDocumentBitSetUtil.writeBitSet that follows is given the same dimensions array.
*
* NOTE(review): this span shows two signatures (IndexOutput returning int, ByteBuffer returning
* void) and their statements interleaved; presumably the ByteBuffer form supersedes the other -
* confirm.
*/
protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
output.writeLong(starTreeDocument.dimensions[i] == null ? 0L : starTreeDocument.dimensions[i]);
numBytes += Long.BYTES;
protected void writeDimensions(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (Long dimension : starTreeDocument.dimensions) {
// A null dimension is stored as 0L.
buffer.putLong(dimension == null ? 0L : dimension);
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, buffer);
}

/**
* Write star tree document metrics to file. Here we only write the metric field values. [ we avoid writing duplicate
* values for each of the stats ]
* Write star tree document metrics to file
*
* Every metric is cast to Long and written as one long, with 0L standing in for a null entry;
* the call to StarTreeDocumentBitSetUtil.writeBitSet that follows is given the same metrics array.
*
* NOTE(review): this span shows writeMetrics(IndexOutput) and writeFlushMetrics(ByteBuffer)
* interleaved; presumably writeFlushMetrics supersedes the other - confirm.
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
protected void writeFlushMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
// A null metric is stored as 0L; non-null values are cast to Long here.
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
* Write star tree document metrics to file
* Write star tree document metrics to the byte buffer
*
* The aggregated value type of each metric (taken from metricAggregatorInfos at the same index)
* decides the encoding: LONG values are written as-is; DOUBLE values are converted with
* NumericUtils.doubleToSortableLong when isAggregatedDoc is true and otherwise cast to Long.
* Null entries are written as zero. Any other type raises IllegalStateException.
*
* NOTE(review): this span shows IndexOutput-based and ByteBuffer-based statements interleaved
* (two signatures, paired write calls); presumably the ByteBuffer form supersedes the other -
* confirm.
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = 0;
protected void writeMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer, boolean isAggregatedDoc) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
if (aggregatedValueType.equals(LONG)) {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
} else if (aggregatedValueType.equals(DOUBLE)) {
if (isAggregatedDoc) {
// Aggregated doubles are stored in their sortable-long form; null becomes 0.0 first.
long val = NumericUtils.doubleToSortableLong(
starTreeDocument.metrics[i] == null ? 0.0 : (Double) starTreeDocument.metrics[i]
);
output.writeLong(val);
buffer.putLong(val);
} else {
// In this branch the value is cast to Long rather than Double.
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
numBytes += Long.BYTES;
} else {
throw new IllegalStateException("Unsupported metric type");
}
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
 * Computes the serialized size of a StarTreeDocument in bytes: Long.BYTES per dimension plus
 * getLength(dimensions), and Long.BYTES per metric plus getLength(metrics).
 */
private int calculateDocumentSize(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) {
    final int dimensionValueBytes = Long.BYTES * starTreeDocument.dimensions.length;
    final int dimensionBitsetBytes = getLength(starTreeDocument.dimensions);
    final int metricValueBytes = Long.BYTES * starTreeDocument.metrics.length;
    final int metricBitsetBytes = getLength(starTreeDocument.metrics);
    return dimensionValueBytes + dimensionBitsetBytes + metricValueBytes + metricBitsetBytes;
}

/**
 * Returns the number of bytes needed to hold one bit per array element (length / 8, rounded up).
 */
private static int getLength(Object[] array) {
    final int wholeBytes = array.length >> 3;
    final boolean hasPartialByte = (array.length & 7) != 0;
    return hasPartialByte ? wholeBytes + 1 : wholeBytes;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ StarTreeDocument getSegmentStarTreeDocumentWithMetricFieldValues(
/**
* Returns the metric field values for the star-tree document from the segment based on the current doc id
*/
private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricReaders)
throws IOException {
private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricReaders) {
Object[] metricValues = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
if (starTreeField.getMetrics().get(i).getBaseMetrics().isEmpty()) continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,17 @@

package org.opensearch.index.compositeindex.datacube.startree.utils;

import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;
import org.opensearch.common.util.ByteArrayBackedBitset;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;

/**
* Helper class to read/write bitset for null values and identity values.
*/
public class StarTreeDocumentBitSetUtil {
/**
 * Builds a bitset with one bit set for every null entry of the array and writes it to the output.
 *
 * @param array array of objects
 * @param output output stream
 * @return the value returned by the bitset's write call (number of bytes written)
 * @throws IOException if an I/O error occurs while writing to the output stream
 */
public static int writeBitSet(Object[] array, IndexOutput output) throws IOException {
    final ByteArrayBackedBitset nullMarkers = new ByteArrayBackedBitset(getLength(array));
    int position = 0;
    for (Object element : array) {
        if (element == null) {
            nullMarkers.set(position);
        }
        position++;
    }
    return nullMarkers.write(output);
}

/**
* Set identity values based on bitset.
Expand All @@ -51,6 +34,19 @@ public static int readBitSet(RandomAccessInput input, long offset, Object[] arra
return bitset.getCurrBytesRead();
}

/**
 * Builds a bitset with one bit set for every null entry of the array and writes it to the ByteBuffer.
 *
 * @param array  array whose null positions are recorded
 * @param buffer buffer the bitset bytes are written into
 * @throws IOException declared by the bitset's write call
 */
public static void writeBitSet(Object[] array, ByteBuffer buffer) throws IOException {
    final ByteArrayBackedBitset nullMarkers = new ByteArrayBackedBitset(getLength(array));
    int position = 0;
    for (Object element : array) {
        if (element == null) {
            nullMarkers.set(position);
        }
        position++;
    }
    nullMarkers.write(buffer);
}

/**
 * Returns the number of bytes needed to hold one bit per array element (length / 8, rounded up).
 */
private static int getLength(Object[] array) {
    int byteCount = array.length / 8;
    if (array.length % 8 != 0) {
        // A trailing partial group of bits still needs its own byte.
        byteCount++;
    }
    return byteCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;

/**
Expand All @@ -39,7 +41,10 @@ private static void testWriteAndReadBitset(int randomArraySize, int randomIndex1
IndexOutput indexOutput = fsDirectory.createOutput(TEST_FILE, IOContext.DEFAULT);
bitset.set(randomIndex1);
bitset.set(randomIndex2);
bitset.write(indexOutput);
byte[] bytes = new byte[randomArraySize];
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
bitset.write(buffer);
indexOutput.writeBytes(bytes, bytes.length);
indexOutput.close();

IndexInput in = fsDirectory.openInput(TEST_FILE, IOContext.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Function;
Expand Down Expand Up @@ -46,7 +48,11 @@ void testNullBasedOnBitset(Long[] dims) throws IOException {
FSDirectory fsDirectory = FSDirectory.open(basePath);
String TEST_FILE = "test_file";
IndexOutput indexOutput = fsDirectory.createOutput(TEST_FILE, IOContext.DEFAULT);
StarTreeDocumentBitSetUtil.writeBitSet(dims, indexOutput);
int numBytes = getLength(dims);
byte[] bytes = new byte[numBytes];
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
StarTreeDocumentBitSetUtil.writeBitSet(dims, buffer);
indexOutput.writeBytes(bytes, numBytes);
indexOutput.close();

// test null value on read
Expand All @@ -69,4 +75,8 @@ void testNullBasedOnBitset(Long[] dims) throws IOException {
assertEquals(randomLong, (long) dims1[randomNullIndex2]);
in.close();
}

/**
 * Returns the number of bytes needed to hold one bit per array element (length / 8, rounded up).
 */
private static int getLength(Object[] array) {
    final int fullBytes = array.length >> 3;
    final int leftoverBits = array.length & 7;
    return leftoverBits == 0 ? fullBytes : fullBytes + 1;
}
}

0 comments on commit 00bab32

Please sign in to comment.