Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Web Socket Forwarder from connection Web Client to Agent for HTTPS support #414

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
##################
SONIC_SERVER_HOST=192.168.1.1
SONIC_SERVER_PORT=3000
SONIC_SERVER_HTTPS=false
SONIC_SERVER_HTTPS_PORT=443
SONIC_EUREKA_USERNAME=sonic
SONIC_EUREKA_PASSWORD=sonic
SONIC_EUREKA_PORT=8761
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,17 @@ services:
environment:
- SONIC_SERVER_HOST
- SONIC_SERVER_PORT
- SONIC_SERVER_HTTPS
volumes:
- ./certs:/etc/nginx/certs
networks:
- sonic-network
depends_on:
- sonic-server-gateway
restart: on-failure
ports:
- "${SONIC_SERVER_PORT}:80"
- "${SONIC_SERVER_HTTPS_PORT}:443"

networks:
sonic-network:
Expand Down
5 changes: 5 additions & 0 deletions sonic-server-controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<!-- undertow -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -40,4 +41,17 @@ public static void sendText(Session session, String message) {
}
}
}

public static void sendByte(Session session, ByteBuffer message) {
if (session == null || !session.isOpen()) {
return;
}
synchronized (session) {
try {
session.getBasicRemote().sendBinary(message);
} catch (IllegalStateException | IOException e) {
log.error("WebSocket send msg error...connection has been closed.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.cloud.sonic.controller.transport;


import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.io.StringWriter;
import java.io.PrintWriter;

import org.cloud.sonic.controller.config.WsEndpointConfigure;
import org.springframework.stereotype.Component;
import org.java_websocket.enums.ReadyState;

import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ServerEndpoint(value = "/hub/{host}/{port}/{service}/{key}/{udId}", configurator = WsEndpointConfigure.class)
public class HubAudioServer {
HashMap<Session,TransportClient> hubMap = new HashMap<Session,TransportClient>();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

@OnOpen
public void onOpen(Session session, @PathParam("host") String host, @PathParam("port") String port, @PathParam("service") String service, @PathParam("key") String key, @PathParam("udId") String udId) {
String agentUri = String.format("ws://%s:%s/websockets/%s/%s/%s", host, port, service, key, udId);

try {
URI uri = new URI(agentUri);
TransportClient agent = new TransportClient(session, uri);
agent.connect();
hubMap.put(session, agent);
log.info(String.format("Hub Server: Connected to agent\nUrl=%s", agentUri));
}
catch(Exception ex)
{
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Failed to connect agent!\nUrl=%s\nError=%s", agentUri, sw.toString()));
}
}

@OnMessage
public void onMessage(Session session, String message) {
if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED)
{
cachedThreadPool.execute(() -> {
boolean needRetry = true;
while (needRetry) {
try {
if (hubMap.get(session).getReadyState() == ReadyState.OPEN) {
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
needRetry = false;
} else if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED) {
log.info(String.format("Hub Server: Wait agent in 1s\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
Thread.sleep(1000);
} else {
log.info(String.format("Hub Server: Agent closed\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
needRetry = false;
}

} catch (Exception error) {
needRetry = false;
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Fail to forward\nStatus=%s\nUrl=%s\nMessages=%s\nError=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl(), message, sw.toString()));
}
}
});
}
else
{
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
}
}

@OnClose
public void onClose(Session session) {
log.info(String.format("Hub Server: Closed\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).close();
hubMap.remove(session);
}

@OnError
public void onError(Session session, Throwable error) {
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Error caught!\nUrl=%s\nError=%s", hubMap.get(session).getUrl(), sw.toString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.cloud.sonic.controller.transport;


import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.io.StringWriter;
import java.io.PrintWriter;

import org.cloud.sonic.controller.config.WsEndpointConfigure;
import org.springframework.stereotype.Component;
import org.java_websocket.enums.ReadyState;

import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ServerEndpoint(value = "/hub/{host}/{port}/{platform}/{key}/{udId}/{token}", configurator = WsEndpointConfigure.class)
public class HubServer {
HashMap<Session,TransportClient> hubMap = new HashMap<Session,TransportClient>();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

@OnOpen
public void onOpen(Session session, @PathParam("host") String host, @PathParam("port") String port, @PathParam("platform") String platform, @PathParam("key") String key, @PathParam("udId") String udId, @PathParam("token") String token) {
String agentUri = String.format("ws://%s:%s/websockets/%s/%s/%s/%s", host, port, platform, key, udId, token);

try {
URI uri = new URI(agentUri);
TransportClient agent = new TransportClient(session, uri);
agent.connect();
hubMap.put(session, agent);
log.info(String.format("Hub Server: Connected to agent\nUrl=%s", agentUri));
}
catch(Exception ex)
{
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Failed to connect agent!\nUrl=%s\nError=%s", agentUri, sw.toString()));
}
}

@OnMessage
public void onMessage(Session session, String message) {
if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED)
{
cachedThreadPool.execute(() -> {
boolean needRetry = true;
while (needRetry) {
try {
if (hubMap.get(session).getReadyState() == ReadyState.OPEN) {
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
needRetry = false;
} else if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED) {
log.info(String.format("Hub Server: Wait agent in 1s\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
Thread.sleep(1000);
} else {
log.info(String.format("Hub Server: Agent closed\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
needRetry = false;
}

} catch (Exception error) {
needRetry = false;
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Fail to forward\nStatus=%s\nUrl=%s\nMessages=%s\nError=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl(), message, sw.toString()));
}
}
});
}
else
{
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
}
}

@OnClose
public void onClose(Session session) {
log.info(String.format("Hub Server: Closed\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).close();
hubMap.remove(session);
}

@OnError
public void onError(Session session, Throwable error) {
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Error caught!\nUrl=%s\nError=%s", hubMap.get(session).getUrl(), sw.toString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.cloud.sonic.controller.transport;


import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.io.StringWriter;
import java.io.PrintWriter;

import org.cloud.sonic.controller.config.WsEndpointConfigure;
import org.springframework.stereotype.Component;
import org.java_websocket.enums.ReadyState;

import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ServerEndpoint(value = "/hub/{host}/{port}/{platform}/{service}/{key}/{udId}/{token}", configurator = WsEndpointConfigure.class)
public class HubServiceServer {
HashMap<Session,TransportClient> hubMap = new HashMap<Session,TransportClient>();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

@OnOpen
public void onOpen(Session session, @PathParam("host") String host, @PathParam("port") String port, @PathParam("platform") String platform, @PathParam("service") String service, @PathParam("key") String key, @PathParam("udId") String udId, @PathParam("token") String token) {
String agentUri = String.format("ws://%s:%s/websockets/%s/%s/%s/%s/%s", host, port, platform, service, key, udId, token);

try {
URI uri = new URI(agentUri);
TransportClient agent = new TransportClient(session, uri);
agent.connect();
hubMap.put(session, agent);
log.info(String.format("Hub Server: Connected to agent\nUrl=%s", agentUri));
}
catch(Exception ex)
{
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Failed to connect agent!\nUrl=%s\nError=%s", agentUri, sw.toString()));
}
}

@OnMessage
public void onMessage(Session session, String message) {
if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED)
{
cachedThreadPool.execute(() -> {
boolean needRetry = true;
while (needRetry) {
try {
if (hubMap.get(session).getReadyState() == ReadyState.OPEN) {
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
needRetry = false;
} else if (hubMap.get(session).getReadyState() == ReadyState.NOT_YET_CONNECTED) {
log.info(String.format("Hub Server: Wait agent in 1s\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
Thread.sleep(1000);
} else {
log.info(String.format("Hub Server: Agent closed\nStatus=%s\nUrl=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl()));
needRetry = false;
}

} catch (Exception error) {
needRetry = false;
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Fail to forward\nStatus=%s\nUrl=%s\nMessages=%s\nError=%s", hubMap.get(session).getReadyState().toString(), hubMap.get(session).getUrl(), message, sw.toString()));
}
}
});
}
else
{
log.info(String.format("Hub Server: Messages forwarded to agent\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).send(message);
}
}

@OnClose
public void onClose(Session session) {
log.info(String.format("Hub Server: Closed\nUrl=%s", hubMap.get(session).getUrl()));
hubMap.get(session).close();
hubMap.remove(session);
}

@OnError
public void onError(Session session, Throwable error) {
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
log.error(String.format("Hub Server: Error caught!\nUrl=%s\nError=%s", hubMap.get(session).getUrl(), sw.toString()));
}
}
Loading