Skip to content

Commit

Permalink
Merge pull request #122 from arago/add_scope_reg
Browse files Browse the repository at this point in the history
Version 2.3.2-SNAPSHOT
  • Loading branch information
viteke authored Aug 8, 2022
2 parents 17c88fa + 9f7c09b commit 3450428
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 18 deletions.
4 changes: 2 additions & 2 deletions java/hiro-action-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<groupId>co.arago</groupId>
<artifactId>hiro-action-client</artifactId>
<name>${project.artifactId}</name>
<version>2.3.1-SNAPSHOT</version>
<version>2.3.2-SNAPSHOT</version>
<parent>
<groupId>co.arago</groupId>
<artifactId>hiro-client-all</artifactId>
<version>2.3.1-SNAPSHOT</version>
<version>2.3.2-SNAPSHOT</version>
</parent>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions java/hiro-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<groupId>co.arago</groupId>
<artifactId>hiro-client</artifactId>
<name>${project.artifactId}</name>
<version>2.3.1-SNAPSHOT</version>
<version>2.3.2-SNAPSHOT</version>
<parent>
<groupId>co.arago</groupId>
<artifactId>hiro-client-all</artifactId>
<version>2.3.1-SNAPSHOT</version>
<version>2.3.2-SNAPSHOT</version>
</parent>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ public interface WebSocketClient extends Closeable {
void removeEventFilter(String id);

void clearEventFilters();

void subscribeScope(String scopeId);

void removeScope(String scopeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ClientBuilder {
protected int timeout = 0; // msecs
protected String apiVersion = null; // enforce /api/<vers>/graph
protected List<Map> eventFilterMessages;
protected List<String> subscribeScopeIds;
protected ProxyServer.Builder proxyBuilder;

public enum WebsocketType {
Expand Down Expand Up @@ -93,6 +94,11 @@ public ClientBuilder setEventFilterMessages(List<Map> eventFilterMessages) {
return this;
}

public ClientBuilder setSubscribeScopeIds(List<String> subscribeScopeIds) {
this.subscribeScopeIds = subscribeScopeIds;
return this;
}

public HiroClient makeHiroClient() {
return new DefaultHiroClient(notEmpty(restApiUrl, "restApiUrl"), notNull(tokenProvider, "tokenProvider"),
client, trustAllCerts, debugLevel, timeout, apiVersion, proxyBuilder);
Expand Down Expand Up @@ -120,7 +126,7 @@ public WebSocketClient makeWebSocketClient(WebsocketType type, String urlParamet

try {
return new DefaultWebSocketClient(restApiUrl, urlParameters, tokenProvider, client, debugLevel, timeout,
type, dataListener, logListener, handler, eventFilterMessages);
type, dataListener, logListener, handler, eventFilterMessages, subscribeScopeIds);
} catch (Throwable ex) {
return Throwables.unchecked(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class DefaultWebSocketClient implements WebSocketClient {
private final String urlParameters;
private final WebSocketListener handler;

private final List<String> subscribeScopeIds = new CopyOnWriteArrayList<>();
private final Map<String, Map> eventFilterMessages = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -109,13 +110,33 @@ public void onOpen(WebSocket websocket) {
LOG.log(Level.FINEST, "connected " + this);
}

try {
if (type == WebsocketType.Event) {
for (String scopeId : subscribeScopeIds) {
String message = getSubscribeScopeMessage(scopeId);
LOG.log(Level.INFO, "Subscribe to scopeId: " + message);
websocket.sendTextFrame(message).get(timeout, TimeUnit.MILLISECONDS);
}

for (Map filter : eventFilterMessages.values()) {
String message = getEventRegisterMessage(filter);
LOG.log(Level.INFO, "Send filter: " + message);
websocket.sendTextFrame(message).get(timeout, TimeUnit.MILLISECONDS);
}
}
} catch (Throwable ex) {
Throwables.unchecked(ex);
}

if (handler != null) {
handler.onOpen(websocket);
}

if (logListener != null) {
process(logListener, JSONValue.toJSONString(m));
}
running = true;

doReconnect = true;
tokenValid = false;
exitOnClose = false;
Expand Down Expand Up @@ -268,8 +289,8 @@ public void onPingFrame(byte[] payload) {

public DefaultWebSocketClient(String restApiUrl, String urlParameters, TokenProvider tokenProvider,
AsyncHttpClient client, Level debugLevel, int timeout, WebsocketType type, Listener<String> dataListener,
Listener<String> logListener, WebSocketListener handler, List<Map> eventFilterMessages)
throws InterruptedException, ExecutionException, URISyntaxException {
Listener<String> logListener, WebSocketListener handler, List<Map> eventFilterMessages,
List<String> subscribeScopeIds) {

if (debugLevel != null) {
LOG.setLevel(debugLevel);
Expand All @@ -285,6 +306,10 @@ public DefaultWebSocketClient(String restApiUrl, String urlParameters, TokenProv
this.urlParameters = urlParameters;
this.handler = handler;

if (subscribeScopeIds != null) {
this.subscribeScopeIds.addAll(subscribeScopeIds);
}

if (eventFilterMessages != null) {
for (Map filter : eventFilterMessages) {
this.eventFilterMessages.put(getFilterId(filter), filter);
Expand Down Expand Up @@ -316,23 +341,15 @@ private void connect(boolean isReconnecting) {
throw new HiroException("Failed to initialize WebSocketClient. It is null.", 500);
}

if (type == WebsocketType.Event) {
for (Map filter : eventFilterMessages.values()) {
String message = getEventRegisterMessage(filter);
LOG.log(Level.INFO, "Send filter: " + message);
webSocketClient.sendTextFrame(message).get(timeout, TimeUnit.MILLISECONDS);
}
}

running = true;
} catch (Throwable ex) {
closeWs();

if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "connection failed " + this, ex);
}

throw new HiroException("connection failed " + this + " " + ex.getMessage(), 400, ex);
throw new HiroException("connection failed " + this + " " + ((ex instanceof TimeoutException)
? "timeout of " + String.valueOf(timeout) + "ms reached" : ex.getMessage()), 400, ex);
}

if (LOG.isLoggable(Level.FINEST)) {
Expand Down Expand Up @@ -464,6 +481,16 @@ private String getEventRegisterMessage(Map filter) {
return message;
}

private String getSubscribeScopeMessage(String scopeId) {
final Map m = HiroCollections.newMap();
m.put("type", "subscribe");
m.put("id", scopeId);

String message = JSONValue.toJSONString(m);

return message;
}

@Override
public synchronized void removeEventFilter(String id) {
final Map m = HiroCollections.newMap();
Expand Down Expand Up @@ -493,6 +520,24 @@ public synchronized void clearEventFilters() {
eventFilterMessages.clear();
}

@Override
public void subscribeScope(String scopeId) {
String message = getSubscribeScopeMessage(scopeId);
LOG.log(Level.INFO, "Subscribe to scopeId: " + message);
sendMessage(message);

subscribeScopeIds.add(scopeId);
}

/**
* This only removes the scope from the internal list since there is no 'unsubscribe'. You need to restart the
* websocket for this change to take effect.
*/
@Override
public void removeScope(String scopeId) {
subscribeScopeIds.remove(scopeId);
}

@Override
public synchronized long sendMessage(String type, Map<String, String> headers, Map body) {
final long id = idCounter.incrementAndGet();
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<artifactId>hiro-client-all</artifactId>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
<version>2.3.1-SNAPSHOT</version>
<version>2.3.2-SNAPSHOT</version>

<properties>
<maven.exec.skip>false</maven.exec.skip>
Expand Down

0 comments on commit 3450428

Please sign in to comment.