diff --git a/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/ClusterMicroserviceProvider.java b/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/ClusterMicroserviceProvider.java index a185dae..46f4099 100644 --- a/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/ClusterMicroserviceProvider.java +++ b/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/ClusterMicroserviceProvider.java @@ -28,11 +28,13 @@ import org.silverware.microservices.Context; import org.silverware.microservices.MicroserviceMetaData; import org.silverware.microservices.providers.MicroserviceProvider; +import org.silverware.microservices.providers.cluster.internal.HttpServiceProxy; import org.silverware.microservices.silver.ClusterSilverService; import org.silverware.microservices.silver.cluster.ServiceHandle; import org.silverware.microservices.util.Utils; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -85,8 +87,18 @@ public void run() { MicroserviceMetaData metaData = toLookup.poll(); if (metaData != null) { channel.getView().forEach(address -> { - //address.toString() - // todo lookup + try { + List handles = metaData.query(context, address.toString()); + Set localHandles = handles.stream().map(serviceHandle -> serviceHandle.withProxy(HttpServiceProxy.getProxy(context, serviceHandle))).collect(Collectors.toSet()); + + if (outboundHandles.containsKey(metaData)) { + outboundHandles.get(metaData).addAll(localHandles); + } else { + outboundHandles.put(metaData, localHandles); + } + } catch (Exception e) { + log.info(String.format("Unable to lookup Microservice %s at host %s:", metaData.toString(), address.toString()), e); + } }); } else { Thread.sleep(100); @@ -113,7 +125,7 @@ public Set lookupLocalMicroservice(final MicroserviceMetaData metaData) return new HashSet<>(); } - private static class ChannelReceiver extends ReceiverAdapter { + private class ChannelReceiver extends ReceiverAdapter { @Override public void receive(final Message msg) { log.info("Received message " + msg); @@ -121,6 +133,19 @@ public void receive(final Message msg) { @Override public void viewAccepted(final View view) { + Set availableNodes = new HashSet<>(); + view.forEach(address -> availableNodes.add(address.toString())); + + outboundHandles.forEach((metaData, serviceHandles) -> { + Set toBeRemoved = new HashSet<>(); + + toBeRemoved.addAll( + serviceHandles.stream().filter(serviceHandle -> + availableNodes.contains(serviceHandle.getHost()) + ).collect(Collectors.toSet())); + serviceHandles.removeAll(toBeRemoved); + }); + log.info("View accepted " + view); } } diff --git a/http-invoker-microservice-provider/src/main/java/org/silverware/microservices/providers/http/invoker/internal/HttpServiceProxy.java b/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/internal/HttpServiceProxy.java similarity index 97% rename from http-invoker-microservice-provider/src/main/java/org/silverware/microservices/providers/http/invoker/internal/HttpServiceProxy.java rename to cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/internal/HttpServiceProxy.java index b379a53..da27bc3 100644 --- a/http-invoker-microservice-provider/src/main/java/org/silverware/microservices/providers/http/invoker/internal/HttpServiceProxy.java +++ b/cluster-microservice-provider/src/main/java/org/silverware/microservices/providers/cluster/internal/HttpServiceProxy.java @@ -17,7 +17,7 @@ * limitations under the License. * -----------------------------------------------------------------------/ */ -package org.silverware.microservices.providers.http.invoker.internal; +package org.silverware.microservices.providers.cluster.internal; import org.silverware.microservices.Context; import org.silverware.microservices.silver.cluster.ServiceHandle; diff --git a/microservices/src/main/java/org/silverware/microservices/MicroserviceMetaData.java b/microservices/src/main/java/org/silverware/microservices/MicroserviceMetaData.java index c2cbb33..3219359 100644 --- a/microservices/src/main/java/org/silverware/microservices/MicroserviceMetaData.java +++ b/microservices/src/main/java/org/silverware/microservices/MicroserviceMetaData.java @@ -19,8 +19,17 @@ */ package org.silverware.microservices; +import com.cedarsoftware.util.io.JsonReader; +import com.cedarsoftware.util.io.JsonWriter; +import org.silverware.microservices.silver.HttpInvokerSilverService; +import org.silverware.microservices.silver.cluster.Invocation; +import org.silverware.microservices.silver.cluster.ServiceHandle; + import java.lang.annotation.Annotation; +import java.net.HttpURLConnection; +import java.net.URL; import java.util.Arrays; +import java.util.List; import java.util.Set; /** @@ -128,4 +137,24 @@ public int hashCode() { public String toString() { return "microservice " + name + " of type " + type.getCanonicalName() + " with qualifiers " + Arrays.toString(qualifiers.toArray()); } + + @SuppressWarnings("unchecked") + public List query(final Context context, final String host) throws Exception { + String urlBase = "http://" + host + "/" + context.getProperties().get(HttpInvokerSilverService.INVOKER_URL) + "/query"; + + HttpURLConnection con = (HttpURLConnection) new URL(urlBase).openConnection(); + con.setRequestMethod("POST"); + con.setDoInput(true); + con.setDoOutput(true); + con.connect(); + + JsonWriter jsonWriter = new JsonWriter(con.getOutputStream()); + jsonWriter.write(this); + JsonReader jsonReader = new JsonReader(con.getInputStream()); + List response = (List) jsonReader.readObject(); + + con.disconnect(); + + return response; + } }