address PR comments
Signed-off-by: Rishabh Maurya <[email protected]>
rishabhmaurya committed Nov 24, 2024
1 parent 24e7ad2 commit a4d3d99
Showing 117 changed files with 298 additions and 1,411 deletions.
121 changes: 121 additions & 0 deletions libs/arrow-spi/build.gradle
@@ -0,0 +1,121 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

dependencies {
implementation project(':libs:opensearch-common')
implementation project(':libs:opensearch-core')
api "org.apache.arrow:arrow-vector:${versions.arrow}"
api "org.apache.arrow:arrow-format:${versions.arrow}"
api "org.apache.arrow:arrow-memory-core:${versions.arrow}"
runtimeOnly "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
runtimeOnly "org.apache.arrow:arrow-memory-netty:${versions.arrow}"
runtimeOnly "io.netty:netty-buffer:${versions.netty}"
runtimeOnly "io.netty:netty-common:${versions.netty}"

runtimeOnly "com.google.flatbuffers:flatbuffers-java:${versions.flatbuffers}"
implementation "org.slf4j:slf4j-api:${versions.slf4j}"
implementation "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

implementation "commons-codec:commons-codec:${versions.commonscodec}"

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-arrow-spi'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}

tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
// Logging frameworks
'org.apache.commons.logging.Log',
'org.apache.commons.logging.LogFactory',
'org.apache.log4j.Level',
'org.apache.log4j.Logger',
'org.slf4j.impl.StaticLoggerBinder',
'org.slf4j.impl.StaticMDCBinder',
'org.slf4j.impl.StaticMarkerBinder',
// SnakeYAML classes
'org.yaml.snakeyaml.DumperOptions',
'org.yaml.snakeyaml.DumperOptions$FlowStyle',
'org.yaml.snakeyaml.DumperOptions$LineBreak',
'org.yaml.snakeyaml.DumperOptions$ScalarStyle',
'org.yaml.snakeyaml.DumperOptions$Version',
'org.yaml.snakeyaml.LoaderOptions',
'org.yaml.snakeyaml.emitter.Emitter',
'org.yaml.snakeyaml.error.Mark',
'org.yaml.snakeyaml.error.MarkedYAMLException',
'org.yaml.snakeyaml.error.YAMLException',
'org.yaml.snakeyaml.events.AliasEvent',
'org.yaml.snakeyaml.events.CollectionStartEvent',
'org.yaml.snakeyaml.events.Event',
'org.yaml.snakeyaml.events.Event$ID',
'org.yaml.snakeyaml.events.ImplicitTuple',
'org.yaml.snakeyaml.events.MappingStartEvent',
'org.yaml.snakeyaml.events.NodeEvent',
'org.yaml.snakeyaml.events.ScalarEvent',
'org.yaml.snakeyaml.nodes.NodeId',
'org.yaml.snakeyaml.nodes.Tag',
'org.yaml.snakeyaml.parser.ParserImpl',
'org.yaml.snakeyaml.resolver.Resolver',

// Reactor BlockHound
'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)

ignoreViolations(
"io.netty.util.internal.PlatformDependent0",
"io.netty.util.internal.PlatformDependent0\$1",
"io.netty.util.internal.PlatformDependent0\$2",
"io.netty.util.internal.PlatformDependent0\$3",
"io.netty.util.internal.PlatformDependent0\$4",
"io.netty.util.internal.PlatformDependent0\$6",
"io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef",
"io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef",
"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields",
"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields",
"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields",
"io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode",
"io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField",
"io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess",
"io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess",
"io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess",
"io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField",
"io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField",
"org.apache.arrow.memory.ArrowBuf",
"org.apache.arrow.memory.util.ByteFunctionHelpers",
"org.apache.arrow.memory.util.MemoryUtil",
"org.apache.arrow.memory.util.MemoryUtil\$1",
"org.apache.arrow.memory.util.hash.MurmurHasher",
"org.apache.arrow.memory.util.hash.SimpleHasher",
"org.apache.arrow.vector.BaseFixedWidthVector",
"org.apache.arrow.vector.BitVectorHelper",
"org.apache.arrow.vector.Decimal256Vector",
"org.apache.arrow.vector.DecimalVector",
"org.apache.arrow.vector.util.DecimalUtility",
"org.apache.arrow.vector.util.VectorAppender"
)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -6,15 +6,7 @@
* compatible open source license.
*/

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow;
package org.opensearch.arrow.spi;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.tasks.TaskId;
@@ -54,7 +46,7 @@ public interface StreamManager extends AutoCloseable {
* @throws IllegalArgumentException if the ticket is invalid
* @throws IllegalStateException if the stream has been cancelled or closed
*/
StreamIterator getStreamIterator(StreamTicket ticket);
StreamReader getStreamIterator(StreamTicket ticket);

/**
* Generates a unique ticket identifier for stream registration.
@@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.arrow;
package org.opensearch.arrow.spi;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -19,7 +19,7 @@
* Represents a producer of Arrow streams. The producer first needs to define the job by implementing this interface and
* then register the job with the {@link StreamManager#registerStream(StreamProducer, TaskId)}, which will return {@link StreamTicket}
* which can be distributed to the consumer. The consumer can then use the ticket to retrieve the stream using
* {@link StreamManager#getStreamIterator(StreamTicket)} and then consume the stream using {@link StreamIterator}.
* {@link StreamManager#getStreamIterator(StreamTicket)} and then consume the stream using {@link StreamReader}.
* <p>
* BatchedJob supports streaming of intermediate results, allowing consumers to begin processing data before the entire
* result set is generated. This is particularly useful for memory-intensive operations or when dealing with large datasets
@@ -74,7 +74,7 @@
*
* @see StreamManager
* @see StreamTicket
* @see StreamIterator
* @see StreamReader
*/
@ExperimentalApi
public interface StreamProducer extends Closeable {
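A minimal sketch of the producer-side flow described in the StreamProducer javadoc above: register a producer with the StreamManager and hand the returned StreamTicket to a consumer. Only StreamManager#registerStream(StreamProducer, TaskId) and its StreamTicket return type are taken from this diff; the helper class and method names below are illustrative assumptions, not part of the commit.

```java
// Sketch only: producer-side registration, assuming a StreamProducer implementation exists.
import org.opensearch.arrow.spi.StreamManager;
import org.opensearch.arrow.spi.StreamProducer;
import org.opensearch.arrow.spi.StreamTicket;
import org.opensearch.core.tasks.TaskId;

public final class ProducerRegistrationSketch {

    /**
     * Registers the producer with the manager and returns the ticket that a consumer
     * can later exchange for a StreamReader via StreamManager#getStreamIterator(StreamTicket).
     */
    public static StreamTicket register(StreamManager streamManager, StreamProducer producer, TaskId taskId) {
        // registerStream defines the streaming job and hands back a distributable ticket.
        return streamManager.registerStream(producer, taskId);
    }
}
```

A consumer then exchanges the ticket for a StreamReader, as sketched after the StreamReader interface below.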
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.spi;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.Closeable;

/**
* StreamReader is a stateful iterator used to read data from a stream in batches.
* Each call to {@code next()} loads the next batch into the {@link VectorSchemaRoot}
* returned by {@code getRoot()}, and returns true while more data is available and
* false once the stream is exhausted.
* Example usage:
* <pre>{@code
* // producer
* StreamProducer producer = new QueryStreamProducer(searchRequest);
* StreamTicket ticket = streamManager.registerStream(producer, taskId);
*
* // consumer
* StreamReader iterator = streamManager.getStreamIterator(ticket);
* try (VectorSchemaRoot root = iterator.getRoot()) {
* while (iterator.next()) {
* VarCharVector idVector = (VarCharVector)root.getVector("id");
* Float8Vector scoreVector = (Float8Vector) root.getVector("score");
* }
* }
* }</pre>
*
* @see StreamProducer
*/
@ExperimentalApi
public interface StreamReader extends Closeable {

/**
* Blocking request to load next batch into root.
*
* @return true if more data was found, false if the stream is exhausted
*/
boolean next();

/**
* Returns the VectorSchemaRoot associated with this iterator.
* The content of this root is updated with each successful call to next().
*
* @return the VectorSchemaRoot
*/
VectorSchemaRoot getRoot();
}
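To complement the javadoc example, here is a hedged end-to-end consumer sketch using only the calls visible in this diff — getStreamIterator(StreamTicket), next(), getRoot(), and the Closeable contract. The "id" and "score" vector names are copied from the illustrative example above, and the class and method names here are assumptions rather than part of the commit.

```java
// Sketch only: consumer-side iteration over a registered stream using the renamed StreamReader.
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.arrow.spi.StreamManager;
import org.opensearch.arrow.spi.StreamReader;
import org.opensearch.arrow.spi.StreamTicket;

public final class ConsumerSketch {

    /** Reads every batch of the stream identified by the ticket and prints each row. */
    public static void consume(StreamManager streamManager, StreamTicket ticket) throws Exception {
        // StreamReader is Closeable, so release it once iteration finishes.
        try (StreamReader reader = streamManager.getStreamIterator(ticket)) {
            VectorSchemaRoot root = reader.getRoot(); // refilled on every successful next()
            while (reader.next()) {
                VarCharVector ids = (VarCharVector) root.getVector("id");       // assumed field name
                Float8Vector scores = (Float8Vector) root.getVector("score");   // assumed field name
                for (int i = 0; i < root.getRowCount(); i++) {
                    System.out.println(ids.getObject(i) + " -> " + scores.get(i));
                }
            }
        }
    }
}
```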
@@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.arrow;
package org.opensearch.arrow.spi;

import org.opensearch.common.annotation.ExperimentalApi;

@@ -9,4 +9,4 @@
/**
* Contains Apache Arrow related classes and Stream generic interfaces
*/
package org.opensearch.arrow;
package org.opensearch.arrow.spi;
@@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.arrow;
package org.opensearch.arrow.spi;

import org.opensearch.test.OpenSearchTestCase;
