Skip to content

Commit

Permalink
interim changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Dec 18, 2024
1 parent 7c7437d commit acee6db
Show file tree
Hide file tree
Showing 20 changed files with 393 additions and 87 deletions.
2 changes: 2 additions & 0 deletions modules/arrow-flight-rpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/

apply plugin: 'opensearch.publish'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Arrow flight based Stream implementation'
Expand All @@ -19,6 +20,7 @@ opensearchplugin {
dependencies {
implementation project(':libs:opensearch-arrow-spi')
implementation project(':modules:transport-netty4')
implementation "io.netty:netty-transport-native-epoll:${versions.netty}"

implementation 'org.checkerframework:checker-qual:3.44.0'
implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight;

import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.OpenSearchFlightClient;
import org.apache.arrow.flight.Result;
import org.opensearch.arrow.flight.bootstrap.FlightService;
import org.opensearch.arrow.flight.bootstrap.client.FlightClientManager;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 10)
public class ArrowFlightServerIT extends OpenSearchIntegTestCase {

private FlightClientManager flightClientManager;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(FlightStreamPlugin.class);
}

@Override
public void setUp() throws Exception {
super.setUp();
ensureGreen();
FlightService flightService = internalCluster().getInstance(FlightService.class);
flightClientManager = flightService.getFlightClientManager();
}


public void testArrowFlightEndpoint() throws Exception {
Action pingAction = new Action("ping");
try(OpenSearchFlightClient flightClient = flightClientManager.getFlightClient(flightClientManager.getLocalNodeId())) {
assertNotNull(flightClient);
Iterator<Result> results = flightClient.doAction(pingAction);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.arrow.flight;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.arrow.spi.StreamManager;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureTransportSettingsProvider;
Expand All @@ -31,6 +33,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
Expand All @@ -42,7 +45,7 @@
* BaseFlightStreamPlugin is a plugin that implements the StreamManagerPlugin interface.
* It provides the necessary components for handling flight streams in the OpenSearch cluster.
*/
public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin {
public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin {

/**
* Constructor for BaseFlightStreamPlugin.
Expand Down Expand Up @@ -109,7 +112,7 @@ public abstract Map<String, Supplier<Transport>> getSecureTransports(
* Returns the StreamManager instance for managing flight streams.
*/
@Override
public abstract StreamManager getStreamManager();
public abstract Supplier<StreamManager> getStreamManager();

/**
* Returns a list of ExecutorBuilder instances for building thread pools used for FlightServer
Expand All @@ -123,4 +126,7 @@ public abstract Map<String, Supplier<Transport>> getSecureTransports(
*/
@Override
public abstract List<Setting<?>> getSettings();

@Override
public abstract void onNodeStarted(DiscoveryNode localNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.arrow.spi.StreamManager;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -88,8 +89,8 @@ public Map<String, Supplier<Transport>> getSecureTransports(
}

@Override
public StreamManager getStreamManager() {
return null;
public Supplier<StreamManager> getStreamManager() {
return () -> null;
}

@Override
Expand All @@ -101,6 +102,11 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<Setting<?>> getSettings() {
return List.of();
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {

}
};
}
}
Expand Down Expand Up @@ -188,7 +194,7 @@ public Map<String, Supplier<Transport>> getSecureTransports(
* Gets the StreamManager instance for managing flight streams.
*/
@Override
public StreamManager getStreamManager() {
public Supplier<StreamManager> getStreamManager() {
return delegate.getStreamManager();
}

Expand All @@ -208,4 +214,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<Setting<?>> getSettings() {
return delegate.getSettings();
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
delegate.onNodeStarted(localNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.arrow.flight.bootstrap;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.OpenSearchFlightServer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.arrow.flight.bootstrap.client.FlightClientManager;
import org.opensearch.arrow.flight.bootstrap.server.FlightServerBuilder;
import org.opensearch.arrow.flight.bootstrap.server.ServerConfig;
Expand All @@ -24,6 +29,8 @@
import org.opensearch.arrow.flight.core.BaseFlightProducer;
import org.opensearch.arrow.flight.core.FlightStreamManager;
import org.opensearch.arrow.spi.StreamManager;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand All @@ -33,9 +40,13 @@

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;


/**
* FlightService manages the Arrow Flight server and client for OpenSearch.
Expand All @@ -48,12 +59,15 @@ public class FlightService extends AbstractLifecycleComponent {

private static OpenSearchFlightServer server;
private static BufferAllocator allocator;
private static FlightStreamManager streamManager;
private static Supplier<StreamManager> streamManager;
private static FlightClientManager clientManager;
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
private ExecutorService grpcExecutor;
private final SetOnce<SecureTransportSettingsProvider> secureTransportSettingsProvider = new SetOnce<>();
private SslContextProvider sslContextProvider;

private EventLoopGroup bossELG;
private EventLoopGroup workerELG;
/**
* Constructor for FlightService.
* @param settings The settings for the FlightService.
Expand All @@ -76,14 +90,8 @@ public FlightService(Settings settings) {
* @param threadPool The ThreadPool instance.
*/
public void initialize(ClusterService clusterService, ThreadPool threadPool) {
this.clusterService.trySet(clusterService);
this.threadPool.trySet(Objects.requireNonNull(threadPool));
if (ServerConfig.isSslEnabled()) {
sslContextProvider = new DefaultSslContextProvider(secureTransportSettingsProvider::get);
} else {
sslContextProvider = new DisabledSslContextProvider();
}
clientManager = new FlightClientManager(() -> allocator, Objects.requireNonNull(clusterService), sslContextProvider);
streamManager = new FlightStreamManager(() -> allocator, clientManager);
}

/**
Expand All @@ -94,53 +102,84 @@ public void setSecureTransportSettingsProvider(SecureTransportSettingsProvider s
this.secureTransportSettingsProvider.trySet(secureTransportSettingsProvider);
}

/**
* Starts the FlightService by initializing and starting the Arrow Flight server.
*/

@Override
protected void doStart() {
DiscoveryNode localNode = Objects.requireNonNull(clusterService.get()).localNode();
// everything is lazily started in onNodeStart() after TransportService is started
if (isDedicatedClusterManagerNode(localNode)) {
return;
}
try {
allocator = AccessController.doPrivileged(
(PrivilegedExceptionAction<BufferAllocator>) () -> new RootAllocator(Integer.MAX_VALUE)
);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize Arrow Flight server", e);
}

FlightProducer producer = new BaseFlightProducer(clientManager, streamManager, allocator);
FlightServerBuilder builder = new FlightServerBuilder(threadPool.get(), () -> allocator, producer, sslContextProvider);
if (ServerConfig.isSslEnabled()) {
sslContextProvider = new DefaultSslContextProvider(secureTransportSettingsProvider::get);
} else {
sslContextProvider = new DisabledSslContextProvider();
}
clientManager = new FlightClientManager(() -> allocator, Objects.requireNonNull(clusterService.get()), sslContextProvider);
FlightStreamManager flightStreamManager = new FlightStreamManager(() -> allocator, clientManager);
streamManager = () -> flightStreamManager;

try {
FlightProducer producer = new BaseFlightProducer(clientManager, flightStreamManager, allocator);
Location serverLocation = ServerConfig.getLocation(localNode.getAddress().getAddress(), Integer.parseInt(localNode.getAttributes().get("transport.stream.port")));
grpcExecutor = Objects.requireNonNull(threadPool.get()).executor(ServerConfig.FLIGHT_THREAD_POOL_NAME);
FlightServerBuilder builder = new FlightServerBuilder(() -> allocator, producer, sslContextProvider, serverLocation, grpcExecutor);
bossELG = Utils.create("os-grpc-boss-ELG", 1, null);
workerELG = Utils.create("os-grpc-worker-ELG", 0, null);
builder.elg(bossELG, workerELG, Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
server = builder.build();
server.start();
logger.info("Arrow Flight server started successfully:{}", ServerConfig.getServerLocation().getUri().toString());
logger.info("Arrow Flight server started successfully:{}", serverLocation);
} catch (IOException e) {
logger.error("Failed to start Arrow Flight server", e);
throw new RuntimeException("Failed to start Arrow Flight server", e);
} catch (PrivilegedActionException e) {
throw new RuntimeException(e);
}
}

/**
* Stops the FlightService by closing the Arrow Flight server, client manager, and stream manager.
* Stops the FlightService
*/
@Override
protected void doStop() {
try {
server.shutdown();
streamManager.close();
clientManager.close();
server.close();
logger.info("Arrow Flight service closed successfully");
AutoCloseables.close(server, clientManager, allocator);
if (grpcExecutor != null) {
grpcExecutor.shutdown();
}
if (bossELG != null) {
Future<?> shutdownFuture = bossELG.shutdownGracefully(0, 5, TimeUnit.SECONDS);
shutdownFuture.awaitUninterruptibly();
if (!shutdownFuture.isSuccess()) {
//logger.warn(new ParameterizedMessage("Error closing {} netty event loop group", THREAD_PREFIX), shutdownFuture.cause());
}
}
if (workerELG != null) {
Future<?> shutdownFuture = workerELG.shutdownGracefully(0, 5, TimeUnit.SECONDS);
shutdownFuture.awaitUninterruptibly();
if (!shutdownFuture.isSuccess()) {
//logger.warn(new ParameterizedMessage("Error closing {} netty event loop group", THREAD_PREFIX), shutdownFuture.cause());
}
}
logger.info("Arrow Flight service stopped successfully");
} catch (Exception e) {
logger.error("Error while closing Arrow Flight service", e);
logger.error("Error while stopping Arrow Flight service", e);
}
}

/**
* Closes the BufferAllocator used by the FlightService.
* Closes the FlightService by closing the Arrow Flight server, client manager, and stream manager.
*/
@Override
protected void doClose() {
if (allocator != null) {
allocator.close();
}
doStop();
}

/**
Expand All @@ -151,11 +190,22 @@ public FlightClientManager getFlightClientManager() {
return clientManager;
}

public void onNodeStart(DiscoveryNode localNode) {

}

private boolean isDedicatedClusterManagerNode(DiscoveryNode node) {
Set<DiscoveryNodeRole> nodeRoles = node.getRoles();
return nodeRoles.size() == 1 &&
(nodeRoles.contains(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE) ||
nodeRoles.contains(DiscoveryNodeRole.MASTER_ROLE));
}

/**
* Retrieves the StreamManager used by the FlightService.
* @return The StreamManager instance.
*/
public StreamManager getStreamManager() {
public Supplier<StreamManager> getStreamManager() {
return streamManager;
}

Expand Down
Loading

0 comments on commit acee6db

Please sign in to comment.