From cccfa0527d5a2ec96462c57dfe3b9aca02d7b6a6 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Tue, 26 Nov 2024 11:38:42 -0800 Subject: [PATCH] introduce factory for stream ticket Signed-off-by: Rishabh Maurya --- .../opensearch/arrow/spi/StreamManager.java | 8 +++++ .../opensearch/arrow/spi/StreamProducer.java | 8 ++--- .../opensearch/arrow/spi/StreamTicket.java | 11 ------ .../arrow/spi/StreamTicketFactory.java | 35 +++++++++++++++++++ 4 files changed, 45 insertions(+), 17 deletions(-) create mode 100644 libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicketFactory.java diff --git a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamManager.java b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamManager.java index c1043e07b176f..a94dd8cd9caed 100644 --- a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamManager.java +++ b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamManager.java @@ -47,4 +47,12 @@ public interface StreamManager extends AutoCloseable { * @throws IllegalStateException if the stream has been cancelled or closed */ StreamReader getStreamReader(StreamTicket ticket); + + /** + * Gets the StreamTicketFactory instance associated with this StreamManager. + * By default, returns the singleton instance of StreamTicketFactory. + * + * @return the StreamTicketFactory instance + */ + StreamTicketFactory getStreamTicketFactory(); } diff --git a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamProducer.java b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamProducer.java index 6dd443a2595cb..c5cd6f16adfdd 100644 --- a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamProducer.java +++ b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamProducer.java @@ -102,17 +102,13 @@ public interface StreamProducer extends Closeable { * * @return Estimated number of rows, or -1 if unknown */ - default int estimatedRowCount() { - return -1; - } + int estimatedRowCount(); /** * Task action name * @return action name */ - default String getAction() { - return ""; - } + String getAction(); /** * BatchedJob interface for producing stream data in batches. diff --git a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicket.java b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicket.java index fe463ffdc4e3d..6d823f5773b1e 100644 --- a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicket.java +++ b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicket.java @@ -37,15 +37,4 @@ public interface StreamTicket { * @return Base64 encoded byte array containing the ticket information */ byte[] toBytes(); - - /** - * Creates a StreamTicket from its serialized byte representation. - * - * @param bytes Base64 encoded byte array containing ticket information - * @return a new StreamTicket instance - * @throws IllegalArgumentException if the input is invalid - */ - static StreamTicket fromBytes(byte[] bytes) { - throw new UnsupportedOperationException("Implementation must be provided by concrete class"); - } } diff --git a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicketFactory.java b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicketFactory.java new file mode 100644 index 0000000000000..d587136c711e6 --- /dev/null +++ b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamTicketFactory.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.spi; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Factory interface for creating and managing StreamTicket instances. + * This factory provides methods to create and deserialize StreamTickets, + * ensuring consistent ticket creation. + */ +@ExperimentalApi +public interface StreamTicketFactory { + /** + * Generates a new StreamTicket + * + * @return A new StreamTicket instance + */ + StreamTicket generateTicket(); + + /** + * Deserializes a StreamTicket from its byte representation. + * + * @param bytes The byte array containing the serialized ticket data + * @return A StreamTicket instance reconstructed from the byte array + * @throws IllegalArgumentException if bytes is null or invalid + */ + StreamTicket fromBytes(byte[] bytes); +}