diff --git a/h2o-clustering/build.gradle b/h2o-clustering/build.gradle index 014bd2a87be6..0af71215084d 100644 --- a/h2o-clustering/build.gradle +++ b/h2o-clustering/build.gradle @@ -7,16 +7,13 @@ repositories { } dependencies { - api 'org.nanohttpd:nanohttpd:2.3.1' - api 'org.nanohttpd:nanohttpd-webserver:2.3.1' - api 'org.nanohttpd:nanohttpd-nanolets:2.3.1' compileOnly project(":h2o-core") // This module is intended to be put on H2O's classpath separately compileOnly "javax.servlet:javax.servlet-api:${servletApiVersion}" - testImplementation group: 'junit', name: 'junit', version: '4.12' + testImplementation group: 'junit', name: 'junit', version: '4.13.1' testImplementation 'com.github.stefanbirkner:system-rules:1.19.0' testImplementation project(":h2o-test-support") - testImplementation "commons-io:commons-io:2.4" + testImplementation 'commons-io:commons-io:2.7' testRuntimeOnly project(":${defaultWebserverModule}") } diff --git a/h2o-clustering/src/main/java/water/clustering/AssistedClusteringEmbeddedConfigProvider.java b/h2o-clustering/src/main/java/water/clustering/AssistedClusteringEmbeddedConfigProvider.java index 04deb2ef18a2..16a4da5bd852 100644 --- a/h2o-clustering/src/main/java/water/clustering/AssistedClusteringEmbeddedConfigProvider.java +++ b/h2o-clustering/src/main/java/water/clustering/AssistedClusteringEmbeddedConfigProvider.java @@ -3,7 +3,6 @@ import org.apache.log4j.Logger; import water.H2O; import water.clustering.api.AssistedClusteringRestApi; -import water.clustering.api.GracefulAsyncRunner; import water.init.AbstractEmbeddedH2OConfig; import water.init.EmbeddedConfigProvider; import water.util.Log; @@ -45,7 +44,6 @@ private Optional startAssistedClusteringRestApi(final Log.info("Starting assisted clustering REST API services"); try { final AssistedClusteringRestApi assistedClusteringRestApi = new AssistedClusteringRestApi(flatFileCallback); - assistedClusteringRestApi.setAsyncRunner(new GracefulAsyncRunner()); assistedClusteringRestApi.start(); Log.info("Assisted clustering REST API services successfully started."); return Optional.of(assistedClusteringRestApi); diff --git a/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringEndpoint.java b/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringEndpoint.java index 68e081d68a62..d7dde7c1f4fc 100644 --- a/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringEndpoint.java +++ b/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringEndpoint.java @@ -1,12 +1,14 @@ package water.clustering.api; -import fi.iki.elonen.NanoHTTPD; -import fi.iki.elonen.router.RouterNanoHTTPD; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; import org.apache.log4j.Logger; +import water.init.NetworkInit; +import java.io.BufferedReader; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,79 +16,75 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static java.net.HttpURLConnection.*; +import static water.clustering.api.HttpResponses.*; /** - * A REST Endpoint waiting for external assist to POST a flatifle with H2O nodes. - * Once successfully submitted, this endpoint will no longer accept any new calls. + * A REST Endpoint waiting for external assist to POST a flatfile with H2O nodes. + * Once successfully submitted, this endpoint will no longer accept any new calls. * It is the caller's responsibility to submit a valid flatfile. - * + *

* There is no parsing or validation done on the flatfile, except for basic emptiness checks. - * The logic for IPv4/IPv6 parsing is hidden in {@link water.init.NetworkInit} class and is therefore hidden + * The logic for IPv4/IPv6 parsing is hidden in {@link NetworkInit} class and is therefore hidden * from this class. As this module is intended to insertable onto classpath of any H2O, it does not rely on * specific NetworkInit implementation. */ -public class AssistedClusteringEndpoint extends RouterNanoHTTPD.DefaultHandler { +public class AssistedClusteringEndpoint implements HttpHandler, AutoCloseable { private static final Logger LOG = Logger.getLogger(AssistedClusteringEndpoint.class); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final AtomicBoolean flatFileReceived; private final ExecutorService flatFileConsumerCallbackExecutor = Executors.newSingleThreadExecutor(); + private final Consumer flatFileConsumer; - public AssistedClusteringEndpoint() { - flatFileReceived = new AtomicBoolean(false); + public AssistedClusteringEndpoint(Consumer flatFileConsumer) { + this.flatFileConsumer = flatFileConsumer; + this.flatFileReceived = new AtomicBoolean(false); } @Override - public String getText() { - throw new IllegalStateException(String.format("Method getText should not be called on '%s'", - getClass().getName())); - } - - @Override - public String getMimeType() { - return "text/plain"; - } - - @Override - public NanoHTTPD.Response.IStatus getStatus() { - return null; - } - - public static final String RESPONSE_MIME_TYPE = "text/plain"; + public void handle(HttpExchange httpExchange) throws IOException { + if (!POST_METHOD.equals(httpExchange.getRequestMethod())) { + newResponseCodeOnlyResponse(httpExchange, HTTP_BAD_METHOD); + } - @Override - public NanoHTTPD.Response post(final RouterNanoHTTPD.UriResource uriResource, final Map urlParams, final NanoHTTPD.IHTTPSession session) { - final Map map = new HashMap<>(); - try { - session.parseBody(map); - } catch (IOException | NanoHTTPD.ResponseException e) { + String postBody; + try (InputStreamReader isr = new InputStreamReader(httpExchange.getRequestBody(), StandardCharsets.UTF_8); + BufferedReader br = new BufferedReader(isr)) { + postBody = br.lines().collect(Collectors.joining("\n")); + if (postBody.isEmpty()) { + newFixedLengthResponse(httpExchange, HTTP_BAD_REQUEST, + MIME_TYPE_TEXT_PLAIN, "Unable to parse IP addresses in body. Only one IPv4/IPv6 address per line is accepted."); + return; + } + } catch (IOException e) { LOG.error("Received incorrect flatfile request.", e); - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, null); + newResponseCodeOnlyResponse(httpExchange, HTTP_BAD_REQUEST); + return; } - // The text/plain content-type is stored as `postData` by HTTPD in the map. - final String postBody = map.get("postData"); - if (postBody != null) { - final Lock writeLock = lock.writeLock(); - try { - writeLock.lock(); - if (flatFileReceived.get()) { - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, - "Flatfile already provided."); - } else { - final Consumer flatFileConsumer = (Consumer) uriResource.initParameter(Consumer.class); - // Do not block response with internal handling - flatFileConsumerCallbackExecutor.submit(() -> flatFileConsumer.accept(postBody)); - flatFileReceived.set(true); // Do not accept any new requests once the flatfile has been received. - } - } finally { - writeLock.unlock(); + final Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); + if (flatFileReceived.get()) { + newFixedLengthResponse(httpExchange, HTTP_BAD_REQUEST, MIME_TYPE_TEXT_PLAIN, "Flatfile already provided."); + return; + } else { + // Do not block response with internal handling + flatFileConsumerCallbackExecutor.submit(() -> flatFileConsumer.accept(postBody)); + flatFileReceived.set(true); // Do not accept any new requests once the flatfile has been received. } - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.OK, RESPONSE_MIME_TYPE, null); - } else { - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, - "Unable to parse IP addresses in body. Only one IPv4/IPv6 address per line is accepted."); + } finally { + writeLock.unlock(); } + newResponseCodeOnlyResponse(httpExchange, HTTP_OK); + } + + @Override + public void close() { + flatFileConsumerCallbackExecutor.shutdown(); } } diff --git a/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringRestApi.java b/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringRestApi.java index 16738f8ce7a9..43e9ebf58e79 100644 --- a/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringRestApi.java +++ b/h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringRestApi.java @@ -1,24 +1,25 @@ package water.clustering.api; -import fi.iki.elonen.NanoHTTPD; -import fi.iki.elonen.router.RouterNanoHTTPD; +import com.sun.net.httpserver.HttpServer; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Objects; import java.util.function.Consumer; /** * Rest API definition for the assisted clustering function. */ -public class AssistedClusteringRestApi extends RouterNanoHTTPD implements AutoCloseable { +public class AssistedClusteringRestApi implements AutoCloseable { /** * Default port to bind to / listen on. */ private static final int DEFAULT_PORT = 8080; public static final String ASSISTED_CLUSTERING_PORT_KEY = "H2O_ASSISTED_CLUSTERING_API_PORT"; - private final Consumer flatFileConsumer; + private final AssistedClusteringEndpoint assistedClusteringEndpoint; + private final HttpServer server; /** * Creates, but not starts assisted clustering REST API. To start the REST API, please use @@ -27,13 +28,13 @@ public class AssistedClusteringRestApi extends RouterNanoHTTPD implements AutoCl * The REST API is bound to a default port of 8080, unless specified otherwise by the H2O_ASSISTED_CLUSTERING_API_PORT environment * variable. */ - public AssistedClusteringRestApi(Consumer flatFileConsumer) { - super(getPort()); + public AssistedClusteringRestApi(Consumer flatFileConsumer) throws IOException { Objects.requireNonNull(flatFileConsumer); - this.flatFileConsumer = flatFileConsumer; + this.assistedClusteringEndpoint = new AssistedClusteringEndpoint(flatFileConsumer); + int port = getPort(); + server = HttpServer.create(new InetSocketAddress(port), 0); addMappings(); } - /** * @return Either user-defined port via environment variable or default port to bind the REST API to. */ @@ -51,11 +52,9 @@ private static int getPort() { } } - @Override - public void addMappings() { - super.addMappings(); - addRoute("/clustering/flatfile", AssistedClusteringEndpoint.class, this.flatFileConsumer); - addRoute("/cluster/status", H2OClusterStatusEndpoint.class); + private void addMappings() { + server.createContext("/clustering/flatfile", assistedClusteringEndpoint); + server.createContext("/cluster/status", new H2OClusterStatusEndpoint()); } /** @@ -63,18 +62,11 @@ public void addMappings() { */ @Override public void close() { - stop(); + assistedClusteringEndpoint.close(); + server.stop(0); } - @Override public void start() throws IOException { - // Make sure the API is never ran as daemon and is properly terminated with the H2O JVM (the latest) - start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); - } - - @Override - public void start(int timeout) throws IOException { - // Make sure the API is never ran as daemon and is properly terminated with the H2O JVM (the latest) - super.start(timeout, false); + server.start(); } } diff --git a/h2o-clustering/src/main/java/water/clustering/api/GracefulAsyncRunner.java b/h2o-clustering/src/main/java/water/clustering/api/GracefulAsyncRunner.java deleted file mode 100644 index d1a5de6b86ef..000000000000 --- a/h2o-clustering/src/main/java/water/clustering/api/GracefulAsyncRunner.java +++ /dev/null @@ -1,40 +0,0 @@ -package water.clustering.api; - -import fi.iki.elonen.NanoHTTPD; -import org.apache.log4j.Logger; - - -/** - * The REST API is only needed until a valid flatfile is received. Afterwards, the flatfile is handed over to - * H2O via embedded config. Therefore, as soon as the embedded config is constructed, the REST API is shut down - * and the objects destroyed. However, there might still be a thread running - sending HTTP response to the assist which - * sent the flatfile. The {@link fi.iki.elonen.NanoHTTPD.DefaultAsyncRunner} does not wait for existing connections - * to be properly terminated and shuts them down. - *

- * This implementation overrides that behavior and waits until all connections are closed before shutdown. - */ -public class GracefulAsyncRunner extends NanoHTTPD.DefaultAsyncRunner { - private static final Logger LOG = Logger.getLogger(GracefulAsyncRunner.class); - - @Override - public void closeAll() { - while (getRunning().size() > 0) { - synchronized (this) { - try { - this.wait(1000); - } catch (InterruptedException e) { - LOG.error("Waiting for asyncRunner to gracefully shutdown interrupted. Closing all connections now.", e); - } - } - } - super.closeAll(); - } - - @Override - public void closed(final NanoHTTPD.ClientHandler clientHandler) { - super.closed(clientHandler); - synchronized (this) { - this.notify(); - } - } -} diff --git a/h2o-clustering/src/main/java/water/clustering/api/H2OClusterStatusEndpoint.java b/h2o-clustering/src/main/java/water/clustering/api/H2OClusterStatusEndpoint.java index 5608fd2e0c84..d637cf48c04e 100644 --- a/h2o-clustering/src/main/java/water/clustering/api/H2OClusterStatusEndpoint.java +++ b/h2o-clustering/src/main/java/water/clustering/api/H2OClusterStatusEndpoint.java @@ -1,42 +1,32 @@ package water.clustering.api; -import fi.iki.elonen.NanoHTTPD; -import fi.iki.elonen.router.RouterNanoHTTPD; import water.H2O; import water.H2ONode; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import java.io.IOException; +import java.net.HttpURLConnection; import java.util.Arrays; -import java.util.Map; import java.util.Set; +import static water.clustering.api.HttpResponses.*; -public class H2OClusterStatusEndpoint extends RouterNanoHTTPD.DefaultHandler { - - @Override - public String getText() { - throw new IllegalStateException(String.format("Method getText should not be called on '%s'", - getClass().getName())); - } +public class H2OClusterStatusEndpoint implements HttpHandler { @Override - public String getMimeType() { - return "application/json"; - } - - @Override - public NanoHTTPD.Response.IStatus getStatus() { - throw new IllegalStateException(String.format("Method getMimeType should not be called on '%s'", - getClass().getName())); - } - - @Override - public NanoHTTPD.Response get(RouterNanoHTTPD.UriResource uriResource, Map urlParams, NanoHTTPD.IHTTPSession session) { + public void handle(HttpExchange httpExchange) throws IOException { + if (!GET_METHOD.equals(httpExchange.getRequestMethod())) { + newResponseCodeOnlyResponse(httpExchange, HttpURLConnection.HTTP_BAD_METHOD); + } // H2O cluster grows in time, even when a flat file is used. The H2O.CLOUD property might be updated with new nodes during // the clustering process and doesn't necessarily have to contain all the nodes since the very beginning of the clustering process. // From this endpoint's point of view, H2O is clustered if and only if the H2O cloud members contain all nodes defined in the // flat file. if (!H2O.isFlatfileEnabled()) { - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.NO_CONTENT, getMimeType(), null); + newResponseCodeOnlyResponse(httpExchange, HttpURLConnection.HTTP_NO_CONTENT); + return; } final Set flatFile = H2O.getFlatfile(); final H2ONode[] cloudMembers = H2O.CLOUD.members(); @@ -45,9 +35,9 @@ public NanoHTTPD.Response get(RouterNanoHTTPD.UriResource uriResource, Map */ -public class KubernetesRestApi extends RouterNanoHTTPD implements AutoCloseable { +public class KubernetesRestApi implements AutoCloseable { /** * Default port to bind to / listen on. @@ -24,6 +24,7 @@ public class KubernetesRestApi extends RouterNanoHTTPD implements AutoCloseable private static final int DEFAULT_PORT = 8080; public static final String KUBERNETES_REST_API_PORT_KEY = "H2O_KUBERNETES_API_PORT"; + private final HttpServer server; /** * Creates, but not starts Kubernetes REST API. To start the REST API, please use @@ -32,8 +33,9 @@ public class KubernetesRestApi extends RouterNanoHTTPD implements AutoCloseable * The REST API is bound to a default port of 8080, unless specified otherwise by H2O_KUBERNETES_API_PORT environment * variable. */ - public KubernetesRestApi() { - super(getPort()); + public KubernetesRestApi() throws IOException { + int port = getPort(); + server = HttpServer.create(new InetSocketAddress(port), 0); addMappings(); } @@ -51,27 +53,17 @@ private static int getPort() { } } - @Override public void addMappings() { - super.addMappings(); - addRoute("/kubernetes/isLeaderNode", KubernetesLeaderNodeProbeHandler.class); + server.createContext("/kubernetes/isLeaderNode", new KubernetesLeaderNodeProbeHandler()); } @Override public void close() { - stop(); + server.stop(0); } - @Override public void start() throws IOException { - // Make sure the API is never ran as daemon. Scope of the K8S API = H2O's lifetime - start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); - } - - @Override - public void start(int timeout) throws IOException { - // Make sure the API is never ran as daemon. Scope of the K8S API = H2O's lifetime - super.start(timeout, false); + server.start(); } } diff --git a/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java index b41a9e325081..cc2aef09b212 100644 --- a/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java +++ b/h2o-k8s/src/main/java/water/k8s/lookup/KubernetesLookup.java @@ -1,7 +1,5 @@ package water.k8s.lookup; -import water.k8s.lookup.LookupConstraint; - import java.util.Collection; import java.util.Optional; import java.util.Set; diff --git a/h2o-k8s/src/main/java/water/k8s/probe/KubernetesLeaderNodeProbeHandler.java b/h2o-k8s/src/main/java/water/k8s/probe/KubernetesLeaderNodeProbeHandler.java index c0be2d876504..90207b2d79bd 100644 --- a/h2o-k8s/src/main/java/water/k8s/probe/KubernetesLeaderNodeProbeHandler.java +++ b/h2o-k8s/src/main/java/water/k8s/probe/KubernetesLeaderNodeProbeHandler.java @@ -1,40 +1,36 @@ package water.k8s.probe; -import fi.iki.elonen.NanoHTTPD; -import fi.iki.elonen.router.RouterNanoHTTPD; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; import water.k8s.H2OCluster; -import java.util.Map; +import java.io.IOException; +import static java.net.HttpURLConnection.*; -public class KubernetesLeaderNodeProbeHandler extends RouterNanoHTTPD.DefaultHandler { +public class KubernetesLeaderNodeProbeHandler implements HttpHandler { - @Override - public String getText() { - throw new IllegalStateException(String.format("Method getText should not be called on '%s'", - getClass().getName())); - } - - @Override - public String getMimeType() { - return "text/plain"; - } + public static final String MIME_TYPE_TEXT_PLAIN = "text/plain"; + public static final String GET_METHOD = "GET"; @Override - public NanoHTTPD.Response.IStatus getStatus() { - throw new IllegalStateException(String.format("Method getMimeType should not be called on '%s'", - getClass().getName())); - } - - @Override - public NanoHTTPD.Response get(RouterNanoHTTPD.UriResource uriResource, Map urlParams, NanoHTTPD.IHTTPSession session) { + public void handle(HttpExchange httpExchange) throws IOException { + if (!GET_METHOD.equals(httpExchange.getRequestMethod())) { + httpResponseWithoutBody(httpExchange, HTTP_BAD_METHOD); + } // All nodes report ready state until the clustering process is finished. Since then, only the leader node is ready. final H2OCluster.H2ONodeInfo self = H2OCluster.getCurrentNodeInfo(); if (self == null || self.isLeader() || !H2OCluster.isClustered()) { - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.OK, getMimeType(), null); + httpResponseWithoutBody(httpExchange, HTTP_OK); } else { - return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.NOT_FOUND, getMimeType(), null); + httpResponseWithoutBody(httpExchange, HTTP_NOT_FOUND); } } + private static void httpResponseWithoutBody(HttpExchange httpExchange, int httpResponseCode) throws IOException { + httpExchange.getResponseHeaders().set("Content-Type", MIME_TYPE_TEXT_PLAIN); + httpExchange.sendResponseHeaders(httpResponseCode, -1); + httpExchange.close(); + } + }