Skip to content

Commit

Permalink
#4 getting ready for cluster invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
marvec committed May 11, 2015
1 parent afb8fa2 commit ee3f75a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.silverware.microservices.Context;
import org.silverware.microservices.MicroserviceMetaData;
import org.silverware.microservices.providers.MicroserviceProvider;
import org.silverware.microservices.providers.cluster.internal.HttpServiceProxy;
import org.silverware.microservices.silver.ClusterSilverService;
import org.silverware.microservices.silver.cluster.ServiceHandle;
import org.silverware.microservices.util.Utils;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -85,8 +87,18 @@ public void run() {
MicroserviceMetaData metaData = toLookup.poll();
if (metaData != null) {
channel.getView().forEach(address -> {
//address.toString()
// todo lookup
try {
List<ServiceHandle> handles = metaData.query(context, address.toString());
Set<ServiceHandle> localHandles = handles.stream().map(serviceHandle -> serviceHandle.withProxy(HttpServiceProxy.getProxy(context, serviceHandle))).collect(Collectors.toSet());

if (outboundHandles.containsKey(metaData)) {
outboundHandles.get(metaData).addAll(localHandles);
} else {
outboundHandles.put(metaData, localHandles);
}
} catch (Exception e) {
log.info(String.format("Unable to lookup Microservice %s at host %s:", metaData.toString(), address.toString()), e);
}
});
} else {
Thread.sleep(100);
Expand All @@ -113,14 +125,27 @@ public Set<Object> lookupLocalMicroservice(final MicroserviceMetaData metaData)
return new HashSet<>();
}

private static class ChannelReceiver extends ReceiverAdapter {
private class ChannelReceiver extends ReceiverAdapter {
@Override
public void receive(final Message msg) {
log.info("Received message " + msg);
}

@Override
public void viewAccepted(final View view) {
Set<String> availableNodes = new HashSet<>();
view.forEach(address -> availableNodes.add(address.toString()));

outboundHandles.forEach((metaData, serviceHandles) -> {
Set<ServiceHandle> toBeRemoved = new HashSet<>();

toBeRemoved.addAll(
serviceHandles.stream().filter(serviceHandle ->
availableNodes.contains(serviceHandle.getHost())
).collect(Collectors.toSet()));
serviceHandles.removeAll(toBeRemoved);
});

log.info("View accepted " + view);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
* -----------------------------------------------------------------------/
*/
package org.silverware.microservices.providers.http.invoker.internal;
package org.silverware.microservices.providers.cluster.internal;

import org.silverware.microservices.Context;
import org.silverware.microservices.silver.cluster.ServiceHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@
*/
package org.silverware.microservices;

import com.cedarsoftware.util.io.JsonReader;
import com.cedarsoftware.util.io.JsonWriter;
import org.silverware.microservices.silver.HttpInvokerSilverService;
import org.silverware.microservices.silver.cluster.Invocation;
import org.silverware.microservices.silver.cluster.ServiceHandle;

import java.lang.annotation.Annotation;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -128,4 +137,24 @@ public int hashCode() {
public String toString() {
return "microservice " + name + " of type " + type.getCanonicalName() + " with qualifiers " + Arrays.toString(qualifiers.toArray());
}

@SuppressWarnings("unchecked")
public List<ServiceHandle> query(final Context context, final String host) throws Exception {
String urlBase = "http://" + host + "/" + context.getProperties().get(HttpInvokerSilverService.INVOKER_URL) + "/query";

HttpURLConnection con = (HttpURLConnection) new URL(urlBase).openConnection();
con.setRequestMethod("POST");
con.setDoInput(true);
con.setDoOutput(true);
con.connect();

JsonWriter jsonWriter = new JsonWriter(con.getOutputStream());
jsonWriter.write(this);
JsonReader jsonReader = new JsonReader(con.getInputStream());
List<ServiceHandle> response = (List<ServiceHandle>) jsonReader.readObject();

con.disconnect();

return response;
}
}

0 comments on commit ee3f75a

Please sign in to comment.