Skip to content

Commit

Permalink
Merge pull request #6 from nitram22/master
Browse files Browse the repository at this point in the history
Extension allowing the use of a different certstream server
  • Loading branch information
joshbooks authored Apr 4, 2020
2 parents ab89693 + 6d093bf commit ac82dc0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 15 deletions.
18 changes: 18 additions & 0 deletions src/example/ExampleClientAlternativeServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package example;

import com.google.gson.Gson;
import io.calidog.certstream.CertStream;

/**
* Demo Client that prints out a message of a certstream-server with a given URI
* The server address of the CaliDog Server is entered here for demo purposes
*/
public class ExampleClientAlternativeServer {

public static void main (String[] args){

CertStream.onMessageAlternativeServer(msg -> System.out.println(new Gson().toJson(msg)), "wss://certstream.calidog.io");

}

}
10 changes: 10 additions & 0 deletions src/io/calidog/certstream/BoringParts.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public BoringParts(CertStreamStringMessageHandler messageHandler, long sleepBefo
webSocketClient = webSocketClientSupplier.get();
}

/**
* Constructor for the connection to an alternative server
* @param serverURI String representation of the alternative server's address
*/
public BoringParts(CertStreamStringMessageHandler messageHandler, long sleepBeforeReconnect, String serverURI){
this.sleepBeforeReconnect = sleepBeforeReconnect;
this.webSocketClientSupplier = () -> defaultImplFactory.make(messageHandler, serverURI);
this.webSocketClient = webSocketClientSupplier.get();
}

//todo reconnection logic
@Override
public void onClose(int i, String s, boolean b) {
Expand Down
84 changes: 73 additions & 11 deletions src/io/calidog/certstream/CertStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,24 @@ public static void onMessageString(Consumer<String> handler)
{
BoringParts theBoringParts = new BoringParts(handler::accept);

while (theBoringParts.isNotClosed())
{
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
runThread(theBoringParts);

}).start();
}

/**
* @param handler A {@link Consumer<String>} that we'll
* run in a Thread that stays alive as long
* as the WebSocket stays open
* @param serverURI Address of the server to which a WebSocket
* connection is established
*/
public static void onMessageStringAlternativeServer(Consumer<String> handler, String serverURI){
new Thread(() -> {
BoringParts theBoringParts = new BoringParts(handler::accept, 0 , serverURI);

runThread(theBoringParts);

if (Thread.interrupted())
{
break;
}
}
}).start();
}

Expand Down Expand Up @@ -80,4 +86,60 @@ public static void onMessage(CertStreamMessageHandler handler)
handler.onMessage(fullMsg);
});
}

/**
* @param handler A {@link Consumer<CertStreamMessage>} that we'll
* run in a Thread that stays alive as long
* as the WebSocket stays open
* @param serverURI Address of the server to which a WebSocket
* connection is established
*/
public static void onMessageAlternativeServer (CertStreamMessageHandler handler, String serverURI){
onMessageStringAlternativeServer(string -> {
CertStreamMessagePOJO msg;

try {
msg = new Gson().fromJson(string, CertStreamMessagePOJO.class);

if (msg.messageType.equalsIgnoreCase("heartbeat")) {
return;
}
} catch (JsonSyntaxException e) {
System.out.println(e.getMessage());
logger.warn("onMessageAlternativeServer had an exception parsing some json", e);
return;
}

CertStreamMessage fullMsg;

try {
fullMsg = CertStreamMessage.fromPOJO(msg);
} catch (CertificateException e){
logger.warn("Encountered a CertificateException", e);
return;
}

handler.onMessage(fullMsg);
}, serverURI);
}

/**
* Runs thread until WebSocket is closed
* @param theBoringParts Websocket connection that is checked
* to see if it has been closed
*/
public static void runThread(BoringParts theBoringParts) {
while (theBoringParts.isNotClosed())
{
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}

if (Thread.interrupted())
{
break;
}
}
}
}
10 changes: 10 additions & 0 deletions src/io/calidog/certstream/CertStreamClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public CertStreamClientImpl(CertStreamClient client) {
this.connect();
}

/**
* Constructor for alternative Server
* @param serverURI String representation of server-address
*/
public CertStreamClientImpl(CertStreamClient client, String serverURI) throws URISyntaxException {
super(new URI(serverURI));
this.client = client;
this.connect();
}

public void onOpen(ServerHandshake serverHandshake) {
client.onOpen(serverHandshake);
}
Expand Down
37 changes: 33 additions & 4 deletions src/io/calidog/certstream/CertStreamClientImplFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.java_websocket.handshake.ServerHandshake;

import java.net.URISyntaxException;

/**
* A bundler around a functional interface. This is the little bit of magic
* that lets you just pass in a {@link java.util.function.Consumer} and have
Expand Down Expand Up @@ -30,12 +32,39 @@ public CertStreamClientImplFactory(CertStreamOpenHandler openHandler,

/**
* @param messageHandler The handler for onMessage
* @return a complete CertStreamClientImpl made out of a function
* and the handlers this Object was initialized with
* @return CertStreamClientImpl connected to the
* default sever ("wss://certstream.calidog.io")
*/
public CertStreamClientImpl make(CertStreamStringMessageHandler messageHandler)
{
return new CertStreamClientImpl(new CertStreamClient() {
return new CertStreamClientImpl(createClient(messageHandler));
}

/**
*
* @param messageHandler The handler for onMessage
* @param serverURI String representation of the address of the alternative server
* @return CertStreamClientImpl connected to an
* alternative Certstream-Server
*/
public CertStreamClientImpl make(CertStreamStringMessageHandler messageHandler, String serverURI){
CertStreamClientImpl certStreamClient = null;
try {
certStreamClient = new CertStreamClientImpl(createClient(messageHandler), serverURI);
} catch (URISyntaxException e) {
e.printStackTrace();
}
return certStreamClient;
}

/**
*
* @param messageHandler The handler for onMessage
* @return a complete CertStreamClientImpl made out of a function
* and the handlers this Object was initialized with
*/
public CertStreamClient createClient(CertStreamStringMessageHandler messageHandler){
return new CertStreamClient() {
@Override
public void onOpen(ServerHandshake serverHandshake) {
openHandler.onOpen(serverHandshake);
Expand All @@ -55,7 +84,7 @@ public void onClose(int i, String s, boolean b) {
public void onError(Exception e) {
errorHandler.onError(e);
}
});
};
}


Expand Down

0 comments on commit ac82dc0

Please sign in to comment.