From acee6db4b9c4c9bbc39d28d4a45d04afa1c0418f Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Tue, 17 Dec 2024 19:36:24 -0800 Subject: [PATCH] interim changes --- modules/arrow-flight-rpc/build.gradle | 2 + .../arrow/flight/ArrowFlightServerIT.java | 50 ++++++ .../arrow/flight/BaseFlightStreamPlugin.java | 10 +- .../arrow/flight/FlightStreamPlugin.java | 17 +- .../arrow/flight/bootstrap/FlightService.java | 110 ++++++++---- .../bootstrap/FlightStreamPluginImpl.java | 10 +- .../arrow/flight/bootstrap/Utils.java | 156 ++++++++++++++++++ .../bootstrap/client/FlightClientManager.java | 9 +- .../bootstrap/server/FlightServerBuilder.java | 40 +++-- .../flight/bootstrap/server/ServerConfig.java | 10 +- .../arrow/flight/FlightStreamPluginTests.java | 3 +- .../server/FlightServerBuilderTests.java | 13 +- .../arrow/spi/StreamManagerWrapper.java | 13 +- .../common/settings/FeatureFlagSettings.java | 5 +- .../opensearch/common/util/FeatureFlags.java | 2 +- .../main/java/org/opensearch/node/Node.java | 7 +- .../plugins/StreamManagerPlugin.java | 4 +- .../opensearch/test/InternalTestCluster.java | 5 +- .../test/OpenSearchIntegTestCase.java | 2 +- .../opensearch/test/OpenSearchTestCase.java | 12 +- 20 files changed, 393 insertions(+), 87 deletions(-) create mode 100644 modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java create mode 100644 modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/Utils.java diff --git a/modules/arrow-flight-rpc/build.gradle b/modules/arrow-flight-rpc/build.gradle index 0d6b272608479..6390221992903 100644 --- a/modules/arrow-flight-rpc/build.gradle +++ b/modules/arrow-flight-rpc/build.gradle @@ -10,6 +10,7 @@ */ apply plugin: 'opensearch.publish' +apply plugin: 'opensearch.internal-cluster-test' opensearchplugin { description 'Arrow flight based Stream implementation' @@ -19,6 +20,7 @@ opensearchplugin { dependencies { implementation project(':libs:opensearch-arrow-spi') implementation project(':modules:transport-netty4') + implementation "io.netty:netty-transport-native-epoll:${versions.netty}" implementation 'org.checkerframework:checker-qual:3.44.0' implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}" diff --git a/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java b/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java new file mode 100644 index 0000000000000..c9cc7fcbf1356 --- /dev/null +++ b/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.OpenSearchFlightClient; +import org.apache.arrow.flight.Result; +import org.opensearch.arrow.flight.bootstrap.FlightService; +import org.opensearch.arrow.flight.bootstrap.client.FlightClientManager; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 10) +public class ArrowFlightServerIT extends OpenSearchIntegTestCase { + + private FlightClientManager flightClientManager; + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(FlightStreamPlugin.class); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + ensureGreen(); + FlightService flightService = internalCluster().getInstance(FlightService.class); + flightClientManager = flightService.getFlightClientManager(); + } + + + public void testArrowFlightEndpoint() throws Exception { + Action pingAction = new Action("ping"); + try(OpenSearchFlightClient flightClient = flightClientManager.getFlightClient(flightClientManager.getLocalNodeId())) { + assertNotNull(flightClient); + Iterator results = flightClient.doAction(pingAction); + } + } + +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java index 79d9e6bd2daee..de002e229d8d0 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java @@ -8,6 +8,7 @@ package org.opensearch.arrow.flight; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.arrow.spi.StreamManager; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -21,6 +22,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SecureTransportSettingsProvider; @@ -31,6 +33,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; import org.opensearch.watcher.ResourceWatcherService; import java.util.Collection; @@ -42,7 +45,7 @@ * BaseFlightStreamPlugin is a plugin that implements the StreamManagerPlugin interface. * It provides the necessary components for handling flight streams in the OpenSearch cluster. */ -public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin { +public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin { /** * Constructor for BaseFlightStreamPlugin. @@ -109,7 +112,7 @@ public abstract Map> getSecureTransports( * Returns the StreamManager instance for managing flight streams. */ @Override - public abstract StreamManager getStreamManager(); + public abstract Supplier getStreamManager(); /** * Returns a list of ExecutorBuilder instances for building thread pools used for FlightServer @@ -123,4 +126,7 @@ public abstract Map> getSecureTransports( */ @Override public abstract List> getSettings(); + + @Override + public abstract void onNodeStarted(DiscoveryNode localNode); } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java index ba8020469ff79..fa68dad7ef927 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java @@ -12,6 +12,7 @@ import org.opensearch.arrow.spi.StreamManager; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Setting; @@ -88,8 +89,8 @@ public Map> getSecureTransports( } @Override - public StreamManager getStreamManager() { - return null; + public Supplier getStreamManager() { + return () -> null; } @Override @@ -101,6 +102,11 @@ public List> getExecutorBuilders(Settings settings) { public List> getSettings() { return List.of(); } + + @Override + public void onNodeStarted(DiscoveryNode localNode) { + + } }; } } @@ -188,7 +194,7 @@ public Map> getSecureTransports( * Gets the StreamManager instance for managing flight streams. */ @Override - public StreamManager getStreamManager() { + public Supplier getStreamManager() { return delegate.getStreamManager(); } @@ -208,4 +214,9 @@ public List> getExecutorBuilders(Settings settings) { public List> getSettings() { return delegate.getSettings(); } + + @Override + public void onNodeStarted(DiscoveryNode localNode) { + delegate.onNodeStarted(localNode); + } } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java index fcde449cdfda5..992ffd2070f6e 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java @@ -8,13 +8,18 @@ package org.opensearch.arrow.flight.bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.Location; import org.apache.arrow.flight.OpenSearchFlightServer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.arrow.flight.bootstrap.client.FlightClientManager; import org.opensearch.arrow.flight.bootstrap.server.FlightServerBuilder; import org.opensearch.arrow.flight.bootstrap.server.ServerConfig; @@ -24,6 +29,8 @@ import org.opensearch.arrow.flight.core.BaseFlightProducer; import org.opensearch.arrow.flight.core.FlightStreamManager; import org.opensearch.arrow.spi.StreamManager; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -33,9 +40,13 @@ import java.io.IOException; import java.security.AccessController; -import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + /** * FlightService manages the Arrow Flight server and client for OpenSearch. @@ -48,12 +59,15 @@ public class FlightService extends AbstractLifecycleComponent { private static OpenSearchFlightServer server; private static BufferAllocator allocator; - private static FlightStreamManager streamManager; + private static Supplier streamManager; private static FlightClientManager clientManager; private final SetOnce threadPool = new SetOnce<>(); + private final SetOnce clusterService = new SetOnce<>(); + private ExecutorService grpcExecutor; private final SetOnce secureTransportSettingsProvider = new SetOnce<>(); private SslContextProvider sslContextProvider; - + private EventLoopGroup bossELG; + private EventLoopGroup workerELG; /** * Constructor for FlightService. * @param settings The settings for the FlightService. @@ -76,14 +90,8 @@ public FlightService(Settings settings) { * @param threadPool The ThreadPool instance. */ public void initialize(ClusterService clusterService, ThreadPool threadPool) { + this.clusterService.trySet(clusterService); this.threadPool.trySet(Objects.requireNonNull(threadPool)); - if (ServerConfig.isSslEnabled()) { - sslContextProvider = new DefaultSslContextProvider(secureTransportSettingsProvider::get); - } else { - sslContextProvider = new DisabledSslContextProvider(); - } - clientManager = new FlightClientManager(() -> allocator, Objects.requireNonNull(clusterService), sslContextProvider); - streamManager = new FlightStreamManager(() -> allocator, clientManager); } /** @@ -94,53 +102,84 @@ public void setSecureTransportSettingsProvider(SecureTransportSettingsProvider s this.secureTransportSettingsProvider.trySet(secureTransportSettingsProvider); } - /** - * Starts the FlightService by initializing and starting the Arrow Flight server. - */ + @Override protected void doStart() { + DiscoveryNode localNode = Objects.requireNonNull(clusterService.get()).localNode(); + // everything is lazily started in onNodeStart() after TransportService is started + if (isDedicatedClusterManagerNode(localNode)) { + return; + } try { allocator = AccessController.doPrivileged( (PrivilegedExceptionAction) () -> new RootAllocator(Integer.MAX_VALUE) ); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Arrow Flight server", e); + } - FlightProducer producer = new BaseFlightProducer(clientManager, streamManager, allocator); - FlightServerBuilder builder = new FlightServerBuilder(threadPool.get(), () -> allocator, producer, sslContextProvider); + if (ServerConfig.isSslEnabled()) { + sslContextProvider = new DefaultSslContextProvider(secureTransportSettingsProvider::get); + } else { + sslContextProvider = new DisabledSslContextProvider(); + } + clientManager = new FlightClientManager(() -> allocator, Objects.requireNonNull(clusterService.get()), sslContextProvider); + FlightStreamManager flightStreamManager = new FlightStreamManager(() -> allocator, clientManager); + streamManager = () -> flightStreamManager; + + try { + FlightProducer producer = new BaseFlightProducer(clientManager, flightStreamManager, allocator); + Location serverLocation = ServerConfig.getLocation(localNode.getAddress().getAddress(), Integer.parseInt(localNode.getAttributes().get("transport.stream.port"))); + grpcExecutor = Objects.requireNonNull(threadPool.get()).executor(ServerConfig.FLIGHT_THREAD_POOL_NAME); + FlightServerBuilder builder = new FlightServerBuilder(() -> allocator, producer, sslContextProvider, serverLocation, grpcExecutor); + bossELG = Utils.create("os-grpc-boss-ELG", 1, null); + workerELG = Utils.create("os-grpc-worker-ELG", 0, null); + builder.elg(bossELG, workerELG, Utils.DEFAULT_CLIENT_CHANNEL_TYPE); server = builder.build(); server.start(); - logger.info("Arrow Flight server started successfully:{}", ServerConfig.getServerLocation().getUri().toString()); + logger.info("Arrow Flight server started successfully:{}", serverLocation); } catch (IOException e) { logger.error("Failed to start Arrow Flight server", e); throw new RuntimeException("Failed to start Arrow Flight server", e); - } catch (PrivilegedActionException e) { - throw new RuntimeException(e); } } /** - * Stops the FlightService by closing the Arrow Flight server, client manager, and stream manager. + * Stops the FlightService */ @Override protected void doStop() { try { - server.shutdown(); - streamManager.close(); - clientManager.close(); - server.close(); - logger.info("Arrow Flight service closed successfully"); + AutoCloseables.close(server, clientManager, allocator); + if (grpcExecutor != null) { + grpcExecutor.shutdown(); + } + if (bossELG != null) { + Future shutdownFuture = bossELG.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (!shutdownFuture.isSuccess()) { + //logger.warn(new ParameterizedMessage("Error closing {} netty event loop group", THREAD_PREFIX), shutdownFuture.cause()); + } + } + if (workerELG != null) { + Future shutdownFuture = workerELG.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (!shutdownFuture.isSuccess()) { + //logger.warn(new ParameterizedMessage("Error closing {} netty event loop group", THREAD_PREFIX), shutdownFuture.cause()); + } + } + logger.info("Arrow Flight service stopped successfully"); } catch (Exception e) { - logger.error("Error while closing Arrow Flight service", e); + logger.error("Error while stopping Arrow Flight service", e); } } /** - * Closes the BufferAllocator used by the FlightService. + * Closes the FlightService by closing the Arrow Flight server, client manager, and stream manager. */ @Override protected void doClose() { - if (allocator != null) { - allocator.close(); - } + doStop(); } /** @@ -151,11 +190,22 @@ public FlightClientManager getFlightClientManager() { return clientManager; } + public void onNodeStart(DiscoveryNode localNode) { + + } + + private boolean isDedicatedClusterManagerNode(DiscoveryNode node) { + Set nodeRoles = node.getRoles(); + return nodeRoles.size() == 1 && + (nodeRoles.contains(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE) || + nodeRoles.contains(DiscoveryNodeRole.MASTER_ROLE)); + } + /** * Retrieves the StreamManager used by the FlightService. * @return The StreamManager instance. */ - public StreamManager getStreamManager() { + public Supplier getStreamManager() { return streamManager; } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java index f86e3cf0c5000..b7e030872fa73 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java @@ -13,6 +13,7 @@ import org.opensearch.arrow.spi.StreamManager; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Setting; @@ -30,6 +31,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; import org.opensearch.watcher.ResourceWatcherService; import java.util.Collection; @@ -109,15 +111,21 @@ public Map> getSecureTransports( SecureTransportSettingsProvider secureTransportSettingsProvider, Tracer tracer ) { + flightService.setSecureTransportSettingsProvider(secureTransportSettingsProvider); return Collections.emptyMap(); } + @Override + public void onNodeStarted(DiscoveryNode localNode) { + flightService.onNodeStart(localNode); + } + /** * Gets the StreamManager instance for managing flight streams. */ @Override - public StreamManager getStreamManager() { + public Supplier getStreamManager() { return flightService.getStreamManager(); } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/Utils.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/Utils.java new file mode 100644 index 0000000000000..d58e031bc99ac --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/Utils.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.bootstrap; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ReflectiveChannelFactory; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.arrow.util.Preconditions; + +import java.lang.reflect.Constructor; +import java.util.concurrent.ThreadFactory; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Utils { + private static final Logger logger = Logger.getLogger(Utils.class.getName()); + public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; + public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; + public static final Class EPOLL_DOMAIN_CLIENT_CHANNEL_TYPE; + private static final Constructor EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR; + + private static boolean isEpollAvailable() { + try { + return (Boolean)Class.forName("io.netty.channel.epoll.Epoll").getDeclaredMethod("isAvailable").invoke((Object)null); + } catch (ClassNotFoundException var1) { + return false; + } catch (Exception var2) { + Exception e = var2; + throw new RuntimeException("Exception while checking Epoll availability", e); + } + } + + private static Throwable getEpollUnavailabilityCause() { + try { + return (Throwable)Class.forName("io.netty.channel.epoll.Epoll").getDeclaredMethod("unavailabilityCause").invoke((Object)null); + } catch (Exception var1) { + Exception e = var1; + return e; + } + } + + private static Class epollChannelType() { + try { + Class channelType = Class.forName("io.netty.channel.epoll.EpollSocketChannel").asSubclass(Channel.class); + return channelType; + } catch (ClassNotFoundException var1) { + ClassNotFoundException e = var1; + throw new RuntimeException("Cannot load EpollSocketChannel", e); + } + } + + private static Class epollDomainSocketChannelType() { + try { + Class channelType = Class.forName("io.netty.channel.epoll.EpollDomainSocketChannel").asSubclass(Channel.class); + return channelType; + } catch (ClassNotFoundException var1) { + ClassNotFoundException e = var1; + throw new RuntimeException("Cannot load EpollDomainSocketChannel", e); + } + } + + private static Constructor epollEventLoopGroupConstructor() { + try { + return Class.forName("io.netty.channel.epoll.EpollEventLoopGroup").asSubclass(EventLoopGroup.class).getConstructor(Integer.TYPE, ThreadFactory.class); + } catch (ClassNotFoundException var1) { + ClassNotFoundException e = var1; + throw new RuntimeException("Cannot load EpollEventLoopGroup", e); + } catch (NoSuchMethodException var2) { + NoSuchMethodException e = var2; + throw new RuntimeException("EpollEventLoopGroup constructor not found", e); + } + } + + private static Class epollServerChannelType() { + try { + Class serverSocketChannel = Class.forName("io.netty.channel.epoll.EpollServerSocketChannel").asSubclass(ServerChannel.class); + return serverSocketChannel; + } catch (ClassNotFoundException var1) { + ClassNotFoundException e = var1; + throw new RuntimeException("Cannot load EpollServerSocketChannel", e); + } + } + + private static EventLoopGroup createEpollEventLoopGroup(int parallelism, ThreadFactory threadFactory) { + Preconditions.checkState(EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR != null, "Epoll is not available"); + try { + return (EventLoopGroup)EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR.newInstance(parallelism, threadFactory); + } catch (Exception var3) { + Exception e = var3; + throw new RuntimeException("Cannot create Epoll EventLoopGroup", e); + } + } + + private static ChannelFactory nioServerChannelFactory() { + return new ChannelFactory() { + public ServerChannel newChannel() { + return new NioServerSocketChannel(); + } + }; + } + + private static ChannelOption getEpollChannelOption(String optionName) { + if (isEpollAvailable()) { + try { + return (ChannelOption)Class.forName("io.netty.channel.epoll.EpollChannelOption").getField(optionName).get((Object)null); + } catch (Exception var2) { + Exception e = var2; + throw new RuntimeException("ChannelOption(" + optionName + ") is not available", e); + } + } else { + return null; + } + } + + + private Utils() { + } + + static { + if (isEpollAvailable()) { + DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType(); + EPOLL_DOMAIN_CLIENT_CHANNEL_TYPE = epollDomainSocketChannelType(); + DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory(epollServerChannelType()); + EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor(); + + } else { + logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause()); + DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory(); + DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class; + EPOLL_DOMAIN_CLIENT_CHANNEL_TYPE = null; + EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = null; + } + } + + public static EventLoopGroup create(String name, int numEventLoops, ThreadFactory threadFactory) { + // ThreadFactory threadFactory = new DefaultThreadFactory(name, true); + if (isEpollAvailable()) { + return createEpollEventLoopGroup(numEventLoops, threadFactory); + } else { + return new NioEventLoopGroup(numEventLoops, threadFactory); + } + } +} + diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/client/FlightClientManager.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/client/FlightClientManager.java index 58b27189cba75..53f23d6a599f2 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/client/FlightClientManager.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/client/FlightClientManager.java @@ -20,6 +20,7 @@ import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.FeatureFlags; import java.util.Map; import java.util.Objects; @@ -27,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; + /** * Manages Flight client connections to OpenSearch nodes in a cluster. * This class maintains a pool of Flight clients for internode communication, @@ -118,15 +121,13 @@ private FlightClientHolder buildFlightClient(String nodeId) { if (node.getVersion().before(minVersion)) { return null; } - - String arrowStreamsEnabled = node.getAttributes().get("arrow.streams.enabled"); - if (!"true".equals(arrowStreamsEnabled)) { + if (!FeatureFlags.isEnabled(ARROW_STREAMS_SETTING)) { return null; } String clientPort = node.getAttributes().get("transport.stream.port"); FlightClientBuilder builder = new FlightClientBuilder( - node.getHostAddress(), + node.getAddress().getAddress(), Integer.parseInt(clientPort), allocator.get(), sslContextProvider diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilder.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilder.java index f8fde465febdc..c70757579240b 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilder.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilder.java @@ -8,18 +8,19 @@ package org.opensearch.arrow.flight.bootstrap.server; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; import org.apache.arrow.flight.FlightProducer; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.OpenSearchFlightServer; import org.apache.arrow.memory.BufferAllocator; import org.opensearch.arrow.flight.bootstrap.tls.SslContextProvider; -import org.opensearch.threadpool.ThreadPool; +import org.opensearch.arrow.flight.bootstrap.Utils; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; -import static org.opensearch.arrow.flight.bootstrap.server.ServerConfig.FLIGHT_THREAD_POOL_NAME; /** * Builder class for creating and configuring OpenSearch Flight server instances. @@ -27,29 +28,26 @@ * buffer allocation, producer configuration, and SSL/TLS settings. */ public class FlightServerBuilder { - private final ThreadPool threadPool; - private final Supplier allocator; - private final FlightProducer producer; - private final SslContextProvider sslContextProvider; - + private final OpenSearchFlightServer.Builder builder; /** * Creates a new FlightServerBuilder instance with the specified configurations. * - * @param threadPool The thread pool used for handling Flight server operations * @param allocator Supplier for Arrow buffer allocation * @param producer The Flight producer that handles incoming requests * @param sslContextProvider Provider for SSL/TLS context configuration */ public FlightServerBuilder( - ThreadPool threadPool, Supplier allocator, FlightProducer producer, - SslContextProvider sslContextProvider + SslContextProvider sslContextProvider, + Location location, + ExecutorService executorService ) { - this.threadPool = threadPool; - this.allocator = allocator; - this.producer = producer; - this.sslContextProvider = sslContextProvider; + this.builder = OpenSearchFlightServer.builder(allocator.get(), location, producer); + builder.executor(executorService); + if (sslContextProvider.isSslEnabled()) { + builder.useTls(sslContextProvider.getServerSslContext()); + } } /** @@ -57,13 +55,13 @@ public FlightServerBuilder( * @return A configured OpenSearchFlightServer instance */ public OpenSearchFlightServer build() throws IOException { - final Location location = ServerConfig.getServerLocation(); - ExecutorService executorService = threadPool.executor(FLIGHT_THREAD_POOL_NAME); - OpenSearchFlightServer.Builder builder = OpenSearchFlightServer.builder(allocator.get(), location, producer); - builder.executor(executorService); - if (sslContextProvider.isSslEnabled()) { - builder.useTls(sslContextProvider.getServerSslContext()); - } return builder.build(); } + + public FlightServerBuilder elg(EventLoopGroup bossELG, EventLoopGroup workerELG, Class channelType) { + builder.transportHint("netty.channelType", Utils.DEFAULT_CLIENT_CHANNEL_TYPE); + builder.transportHint("netty.bossEventLoopGroup", bossELG); + builder.transportHint("netty.workerEventLoopGroup", workerELG); + return this; + } } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/ServerConfig.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/ServerConfig.java index 21a352ee83155..fd77114b91c98 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/ServerConfig.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/server/ServerConfig.java @@ -30,7 +30,7 @@ public class ServerConfig { */ public ServerConfig() {} - static final Setting STREAM_PORT = Setting.intSetting( + public static final Setting STREAM_PORT = Setting.intSetting( "node.attr.transport.stream.port", 9880, 1024, @@ -88,10 +88,10 @@ public ServerConfig() {} Setting.Property.NodeScope ); - static final String FLIGHT_THREAD_POOL_NAME = "flight-server"; + public static final String FLIGHT_THREAD_POOL_NAME = "flight-server"; private static final String host = "localhost"; - private static int port; + public static int port; private static boolean enableSsl; private static ScalingExecutorBuilder executorBuilder; @@ -166,7 +166,7 @@ public static List> getSettings() { }; } - private static Location getLocation(String address, int port) { + public static Location getLocation(String address, int port) { if (enableSsl) { return Location.forGrpcTls(address, port); } @@ -176,7 +176,7 @@ private static Location getLocation(String address, int port) { private static class Netty4Configs { public static final Setting NETTY_ALLOCATOR_NUM_DIRECT_ARENAS = Setting.intSetting( "io.netty.allocator.numDirectArenas", - 1, // TODO - 2 * the number of available processors + 1, // TODO - 2 * the number of available processors; to be confirmed and set after running benchmarks 1, Setting.Property.NodeScope ); diff --git a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/FlightStreamPluginTests.java b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/FlightStreamPluginTests.java index 24757e712a1bb..2d538740bd204 100644 --- a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/FlightStreamPluginTests.java +++ b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/FlightStreamPluginTests.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; import static org.mockito.Mockito.mock; @@ -69,7 +70,7 @@ public void testPluginEnableAndDisable() throws IOException { assertNotNull(executorBuilders); assertFalse(executorBuilders.isEmpty()); - StreamManager streamManager = plugin.getStreamManager(); + Supplier streamManager = plugin.getStreamManager(); assertNotNull(streamManager); List> settings = plugin.getSettings(); diff --git a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilderTests.java b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilderTests.java index d9e7a276d30d4..5b3aeea93c583 100644 --- a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilderTests.java +++ b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/server/FlightServerBuilderTests.java @@ -17,6 +17,8 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,7 +45,8 @@ public void tearDown() throws Exception { } public void testBuilderConstructorWithValidInputs() throws IOException { - FlightServerBuilder newBuilder = new FlightServerBuilder(threadPool, () -> allocator, producer, mock(SslContextProvider.class)); + ExecutorService executorService = Objects.requireNonNull(threadPool).executor(ServerConfig.FLIGHT_THREAD_POOL_NAME); + FlightServerBuilder newBuilder = new FlightServerBuilder(() -> allocator, producer, mock(SslContextProvider.class), null, executorService); assertNotNull(newBuilder); assertNotNull(newBuilder.build()); } @@ -51,14 +54,15 @@ public void testBuilderConstructorWithValidInputs() throws IOException { public void testBuilderConstructorWithNullThreadPool() { expectThrows( NullPointerException.class, - () -> (new FlightServerBuilder(null, () -> allocator, producer, mock(SslContextProvider.class))).build() + () -> (new FlightServerBuilder(() -> allocator, producer, mock(SslContextProvider.class), null, null)).build() ); } public void testBuilderConstructorWithNullAllocator() { + ExecutorService executorService = Objects.requireNonNull(threadPool).executor(ServerConfig.FLIGHT_THREAD_POOL_NAME); expectThrows( NullPointerException.class, - () -> (new FlightServerBuilder(threadPool, null, producer, mock(SslContextProvider.class))).build() + () -> (new FlightServerBuilder(null, producer, mock(SslContextProvider.class), null, executorService)).build() ); } @@ -66,7 +70,8 @@ public void testBuilderConstructorWithSslNull() { SslContextProvider sslContextProvider = mock(SslContextProvider.class); when(sslContextProvider.isSslEnabled()).thenReturn(true); when(sslContextProvider.getServerSslContext()).thenReturn(null); - FlightServerBuilder newBuilder = new FlightServerBuilder(threadPool, () -> allocator, producer, sslContextProvider); + ExecutorService executorService = Objects.requireNonNull(threadPool).executor(ServerConfig.FLIGHT_THREAD_POOL_NAME); + FlightServerBuilder newBuilder = new FlightServerBuilder(() -> allocator, producer, sslContextProvider, null, executorService); assertNotNull(newBuilder); expectThrows(NullPointerException.class, newBuilder::build); } diff --git a/server/src/main/java/org/opensearch/arrow/spi/StreamManagerWrapper.java b/server/src/main/java/org/opensearch/arrow/spi/StreamManagerWrapper.java index 4f308d641181e..033dfb0af5468 100644 --- a/server/src/main/java/org/opensearch/arrow/spi/StreamManagerWrapper.java +++ b/server/src/main/java/org/opensearch/arrow/spi/StreamManagerWrapper.java @@ -20,16 +20,17 @@ import org.opensearch.tasks.TaskManager; import java.io.IOException; +import java.util.function.Supplier; /** * Wraps a StreamManager to make it work with the TaskManager. */ public class StreamManagerWrapper implements StreamManager { - private final StreamManager streamManager; + private final Supplier streamManager; private final TaskManager taskManager; - public StreamManagerWrapper(StreamManager streamManager, TaskManager taskManager) { + public StreamManagerWrapper(Supplier streamManager, TaskManager taskManager) { super(); this.streamManager = streamManager; this.taskManager = taskManager; @@ -38,24 +39,24 @@ public StreamManagerWrapper(StreamManager streamManager, TaskManager taskManager @Override public StreamTicket registerStream(StreamProducer producer, TaskId parentTaskId) { StreamProducerTaskWrapper wrappedProducer = new StreamProducerTaskWrapper(producer, taskManager, parentTaskId); - StreamTicket ticket = streamManager.registerStream(wrappedProducer, parentTaskId); + StreamTicket ticket = streamManager.get().registerStream(wrappedProducer, parentTaskId); wrappedProducer.setDescription(ticket.toString()); return ticket; } @Override public StreamReader getStreamReader(StreamTicket ticket) { - return streamManager.getStreamReader(ticket); + return streamManager.get().getStreamReader(ticket); } @Override public StreamTicketFactory getStreamTicketFactory() { - return streamManager.getStreamTicketFactory(); + return streamManager.get().getStreamTicketFactory(); } @Override public void close() throws Exception { - streamManager.close(); + streamManager.get().close(); } static class StreamProducerTaskWrapper implements StreamProducer { diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 59d999798868e..173c7378764c1 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -39,6 +39,7 @@ protected FeatureFlagSettings( FeatureFlags.STAR_TREE_INDEX_SETTING, FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING, - FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING - ); + FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING, + FeatureFlags.ARROW_STREAMS_SETTING + ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 4f0462f0b5cdd..4be45aed70023 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -129,7 +129,7 @@ public class FeatureFlags { ); public static final String ARROW_STREAMS = "opensearch.experimental.feature.arrow.streams.enabled"; - public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, true, Property.NodeScope); + public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3a04f51517334..16b2c75b419db 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -311,6 +311,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1392,9 +1393,10 @@ protected Node( ); } if (!streamManagerPlugins.isEmpty()) { - if (streamManagerPlugins.get(0).getStreamManager() != null) { + Supplier baseStreamManager = streamManagerPlugins.get(0).getStreamManager(); + if (baseStreamManager != null) { streamManager = new StreamManagerWrapper( - streamManagerPlugins.get(0).getStreamManager(), + baseStreamManager, transportService.getTaskManager() ); logger.info("StreamManager initialized"); @@ -1802,6 +1804,7 @@ public void onTimeout(TimeValue timeout) { writePortsFile("http", http.boundAddress()); } + logger.info("started"); pluginsService.filterPlugins(ClusterPlugin.class).forEach(plugin -> plugin.onNodeStarted(clusterService.localNode())); diff --git a/server/src/main/java/org/opensearch/plugins/StreamManagerPlugin.java b/server/src/main/java/org/opensearch/plugins/StreamManagerPlugin.java index 62a3f2327acd7..aefde6b4842d0 100644 --- a/server/src/main/java/org/opensearch/plugins/StreamManagerPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/StreamManagerPlugin.java @@ -10,6 +10,8 @@ import org.opensearch.arrow.spi.StreamManager; +import java.util.function.Supplier; + /** * An interface for OpenSearch plugins to implement to provide a StreamManager. * This interface is used by the Arrow Flight plugin to get the StreamManager instance. @@ -22,5 +24,5 @@ public interface StreamManagerPlugin { * * @return The StreamManager instance */ - StreamManager getStreamManager(); + Supplier getStreamManager(); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index fa5fb736f518f..68d9530e84972 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -165,6 +165,7 @@ import static org.opensearch.test.NodeRoles.onlyRoles; import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; +import static org.opensearch.test.OpenSearchTestCase.getBaseStreamPort; import static org.opensearch.test.OpenSearchTestCase.randomBoolean; import static org.opensearch.test.OpenSearchTestCase.randomFrom; import static org.hamcrest.Matchers.equalTo; @@ -237,6 +238,8 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; + private static final AtomicInteger FLIGHT_PORT_COUNTER = new AtomicInteger(0); + /* Sorted map to make traverse order reproducible. * The map of nodes is never mutated so individual reads are safe without synchronization. * Updates are intended to follow a copy-on-write approach. */ @@ -755,7 +758,7 @@ private Settings getNodeSettings( final Settings.Builder updatedSettings = Settings.builder(); updatedSettings.put(Environment.PATH_HOME_SETTING.getKey(), baseDir); - + updatedSettings.put("node.attr.transport.stream.port", getBaseStreamPort() + FLIGHT_PORT_COUNTER.getAndIncrement()); if (numDataPaths > 1) { updatedSettings.putList( Environment.PATH_DATA_SETTING.getKey(), diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 1ee856d3092f0..383cc8b02aaa6 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -707,7 +707,7 @@ protected Settings featureFlagSettings() { // Enabling Telemetry setting by default featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), true); featureSettings.put(FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING.getKey(), true); - + featureSettings.put(FeatureFlags.ARROW_STREAMS_SETTING.getKey(), true); return featureSettings.build(); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index b180187303a60..c6d215b443545 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -1768,7 +1768,7 @@ public static String getPortRange() { return getBasePort() + "-" + (getBasePort() + 99); // upper bound is inclusive } - protected static int getBasePort() { + private static int generateBasePort(int start) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use @@ -1792,7 +1792,15 @@ protected static int getBasePort() { startAt = (int) Math.floorMod(workerId - 1, 223L) + 1; } assert startAt >= 0 : "Unexpected test worker Id, resulting port range would be negative"; - return 10300 + (startAt * 100); + return start + (startAt * 100); + } + + protected static int getBaseStreamPort() { + return generateBasePort(9880); + } + + protected static int getBasePort() { + return generateBasePort(10300); } protected static InetAddress randomIp(boolean v4) {