-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
145 additions
and
198 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
108 changes: 53 additions & 55 deletions
108
h2o-clustering/src/main/java/water/clustering/api/AssistedClusteringEndpoint.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,92 +1,90 @@ | ||
package water.clustering.api; | ||
|
||
import fi.iki.elonen.NanoHTTPD; | ||
import fi.iki.elonen.router.RouterNanoHTTPD; | ||
import com.sun.net.httpserver.HttpExchange; | ||
import com.sun.net.httpserver.HttpHandler; | ||
import org.apache.log4j.Logger; | ||
import water.init.NetworkInit; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.io.InputStreamReader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
|
||
import static java.net.HttpURLConnection.*; | ||
import static water.clustering.api.HttpResponses.*; | ||
|
||
/** | ||
* A REST Endpoint waiting for external assist to POST a flatifle with H2O nodes. | ||
* Once successfully submitted, this endpoint will no longer accept any new calls. | ||
* A REST Endpoint waiting for external assist to POST a flatfile with H2O nodes. | ||
* Once successfully submitted, this endpoint will no longer accept any new calls. | ||
* It is the caller's responsibility to submit a valid flatfile. | ||
* | ||
* <p> | ||
* There is no parsing or validation done on the flatfile, except for basic emptiness checks. | ||
* The logic for IPv4/IPv6 parsing is hidden in {@link water.init.NetworkInit} class and is therefore hidden | ||
* The logic for IPv4/IPv6 parsing is hidden in {@link NetworkInit} class and is therefore hidden | ||
* from this class. As this module is intended to insertable onto classpath of any H2O, it does not rely on | ||
* specific NetworkInit implementation. | ||
*/ | ||
public class AssistedClusteringEndpoint extends RouterNanoHTTPD.DefaultHandler { | ||
public class AssistedClusteringEndpoint implements HttpHandler, AutoCloseable { | ||
|
||
private static final Logger LOG = Logger.getLogger(AssistedClusteringEndpoint.class); | ||
private final ReadWriteLock lock = new ReentrantReadWriteLock(); | ||
private final AtomicBoolean flatFileReceived; | ||
private final ExecutorService flatFileConsumerCallbackExecutor = Executors.newSingleThreadExecutor(); | ||
private final Consumer<String> flatFileConsumer; | ||
|
||
public AssistedClusteringEndpoint() { | ||
flatFileReceived = new AtomicBoolean(false); | ||
public AssistedClusteringEndpoint(Consumer<String> flatFileConsumer) { | ||
this.flatFileConsumer = flatFileConsumer; | ||
this.flatFileReceived = new AtomicBoolean(false); | ||
} | ||
|
||
@Override | ||
public String getText() { | ||
throw new IllegalStateException(String.format("Method getText should not be called on '%s'", | ||
getClass().getName())); | ||
} | ||
|
||
@Override | ||
public String getMimeType() { | ||
return "text/plain"; | ||
} | ||
|
||
@Override | ||
public NanoHTTPD.Response.IStatus getStatus() { | ||
return null; | ||
} | ||
|
||
public static final String RESPONSE_MIME_TYPE = "text/plain"; | ||
public void handle(HttpExchange httpExchange) throws IOException { | ||
if (!POST_METHOD.equals(httpExchange.getRequestMethod())) { | ||
newResponseCodeOnlyResponse(httpExchange, HTTP_BAD_METHOD); | ||
} | ||
|
||
@Override | ||
public NanoHTTPD.Response post(final RouterNanoHTTPD.UriResource uriResource, final Map<String, String> urlParams, final NanoHTTPD.IHTTPSession session) { | ||
final Map<String, String> map = new HashMap<>(); | ||
try { | ||
session.parseBody(map); | ||
} catch (IOException | NanoHTTPD.ResponseException e) { | ||
String postBody; | ||
try (InputStreamReader isr = new InputStreamReader(httpExchange.getRequestBody(), StandardCharsets.UTF_8); | ||
BufferedReader br = new BufferedReader(isr)) { | ||
postBody = br.lines().collect(Collectors.joining("\n")); | ||
if (postBody.isEmpty()) { | ||
newFixedLengthResponse(httpExchange, HTTP_BAD_REQUEST, | ||
MIME_TYPE_TEXT_PLAIN, "Unable to parse IP addresses in body. Only one IPv4/IPv6 address per line is accepted."); | ||
return; | ||
} | ||
} catch (IOException e) { | ||
LOG.error("Received incorrect flatfile request.", e); | ||
return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, null); | ||
newResponseCodeOnlyResponse(httpExchange, HTTP_BAD_REQUEST); | ||
return; | ||
} | ||
// The text/plain content-type is stored as `postData` by HTTPD in the map. | ||
final String postBody = map.get("postData"); | ||
|
||
if (postBody != null) { | ||
final Lock writeLock = lock.writeLock(); | ||
try { | ||
writeLock.lock(); | ||
if (flatFileReceived.get()) { | ||
return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, | ||
"Flatfile already provided."); | ||
} else { | ||
final Consumer<String> flatFileConsumer = (Consumer<String>) uriResource.initParameter(Consumer.class); | ||
// Do not block response with internal handling | ||
flatFileConsumerCallbackExecutor.submit(() -> flatFileConsumer.accept(postBody)); | ||
flatFileReceived.set(true); // Do not accept any new requests once the flatfile has been received. | ||
} | ||
} finally { | ||
writeLock.unlock(); | ||
final Lock writeLock = lock.writeLock(); | ||
try { | ||
writeLock.lock(); | ||
if (flatFileReceived.get()) { | ||
newFixedLengthResponse(httpExchange, HTTP_BAD_REQUEST, MIME_TYPE_TEXT_PLAIN, "Flatfile already provided."); | ||
return; | ||
} else { | ||
// Do not block response with internal handling | ||
flatFileConsumerCallbackExecutor.submit(() -> flatFileConsumer.accept(postBody)); | ||
flatFileReceived.set(true); // Do not accept any new requests once the flatfile has been received. | ||
} | ||
return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.OK, RESPONSE_MIME_TYPE, null); | ||
} else { | ||
return NanoHTTPD.newFixedLengthResponse(NanoHTTPD.Response.Status.BAD_REQUEST, RESPONSE_MIME_TYPE, | ||
"Unable to parse IP addresses in body. Only one IPv4/IPv6 address per line is accepted."); | ||
} finally { | ||
writeLock.unlock(); | ||
} | ||
newResponseCodeOnlyResponse(httpExchange, HTTP_OK); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
flatFileConsumerCallbackExecutor.shutdown(); | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
40 changes: 0 additions & 40 deletions
40
h2o-clustering/src/main/java/water/clustering/api/GracefulAsyncRunner.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.