Skip to content

Commit

Permalink
Initial collector implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi committed Sep 18, 2024
1 parent 58794ad commit 0f3b7a2
Show file tree
Hide file tree
Showing 12 changed files with 735 additions and 1 deletion.
51 changes: 51 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,57 @@ dependencies {
// https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap
implementation 'org.roaringbitmap:RoaringBitmap:1.2.1'

api group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
api 'org.slf4j:slf4j-api:1.7.36'
// NOTE(review): the exclude below removes the dependency's own module — it is a
// no-op; drop it or exclude the transitive module actually intended.
api("io.netty:netty-common:${versions.netty}") {
exclude group: 'io.netty', module: 'netty-common'
}
// NOTE(review): same self-exclusion no-op as above.
api("io.netty:netty-buffer:${versions.netty}") {
exclude group: 'io.netty', module: 'netty-buffer'
}
api group: 'org.apache.arrow', name: 'arrow-memory-netty-buffer-patch', version: '17.0.0'
api group: 'org.apache.arrow', name: 'arrow-vector', version: '17.0.0'
api 'org.apache.arrow:arrow-memory-core:17.0.0'
api group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '17.0.0'
api 'org.apache.arrow:arrow-format:17.0.0'
// NOTE(review): arrow-flight and flight-core overlap — confirm both artifacts
// are actually required on the classpath.
api 'org.apache.arrow:arrow-flight:17.0.0'
api 'org.apache.arrow:flight-core:17.0.0'
// NOTE(review): duplicated commented-out line below; keep at most one.
// api 'org.apache.arrow:flight-grpc:17.0.0'
// api 'org.apache.arrow:flight-grpc:17.0.0'
// NOTE(review): gRPC versions are mixed in this block (grpc-api 1.57.2 here,
// 1.63.0 for the artifacts below, grpc-all 1.57.2 further down). Align on a
// single gRPC version to avoid runtime classpath conflicts.
api 'io.grpc:grpc-api:1.57.2'
api 'io.grpc:grpc-netty:1.63.0'
// api 'io.grpc:grpc-java:1.57.2'
api 'io.grpc:grpc-core:1.63.0'
api 'io.grpc:grpc-stub:1.63.0'
// api 'io.grpc:grpc-all:1.63.0'
api 'io.grpc:grpc-protobuf:1.63.0'
api 'io.grpc:grpc-protobuf-lite:1.63.0'


// NOTE(review): grpc-all bundles grpc-api/core/stub/protobuf and duplicates the
// individual 1.63.0 artifacts above at a different version — remove it or use it
// exclusively, at one version.
api 'io.grpc:grpc-all:1.57.2'
// NOTE(review): netty-buffer and netty-common are already declared above (with
// the no-op excludes) — deduplicate.
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
runtimeOnly 'io.perfmark:perfmark-api:0.27.0'
// runtimeOnly('com.google.guava:guava:32.1.1-jre')
runtimeOnly "com.google.guava:failureaccess:1.0.1"


api 'com.google.flatbuffers:flatbuffers-java:2.0.0'
api 'org.apache.parquet:parquet-arrow:1.13.1'

api 'com.fasterxml.jackson.core:jackson-databind:2.17.2'
api 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.2'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2'
api 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
// api 'org.apache.arrow:arrow-compression:13.0.0'

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'server'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.ArrowCollector;
import org.opensearch.search.query.QueryPhaseExecutionException;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;
Expand Down Expand Up @@ -199,6 +200,7 @@ final class DefaultSearchContext extends SearchContext {
private List<RescoreContext> rescore;
private Profilers profilers;
private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR;
private ArrowCollector arrowCollector = NO_OP_ARROW_COLLECTOR;
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
private final Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers = new HashMap<>();
private final QueryShardContext queryShardContext;
Expand Down Expand Up @@ -1052,6 +1054,16 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}

/** Returns the collector used to project doc values into Arrow vectors for this context. */
@Override
public ArrowCollector getArrowCollector() {
return arrowCollector;
}

/** Replaces the collector used to project doc values into Arrow vectors for this context. */
@Override
public void setArrowCollector(ArrowCollector arrowCollector) {
this.arrowCollector = arrowCollector;
}

/**
* Evaluate the concurrentSearchMode based on cluster and index settings if concurrent segment search
* should be used for this request context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.search.fetch.subphase.ScriptFieldsContext;
import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.ArrowCollector;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;
import org.opensearch.search.rescore.RescoreContext;
Expand Down Expand Up @@ -568,6 +569,16 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
return in.bucketCollectorProcessor();
}

/** Delegates to the wrapped search context. */
@Override
public void setArrowCollector(ArrowCollector arrowCollector) {
in.setArrowCollector(arrowCollector);
}

/** Delegates to the wrapped search context. */
@Override
public ArrowCollector getArrowCollector() {
return in.getArrowCollector();
}

@Override
public boolean shouldUseConcurrentSearch() {
return in.shouldUseConcurrentSearch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.search.fetch.subphase.ScriptFieldsContext;
import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.ArrowCollector;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;
import org.opensearch.search.rescore.RescoreContext;
Expand Down Expand Up @@ -121,6 +122,8 @@ public List<InternalAggregation> toInternalAggregations(Collection<Collector> co
}
};

// Shared default: an ArrowCollector with no projection fields, so it collects nothing.
public static final ArrowCollector NO_OP_ARROW_COLLECTOR = new ArrowCollector();

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;
Expand Down Expand Up @@ -515,6 +518,10 @@ public String toString() {

public abstract BucketCollectorProcessor bucketCollectorProcessor();

/** Returns the collector used to project numeric doc values into Arrow vectors. */
public abstract ArrowCollector getArrowCollector();

/** Sets the collector used to project numeric doc values into Arrow vectors. */
public abstract void setArrowCollector(ArrowCollector arrowCollector);

public abstract int getTargetMaxSliceCount();

public abstract boolean shouldUseTimeSeriesDescSortOptimization();
Expand Down
206 changes: 206 additions & 0 deletions server/src/main/java/org/opensearch/search/query/ArrowCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float2Vector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A Lucene {@link Collector} that materializes numeric doc values for a set of
 * projected fields into Arrow {@link FieldVector}s, exposed via
 * {@link #getRootVector()} as a {@link VectorSchemaRoot}.
 *
 * <p>Only numeric, doc-values-backed field types are supported. Scores are not
 * computed ({@link #scoreMode()} is {@code COMPLETE_NO_SCORES}).
 *
 * <p>Must be {@link #close() closed} after use so the underlying
 * {@link RootAllocator}'s off-heap memory is released.
 */
@ExperimentalApi
public class ArrowCollector implements Collector, AutoCloseable {

    // Owns the off-heap memory backing every vector this collector creates.
    final BufferAllocator allocator;
    Schema schema;
    final List<ProjectionField> projectionFields;
    VectorSchemaRoot root;

    // Initial per-vector capacity hint; vectors grow automatically via setSafe().
    static final int BATCH_SIZE = 1000;

    /** Creates a collector with no projected fields (collects nothing). */
    public ArrowCollector() {
        this(new ArrayList<>());
    }

    /**
     * @param projectionFields fields whose numeric doc values should be copied
     *                         into Arrow vectors during collection
     */
    public ArrowCollector(List<ProjectionField> projectionFields) {
        this.allocator = new RootAllocator();
        this.projectionFields = projectionFields;
    }

    /** Maps a projection field to an Arrow {@link Field} with a nullable type. */
    private static Field toArrowField(ProjectionField field) {
        final ArrowType arrowType;
        switch (field.type) {
            case INT:
                arrowType = new ArrowType.Int(32, true);
                break;
            case DATE:
            case DATE_NANOSECONDS:
            case LONG:
                arrowType = new ArrowType.Int(64, true);
                break;
            case UNSIGNED_LONG:
                arrowType = new ArrowType.Int(64, false);
                break;
            case HALF_FLOAT:
                arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.HALF);
                break;
            case FLOAT:
                arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
                break;
            case DOUBLE:
                arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
                break;
            case SHORT:
                arrowType = new ArrowType.Int(16, true);
                break;
            case BYTE:
                arrowType = new ArrowType.Int(8, true);
                break;
            case BOOLEAN:
                // The previous implementation registered a Field but no vector for
                // BOOLEAN, desynchronizing the schema from the vectors. Reject it
                // explicitly until a BitVector-backed path is implemented.
                throw new UnsupportedOperationException("BOOLEAN projection is not supported yet");
            default:
                throw new UnsupportedOperationException("Field type not supported: " + field.type);
        }
        return new Field(field.fieldName, FieldType.nullable(arrowType), null);
    }

    /**
     * Writes one doc-values long into the correct vector type. setSafe() grows
     * the vector as needed, so no manual re-allocation (which would have wiped
     * previously collected data) is required.
     */
    private static void setValue(FieldVector vector, int index, long value) {
        if (vector instanceof BigIntVector) {
            ((BigIntVector) vector).setSafe(index, value);
        } else if (vector instanceof IntVector) {
            ((IntVector) vector).setSafe(index, (int) value);
        } else if (vector instanceof UInt8Vector) {
            ((UInt8Vector) vector).setSafe(index, value);
        } else if (vector instanceof SmallIntVector) {
            ((SmallIntVector) vector).setSafe(index, (short) value);
        } else if (vector instanceof TinyIntVector) {
            ((TinyIntVector) vector).setSafe(index, (byte) value);
        } else if (vector instanceof Float2Vector) {
            // NOTE(review): assumes the doc-values long carries half-float bits in
            // its low 16 bits — confirm against the field's doc-values encoding.
            ((Float2Vector) vector).setSafe(index, (short) value);
        } else if (vector instanceof Float4Vector) {
            // NOTE(review): assumes the doc-values long carries raw float bits —
            // confirm (Lucene may store sortable bits instead).
            ((Float4Vector) vector).setSafe(index, Float.intBitsToFloat((int) value));
        } else if (vector instanceof Float8Vector) {
            // NOTE(review): assumes raw double bits — confirm as above.
            ((Float8Vector) vector).setSafe(index, Double.longBitsToDouble(value));
        } else {
            throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getName());
        }
    }

    @Override
    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
        // Parallel lists keep fields and vectors in the same order for the root
        // (the previous HashMap-based code relied on incidental iteration order).
        final List<Field> arrowFields = new ArrayList<>();
        final List<FieldVector> vectorList = new ArrayList<>();
        final Map<String, FieldVector> vectors = new HashMap<>();
        final Map<String, NumericDocValues> iterators = new HashMap<>();
        for (ProjectionField projectionField : projectionFields) {
            Field arrowField = toArrowField(projectionField);
            FieldVector vector = arrowField.createVector(allocator);
            vector.setInitialCapacity(BATCH_SIZE);
            vector.allocateNew();
            arrowFields.add(arrowField);
            vectorList.add(vector);
            vectors.put(projectionField.fieldName, vector);
            // May be null if the field has no numeric doc values in this segment;
            // collect() tolerates that and leaves the column entries null.
            iterators.put(projectionField.fieldName, context.reader().getNumericDocValues(projectionField.fieldName));
        }
        schema = new Schema(arrowFields);
        root = new VectorSchemaRoot(arrowFields, vectorList);
        final int[] rowCount = { 0 };
        return new LeafCollector() {
            @Override
            public void setScorer(Scorable scorable) {
                // Scores are not needed; scoreMode() is COMPLETE_NO_SCORES.
            }

            @Override
            public void collect(int docId) throws IOException {
                // One row per collected doc. The previous implementation advanced
                // the row counter once per *field*, corrupting multi-field output.
                final int row = rowCount[0];
                for (Map.Entry<String, NumericDocValues> entry : iterators.entrySet()) {
                    NumericDocValues iterator = entry.getValue();
                    if (iterator == null) {
                        continue; // field absent in this segment -> null cell
                    }
                    if (iterator.advanceExact(docId)) {
                        setValue(vectors.get(entry.getKey()), row, iterator.longValue());
                    }
                    // No value for this doc -> validity bit stays unset (null).
                }
                rowCount[0] = row + 1;
            }

            @Override
            public void finish() {
                root.setRowCount(rowCount[0]);
            }
        };
    }

    @Override
    public ScoreMode scoreMode() {
        return ScoreMode.COMPLETE_NO_SCORES;
    }

    /** The Arrow root holding the collected columns; null until a leaf was visited. */
    public VectorSchemaRoot getRootVector() {
        return root;
    }

    /** Releases the vectors and the allocator's off-heap memory. */
    @Override
    public void close() {
        if (root != null) {
            root.close();
        }
        allocator.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.List;

/**
 * A {@link QueryCollectorContext} that installs an {@link ArrowCollector} in the
 * query phase so numeric doc values for the given projection fields are
 * materialized into Arrow vectors during collection.
 */
public class ArrowCollectorContext extends QueryCollectorContext {

    // Fields whose numeric doc values are projected into Arrow vectors.
    final List<ProjectionField> projectionFields;

    ArrowCollectorContext(String profilerName, List<ProjectionField> projectionFields) {
        super(profilerName);
        this.projectionFields = projectionFields;
    }

    // The incoming collector `in` is intentionally not wrapped: the Arrow
    // collector replaces the chain rather than delegating to it.
    @Override
    Collector create(Collector in) throws IOException {
        return new ArrowCollector(projectionFields);
    }

    // NOTE(review): returning null means this context cannot participate in
    // concurrent segment search — confirm callers handle a null manager before
    // this path is enabled there.
    @Override
    CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
        return null;
    }
}
Loading

0 comments on commit 0f3b7a2

Please sign in to comment.