From 3e876c978cc6456429c1585c1e4b451092a30768 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Wed, 20 Mar 2024 20:54:56 +0100 Subject: [PATCH 01/12] refactor(station): replace timed http request with websockets --- src/main/java/telraam/station/Fetcher.java | 150 +++++++----------- .../java/telraam/station/JsonBodyHandler.java | 43 ----- .../telraam/station/models/RonnyResponse.java | 12 -- 3 files changed, 55 insertions(+), 150 deletions(-) delete mode 100644 src/main/java/telraam/station/JsonBodyHandler.java delete mode 100644 src/main/java/telraam/station/models/RonnyResponse.java diff --git a/src/main/java/telraam/station/Fetcher.java b/src/main/java/telraam/station/Fetcher.java index fbd51b4..a8a460a 100644 --- a/src/main/java/telraam/station/Fetcher.java +++ b/src/main/java/telraam/station/Fetcher.java @@ -1,6 +1,9 @@ package telraam.station; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import org.jdbi.v3.core.Jdbi; +import com.fasterxml.jackson.databind.ObjectMapper; import telraam.database.daos.BatonDAO; import telraam.database.daos.DetectionDAO; import telraam.database.daos.StationDAO; @@ -12,19 +15,14 @@ import telraam.station.models.RonnyDetection; import telraam.station.models.RonnyResponse; -import java.io.IOException; -import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpConnectTimeoutException; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; +import java.net.http.*; import java.sql.Timestamp; -import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; -import java.util.function.Supplier; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -62,7 +60,7 @@ public Fetcher(Jdbi database, Station station, Set lappers, Set bodyHandler = new JsonBodyHandler<>(RonnyResponse.class); + ObjectMapper mapper = new ObjectMapper(); while (true) { //Update the station to account for possible changes in the database @@ -81,7 +79,7 @@ public void fetch() { //Create URL URI url; try { - url = new URI(station.getUrl() + "/detections/" + lastDetectionId); + url = new URI(station.getUrl() + "/ws"); } catch (URISyntaxException ex) { this.logger.severe(ex.getMessage()); try { @@ -92,91 +90,53 @@ public void fetch() { continue; } - //Create request - HttpRequest request; - try { - request = HttpRequest.newBuilder() - .uri(url) - .version(HttpClient.Version.HTTP_1_1) - .timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S)) - .build(); - } catch (IllegalArgumentException e) { - logger.severe(e.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException ex) { - logger.severe(ex.getMessage()); - } - continue; - } - - //Do request - HttpResponse> response; - try { - try { - response = this.client.send(request, bodyHandler); - } catch (ConnectException | HttpConnectTimeoutException ex) { - this.logger.severe("Could not connect to " + request.uri()); - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - continue; - } catch (IOException e) { - logger.severe(e.getMessage()); - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - continue; - } - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - continue; - } - - //Check response state - if (response.statusCode() != 200) { - this.logger.warning( - "Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")" - ); - continue; - } - - //Fetch all batons and create a map by batonMAC - Map baton_mac_map = batonDAO.getAll().stream() - .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); - - //Insert detections - List new_detections = new ArrayList<>(); - List detections = response.body().get().detections; - for (RonnyDetection detection : detections) { - if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { - var baton = baton_mac_map.get(detection.mac.toUpperCase()); - new_detections.add(new Detection( - baton.getId(), - station.getId(), - detection.rssi, - detection.battery, - detection.uptimeMs, - detection.id, - new Timestamp((long) (detection.detectionTimestamp * 1000)), - new Timestamp(System.currentTimeMillis()) - )); - } - } - if (!new_detections.isEmpty()) { - detectionDAO.insertAll(new_detections); - new_detections.forEach((detection) -> { - lappers.forEach((lapper) -> lapper.handle(detection)); - positioners.forEach((positioner) -> positioner.handle(detection)); - }); - } - - this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); - - //If few detections are retrieved from the station, wait for some time. - if (detections.size() < Fetcher.FULL_BATCH_SIZE) { - try { - Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); + CompletableFuture ws = this.client.newWebSocketBuilder().buildAsync(URI.create("ws://websocket.example.com"), new WebSocket.Listener() { + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + //Fetch all batons and create a map by batonMAC + Map baton_mac_map = batonDAO.getAll().stream() + .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); + + //Insert detections + List new_detections = new ArrayList<>(); + + try { + List detections = Arrays.asList(mapper.readValue(data.toString(), RonnyDetection[].class)); + for (RonnyDetection detection : detections) { + if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { + var baton = baton_mac_map.get(detection.mac.toUpperCase()); + new_detections.add(new Detection( + baton.getId(), + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + } + } + if (!new_detections.isEmpty()) { + detectionDAO.insertAll(new_detections); + new_detections.forEach((detection) -> lappers.forEach((lapper) -> lapper.handle(detection))); + } + + logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + + //If few detections are retrieved from the station, wait for some time. + if (detections.size() < Fetcher.FULL_BATCH_SIZE) { + try { + Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + } + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + } + return null; } - } + }); } } -} \ No newline at end of file diff --git a/src/main/java/telraam/station/JsonBodyHandler.java b/src/main/java/telraam/station/JsonBodyHandler.java deleted file mode 100644 index 2d28e7f..0000000 --- a/src/main/java/telraam/station/JsonBodyHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -package telraam.station; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; -import java.net.http.HttpResponse; -import java.util.function.Supplier; - -public class JsonBodyHandler implements HttpResponse.BodyHandler> { - - private static final ObjectMapper om = new ObjectMapper(); - private final Class targetClass; - - public JsonBodyHandler(Class targetClass) { - this.targetClass = targetClass; - } - - @Override - public HttpResponse.BodySubscriber> apply(HttpResponse.ResponseInfo responseInfo) { - return asJSON(this.targetClass); - } - - - public static HttpResponse.BodySubscriber> asJSON(Class targetType) { - HttpResponse.BodySubscriber upstream = HttpResponse.BodySubscribers.ofInputStream(); - - return HttpResponse.BodySubscribers.mapping( - upstream, - inputStream -> toSupplierOfType(inputStream, targetType)); - } - - public static Supplier toSupplierOfType(InputStream inputStream, Class targetType) { - return () -> { - try (InputStream stream = inputStream) { - return om.readValue(stream, targetType); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }; - } -} \ No newline at end of file diff --git a/src/main/java/telraam/station/models/RonnyResponse.java b/src/main/java/telraam/station/models/RonnyResponse.java deleted file mode 100644 index 726cafc..0000000 --- a/src/main/java/telraam/station/models/RonnyResponse.java +++ /dev/null @@ -1,12 +0,0 @@ -package telraam.station.models; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -public class RonnyResponse { - public List detections; - - @JsonProperty("station_id") - public String stationRonnyName; -} From 44e0c3a64abeedf9f8dee12441533a4dcc1acfde Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Wed, 20 Mar 2024 21:36:30 +0100 Subject: [PATCH 02/12] feat(station): send InitMessage on open connection --- src/main/java/telraam/station/Fetcher.java | 32 ++++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/java/telraam/station/Fetcher.java b/src/main/java/telraam/station/Fetcher.java index a8a460a..6a2efa6 100644 --- a/src/main/java/telraam/station/Fetcher.java +++ b/src/main/java/telraam/station/Fetcher.java @@ -1,7 +1,6 @@ package telraam.station; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import org.jdbi.v3.core.Jdbi; import com.fasterxml.jackson.databind.ObjectMapper; import telraam.database.daos.BatonDAO; @@ -13,7 +12,6 @@ import telraam.logic.lapper.Lapper; import telraam.logic.positioner.Positioner; import telraam.station.models.RonnyDetection; -import telraam.station.models.RonnyResponse; import java.net.URI; import java.net.URISyntaxException; @@ -47,6 +45,13 @@ public class Fetcher { //Timeout when result has less than a full batch of detections. private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds + private class InitWSMessage { + public int lastId; + public InitWSMessage(int lastId) { + this.lastId = lastId; + } + } + public Fetcher(Jdbi database, Station station, Set lappers, Set positioners) { this.batonDAO = database.onDemand(BatonDAO.class); @@ -76,6 +81,8 @@ public void fetch() { lastDetectionId = lastDetection.get().getRemoteId(); } + InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); + //Create URL URI url; try { @@ -90,7 +97,16 @@ public void fetch() { continue; } - CompletableFuture ws = this.client.newWebSocketBuilder().buildAsync(URI.create("ws://websocket.example.com"), new WebSocket.Listener() { + CompletableFuture ws = this.client.newWebSocketBuilder().buildAsync(url, new WebSocket.Listener() { + @Override + public void onOpen(WebSocket webSocket) { + try { + webSocket.sendText(mapper.writeValueAsString(wsMessage), true); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + @Override public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { //Fetch all batons and create a map by batonMAC @@ -123,15 +139,6 @@ public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean } logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); - - //If few detections are retrieved from the station, wait for some time. - if (detections.size() < Fetcher.FULL_BATCH_SIZE) { - try { - Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - } - } } catch (JsonProcessingException e) { logger.severe(e.getMessage()); } @@ -140,3 +147,4 @@ public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean }); } } +} From 734f5c03082164b308e05a6c29540f275eae9f87 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Fri, 29 Mar 2024 17:50:21 +0100 Subject: [PATCH 03/12] refactor: Use jakarta websockets --- build.gradle | 5 + .../telraam/database/daos/DetectionDAO.java | 7 + src/main/java/telraam/station/Fetcher.java | 175 +++++++++--------- .../java/telraam/station/WebsocketClient.java | 58 ++++++ 4 files changed, 153 insertions(+), 92 deletions(-) create mode 100644 src/main/java/telraam/station/WebsocketClient.java diff --git a/build.gradle b/build.gradle index 6800130..ba9fd31 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,11 @@ dependencies { 'org.eclipse.jetty.websocket:websocket-jetty-api:' + jettyVersion, 'org.eclipse.jetty.websocket:websocket-jetty-server:' + jettyVersion, ) + + // Websocket client libs + compileOnly 'jakarta.websocket:jakarta.websocket-client-api:2.2.0-M1' + implementation 'org.glassfish.tyrus.bundles:tyrus-standalone-client:2.2.0-M1' + // Database implementation('com.h2database:h2:2.2.220') implementation('org.postgresql:postgresql:42.7.3') diff --git a/src/main/java/telraam/database/daos/DetectionDAO.java b/src/main/java/telraam/database/daos/DetectionDAO.java index c50452e..fba506f 100644 --- a/src/main/java/telraam/database/daos/DetectionDAO.java +++ b/src/main/java/telraam/database/daos/DetectionDAO.java @@ -33,6 +33,13 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id @GetGeneratedKeys({"id"}) int insertAll(@BindBean List detection); + @SqlBatch(""" + INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \ + VALUES (:stationId, (SELECT id FROM baton WHERE :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion) + """) + @GetGeneratedKeys({"id"}) + int insertAllWithoutBaton(@BindBean List detection, @Bind("batonMac") List batonMac); + @SqlQuery("SELECT * FROM detection WHERE id = :id") @RegisterBeanMapper(Detection.class) Optional getById(@Bind("id") int id); diff --git a/src/main/java/telraam/station/Fetcher.java b/src/main/java/telraam/station/Fetcher.java index 6a2efa6..c8b5c34 100644 --- a/src/main/java/telraam/station/Fetcher.java +++ b/src/main/java/telraam/station/Fetcher.java @@ -6,7 +6,6 @@ import telraam.database.daos.BatonDAO; import telraam.database.daos.DetectionDAO; import telraam.database.daos.StationDAO; -import telraam.database.models.Baton; import telraam.database.models.Detection; import telraam.database.models.Station; import telraam.logic.lapper.Lapper; @@ -18,13 +17,11 @@ import java.net.http.*; import java.sql.Timestamp; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Function; import java.util.logging.Logger; -import java.util.stream.Collectors; public class Fetcher { + //Timeout to wait for before sending the next request after an error. + private final static int ERROR_TIMEOUT_MS = 2000; private final Set lappers; private final Set positioners; private Station station; @@ -36,23 +33,6 @@ public class Fetcher { private final HttpClient client = HttpClient.newHttpClient(); private final Logger logger = Logger.getLogger(Fetcher.class.getName()); - //Timeout to wait for before sending the next request after an error. - private final static int ERROR_TIMEOUT_MS = 2000; - //Timeout for a request to a station. - private final static int REQUEST_TIMEOUT_S = 10; - //Full batch size, if this number of detections is reached, more are probably available immediately. - private final static int FULL_BATCH_SIZE = 1000; - //Timeout when result has less than a full batch of detections. - private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds - - private class InitWSMessage { - public int lastId; - public InitWSMessage(int lastId) { - this.lastId = lastId; - } - } - - public Fetcher(Jdbi database, Station station, Set lappers, Set positioners) { this.batonDAO = database.onDemand(BatonDAO.class); this.detectionDAO = database.onDemand(DetectionDAO.class); @@ -67,84 +47,95 @@ public void fetch() { logger.info("Running Fetcher for station(" + this.station.getId() + ")"); ObjectMapper mapper = new ObjectMapper(); - while (true) { - //Update the station to account for possible changes in the database - this.stationDAO.getById(station.getId()).ifPresentOrElse( - station -> this.station = station, - () -> this.logger.severe("Can't update station from database.") - ); - - //Get last detection id - int lastDetectionId = 0; - Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); - if (lastDetection.isPresent()) { - lastDetectionId = lastDetection.get().getRemoteId(); + //Update the station to account for possible changes in the database + this.stationDAO.getById(station.getId()).ifPresentOrElse( + station -> this.station = station, + () -> this.logger.severe("Can't update station from database.") + ); + + //Get last detection id + int lastDetectionId = 0; + Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); + if (lastDetection.isPresent()) { + lastDetectionId = lastDetection.get().getRemoteId(); + } + + InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); + String wsMessageEncoded; + try { + wsMessageEncoded = mapper.writeValueAsString(wsMessage); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException ex) { + logger.severe(ex.getMessage()); + } + this.fetch(); + return; + } + + //Create URL + URI url; + try { + URI stationUrl = URI.create(station.getUrl()); + url = new URI("ws", stationUrl.getHost(), "/detections", ""); + } catch (URISyntaxException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); } + this.fetch(); + return; + } - InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); + WebsocketClient websocketClient = new WebsocketClient(url); + websocketClient.addOnOpenHandler(() -> { + websocketClient.sendMessage(wsMessageEncoded); + }); + websocketClient.addMessageHandler((String msg) -> { + //Insert detections + List new_detections = new ArrayList<>(); + List detection_mac_addresses = new ArrayList<>(); + logger.info("Received message on WS"); - //Create URL - URI url; try { - url = new URI(station.getUrl() + "/ws"); - } catch (URISyntaxException ex) { - this.logger.severe(ex.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); + List detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class)); + for (RonnyDetection detection : detections) { + new_detections.add(new Detection( + 0, + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + detection_mac_addresses.add(detection.mac); } - continue; + if (!new_detections.isEmpty()) { + detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); + new_detections.forEach((detection) -> { + lappers.forEach((lapper) -> lapper.handle(detection)); + positioners.forEach(positioner -> positioner.handle(detection)); + }); + } + + logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); } + }); + } - CompletableFuture ws = this.client.newWebSocketBuilder().buildAsync(url, new WebSocket.Listener() { - @Override - public void onOpen(WebSocket webSocket) { - try { - webSocket.sendText(mapper.writeValueAsString(wsMessage), true); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } + private class InitWSMessage { + public int lastId; - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - //Fetch all batons and create a map by batonMAC - Map baton_mac_map = batonDAO.getAll().stream() - .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); - - //Insert detections - List new_detections = new ArrayList<>(); - - try { - List detections = Arrays.asList(mapper.readValue(data.toString(), RonnyDetection[].class)); - for (RonnyDetection detection : detections) { - if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { - var baton = baton_mac_map.get(detection.mac.toUpperCase()); - new_detections.add(new Detection( - baton.getId(), - station.getId(), - detection.rssi, - detection.battery, - detection.uptimeMs, - detection.id, - new Timestamp((long) (detection.detectionTimestamp * 1000)), - new Timestamp(System.currentTimeMillis()) - )); - } - } - if (!new_detections.isEmpty()) { - detectionDAO.insertAll(new_detections); - new_detections.forEach((detection) -> lappers.forEach((lapper) -> lapper.handle(detection))); - } - - logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); - } catch (JsonProcessingException e) { - logger.severe(e.getMessage()); - } - return null; - } - }); + public InitWSMessage(int lastId) { + this.lastId = lastId; } } } diff --git a/src/main/java/telraam/station/WebsocketClient.java b/src/main/java/telraam/station/WebsocketClient.java new file mode 100644 index 0000000..e57564a --- /dev/null +++ b/src/main/java/telraam/station/WebsocketClient.java @@ -0,0 +1,58 @@ +package telraam.station; + +import jakarta.websocket.*; + +import java.net.URI; + +@ClientEndpoint +public class WebsocketClient { + public interface MessageHandler { + void handleMessage(String message); + } + public interface OnOpenHandler { + void handleMsgOpen(); + } + + Session session = null; + private MessageHandler messageHandler; + private OnOpenHandler onOpenHandler; + + public WebsocketClient(URI endpointURI) { + try { + WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + container.connectToServer(this, endpointURI); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @OnOpen + public void onOpen(Session session) { + this.session = session; + } + + @OnClose + public void onClose(Session userSession, CloseReason reason) { + System.out.println("closing websocket"); + this.session = null; + } + + @OnMessage + public void onMessage(String message) { + if (this.messageHandler != null) { + this.messageHandler.handleMessage(message); + } + } + + public void addOnOpenHandler(OnOpenHandler openHandler) { + this.onOpenHandler = openHandler; + } + + public void addMessageHandler(MessageHandler msgHandler) { + this.messageHandler = msgHandler; + } + + public void sendMessage(String message) { + this.session.getAsyncRemote().sendText(message); + } +} From a75a0ce73cb7cb7705cb34986449f5e2620f2cb1 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Tue, 2 Apr 2024 01:13:00 +0200 Subject: [PATCH 04/12] feat: re-add http fetcher & move WS to own package --- src/main/java/telraam/App.java | 6 +- src/main/java/telraam/station/Fetcher.java | 147 +-------------- .../java/telraam/station/FetcherFactory.java | 42 +++++ .../telraam/station/http/HTTPFetcher.java | 174 ++++++++++++++++++ .../telraam/station/http/JsonBodyHandler.java | 42 +++++ .../telraam/station/models/RonnyResponse.java | 12 ++ .../{ => websocket}/WebsocketClient.java | 2 +- .../station/websocket/WebsocketFetcher.java | 140 ++++++++++++++ 8 files changed, 425 insertions(+), 140 deletions(-) create mode 100644 src/main/java/telraam/station/FetcherFactory.java create mode 100644 src/main/java/telraam/station/http/HTTPFetcher.java create mode 100644 src/main/java/telraam/station/http/JsonBodyHandler.java create mode 100644 src/main/java/telraam/station/models/RonnyResponse.java rename src/main/java/telraam/station/{ => websocket}/WebsocketClient.java (97%) create mode 100644 src/main/java/telraam/station/websocket/WebsocketFetcher.java diff --git a/src/main/java/telraam/App.java b/src/main/java/telraam/App.java index 5b1644e..2f8f79f 100644 --- a/src/main/java/telraam/App.java +++ b/src/main/java/telraam/App.java @@ -24,7 +24,8 @@ import telraam.logic.lapper.robust.RobustLapper; import telraam.logic.positioner.Positioner; import telraam.logic.positioner.simple.SimplePositioner; -import telraam.station.Fetcher; +import telraam.station.FetcherFactory; +import telraam.station.websocket.WebsocketFetcher; import telraam.util.AcceptedLapsUtil; import telraam.websocket.WebSocketConnection; @@ -142,9 +143,10 @@ public void run(AppConfiguration configuration, Environment environment) { positioners.add(new SimplePositioner(this.database)); // Start fetch thread for each station + FetcherFactory fetcherFactory = new FetcherFactory(this.database, lappers, positioners); StationDAO stationDAO = this.database.onDemand(StationDAO.class); for (Station station : stationDAO.getAll()) { - new Thread(() -> new Fetcher(this.database, station, lappers, positioners).fetch()).start(); + new Thread(() -> fetcherFactory.create(station).fetch()).start(); } } diff --git a/src/main/java/telraam/station/Fetcher.java b/src/main/java/telraam/station/Fetcher.java index c8b5c34..753474a 100644 --- a/src/main/java/telraam/station/Fetcher.java +++ b/src/main/java/telraam/station/Fetcher.java @@ -1,141 +1,14 @@ package telraam.station; -import com.fasterxml.jackson.core.JsonProcessingException; -import org.jdbi.v3.core.Jdbi; -import com.fasterxml.jackson.databind.ObjectMapper; -import telraam.database.daos.BatonDAO; -import telraam.database.daos.DetectionDAO; -import telraam.database.daos.StationDAO; -import telraam.database.models.Detection; -import telraam.database.models.Station; -import telraam.logic.lapper.Lapper; -import telraam.logic.positioner.Positioner; -import telraam.station.models.RonnyDetection; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.http.*; -import java.sql.Timestamp; -import java.util.*; -import java.util.logging.Logger; - -public class Fetcher { +public interface Fetcher { //Timeout to wait for before sending the next request after an error. - private final static int ERROR_TIMEOUT_MS = 2000; - private final Set lappers; - private final Set positioners; - private Station station; - - private final BatonDAO batonDAO; - private final DetectionDAO detectionDAO; - private final StationDAO stationDAO; - - private final HttpClient client = HttpClient.newHttpClient(); - private final Logger logger = Logger.getLogger(Fetcher.class.getName()); - - public Fetcher(Jdbi database, Station station, Set lappers, Set positioners) { - this.batonDAO = database.onDemand(BatonDAO.class); - this.detectionDAO = database.onDemand(DetectionDAO.class); - this.stationDAO = database.onDemand(StationDAO.class); - - this.lappers = lappers; - this.positioners = positioners; - this.station = station; - } - - public void fetch() { - logger.info("Running Fetcher for station(" + this.station.getId() + ")"); - ObjectMapper mapper = new ObjectMapper(); - - //Update the station to account for possible changes in the database - this.stationDAO.getById(station.getId()).ifPresentOrElse( - station -> this.station = station, - () -> this.logger.severe("Can't update station from database.") - ); - - //Get last detection id - int lastDetectionId = 0; - Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); - if (lastDetection.isPresent()) { - lastDetectionId = lastDetection.get().getRemoteId(); - } - - InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); - String wsMessageEncoded; - try { - wsMessageEncoded = mapper.writeValueAsString(wsMessage); - } catch (JsonProcessingException e) { - logger.severe(e.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException ex) { - logger.severe(ex.getMessage()); - } - this.fetch(); - return; - } - - //Create URL - URI url; - try { - URI stationUrl = URI.create(station.getUrl()); - url = new URI("ws", stationUrl.getHost(), "/detections", ""); - } catch (URISyntaxException ex) { - this.logger.severe(ex.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - } - this.fetch(); - return; - } - - WebsocketClient websocketClient = new WebsocketClient(url); - websocketClient.addOnOpenHandler(() -> { - websocketClient.sendMessage(wsMessageEncoded); - }); - websocketClient.addMessageHandler((String msg) -> { - //Insert detections - List new_detections = new ArrayList<>(); - List detection_mac_addresses = new ArrayList<>(); - logger.info("Received message on WS"); - - try { - List detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class)); - for (RonnyDetection detection : detections) { - new_detections.add(new Detection( - 0, - station.getId(), - detection.rssi, - detection.battery, - detection.uptimeMs, - detection.id, - new Timestamp((long) (detection.detectionTimestamp * 1000)), - new Timestamp(System.currentTimeMillis()) - )); - detection_mac_addresses.add(detection.mac); - } - if (!new_detections.isEmpty()) { - detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); - new_detections.forEach((detection) -> { - lappers.forEach((lapper) -> lapper.handle(detection)); - positioners.forEach(positioner -> positioner.handle(detection)); - }); - } - - logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); - } catch (JsonProcessingException e) { - logger.severe(e.getMessage()); - } - }); - } - - private class InitWSMessage { - public int lastId; - - public InitWSMessage(int lastId) { - this.lastId = lastId; - } - } + int ERROR_TIMEOUT_MS = 2000; + //Timeout for a request to a station. + int REQUEST_TIMEOUT_S = 10; + //Full batch size, if this number of detections is reached, more are probably available immediately. + int FULL_BATCH_SIZE = 1000; + //Timeout when result has less than a full batch of detections. + int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds + + void fetch(); } diff --git a/src/main/java/telraam/station/FetcherFactory.java b/src/main/java/telraam/station/FetcherFactory.java new file mode 100644 index 0000000..b3cb487 --- /dev/null +++ b/src/main/java/telraam/station/FetcherFactory.java @@ -0,0 +1,42 @@ +package telraam.station; + +import org.jdbi.v3.core.Jdbi; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.http.HTTPFetcher; +import telraam.station.websocket.WebsocketFetcher; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Set; +import java.util.logging.Logger; + +public class FetcherFactory { + private final Logger logger = Logger.getLogger(FetcherFactory.class.getName()); + private final Jdbi database; + private final Set lappers; + private final Set positioners; + public FetcherFactory(Jdbi database, Set lappers, Set positioners) { + this.database = database; + this.lappers = lappers; + this.positioners = positioners; + } + + public Fetcher create(Station station) { + try { + URI stationURI = new URI(station.getUrl()); + return switch (stationURI.getScheme()) { + case "ws" -> new WebsocketFetcher(database, station, lappers, positioners); + case "http" -> new HTTPFetcher(database, station, lappers, positioners); + default -> { + logger.severe(String.format("%s is not a valid scheme for a station", stationURI.getScheme())); + yield null; + } + }; + } catch (URISyntaxException e) { + logger.severe(String.format("Failed to parse station URI: %s", e.getMessage())); + } + return null; + } +} diff --git a/src/main/java/telraam/station/http/HTTPFetcher.java b/src/main/java/telraam/station/http/HTTPFetcher.java new file mode 100644 index 0000000..c296efd --- /dev/null +++ b/src/main/java/telraam/station/http/HTTPFetcher.java @@ -0,0 +1,174 @@ +package telraam.station.http; + +import org.jdbi.v3.core.Jdbi; +import telraam.database.daos.BatonDAO; +import telraam.database.daos.DetectionDAO; +import telraam.database.daos.StationDAO; +import telraam.database.models.Baton; +import telraam.database.models.Detection; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.Fetcher; +import telraam.station.models.RonnyDetection; +import telraam.station.models.RonnyResponse; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpConnectTimeoutException; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.sql.Timestamp; +import java.time.Duration; +import java.util.*; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class HTTPFetcher implements Fetcher { + private final Set lappers; + private final Set positioners; + private Station station; + + private final BatonDAO batonDAO; + private final DetectionDAO detectionDAO; + private final StationDAO stationDAO; + + private final HttpClient client = HttpClient.newHttpClient(); + private final Logger logger = Logger.getLogger(Fetcher.class.getName()); + + + public HTTPFetcher(Jdbi database, Station station, Set lappers, Set positioners) { + this.batonDAO = database.onDemand(BatonDAO.class); + this.detectionDAO = database.onDemand(DetectionDAO.class); + this.stationDAO = database.onDemand(StationDAO.class); + + this.lappers = lappers; + this.positioners = positioners; + this.station = station; + } + + public void fetch() { + logger.info("Running Fetcher for station(" + this.station.getId() + ")"); + JsonBodyHandler bodyHandler = new JsonBodyHandler<>(RonnyResponse.class); + + while (true) { + //Update the station to account for possible changes in the database + this.stationDAO.getById(station.getId()).ifPresentOrElse( + station -> this.station = station, + () -> this.logger.severe("Can't update station from database.") + ); + + //Get last detection id + int lastDetectionId = 0; + Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); + if (lastDetection.isPresent()) { + lastDetectionId = lastDetection.get().getRemoteId(); + } + + //Create URL + URI url; + try { + url = new URI(station.getUrl() + "/detections/" + lastDetectionId); + } catch (URISyntaxException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + continue; + } + + //Create request + HttpRequest request; + try { + request = HttpRequest.newBuilder() + .uri(url) + .version(HttpClient.Version.HTTP_1_1) + .timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S)) + .build(); + } catch (IllegalArgumentException e) { + logger.severe(e.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException ex) { + logger.severe(ex.getMessage()); + } + continue; + } + + //Do request + HttpResponse> response; + try { + try { + response = this.client.send(request, bodyHandler); + } catch (ConnectException | HttpConnectTimeoutException ex) { + this.logger.severe("Could not connect to " + request.uri()); + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + continue; + } catch (IOException e) { + logger.severe(e.getMessage()); + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + continue; + } + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + continue; + } + + //Check response state + if (response.statusCode() != 200) { + this.logger.warning( + "Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")" + ); + continue; + } + + //Fetch all batons and create a map by batonMAC + Map baton_mac_map = batonDAO.getAll().stream() + .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); + + //Insert detections + List new_detections = new ArrayList<>(); + List detections = response.body().get().detections; + for (RonnyDetection detection : detections) { + if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { + var baton = baton_mac_map.get(detection.mac.toUpperCase()); + new_detections.add(new Detection( + baton.getId(), + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + } + } + if (!new_detections.isEmpty()) { + detectionDAO.insertAll(new_detections); + new_detections.forEach((detection) -> { + lappers.forEach((lapper) -> lapper.handle(detection)); + positioners.forEach((positioner) -> positioner.handle(detection)); + }); + } + + this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + + //If few detections are retrieved from the station, wait for some time. + if (detections.size() < Fetcher.FULL_BATCH_SIZE) { + try { + Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/telraam/station/http/JsonBodyHandler.java b/src/main/java/telraam/station/http/JsonBodyHandler.java new file mode 100644 index 0000000..462039f --- /dev/null +++ b/src/main/java/telraam/station/http/JsonBodyHandler.java @@ -0,0 +1,42 @@ +package telraam.station.http; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.http.HttpResponse; +import java.util.function.Supplier; + +public class JsonBodyHandler implements HttpResponse.BodyHandler> { + + private static final ObjectMapper om = new ObjectMapper(); + private final Class targetClass; + + public JsonBodyHandler(Class targetClass) { + this.targetClass = targetClass; + } + + public static HttpResponse.BodySubscriber> asJSON(Class targetType) { + HttpResponse.BodySubscriber upstream = HttpResponse.BodySubscribers.ofInputStream(); + + return HttpResponse.BodySubscribers.mapping( + upstream, + inputStream -> toSupplierOfType(inputStream, targetType)); + } + + public static Supplier toSupplierOfType(InputStream inputStream, Class targetType) { + return () -> { + try (InputStream stream = inputStream) { + return om.readValue(stream, targetType); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + + @Override + public HttpResponse.BodySubscriber> apply(HttpResponse.ResponseInfo responseInfo) { + return asJSON(this.targetClass); + } +} diff --git a/src/main/java/telraam/station/models/RonnyResponse.java b/src/main/java/telraam/station/models/RonnyResponse.java new file mode 100644 index 0000000..63e1382 --- /dev/null +++ b/src/main/java/telraam/station/models/RonnyResponse.java @@ -0,0 +1,12 @@ +package telraam.station.models; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class RonnyResponse { + public List detections; + + @JsonProperty("station_id") + public String stationRonnyName; +} \ No newline at end of file diff --git a/src/main/java/telraam/station/WebsocketClient.java b/src/main/java/telraam/station/websocket/WebsocketClient.java similarity index 97% rename from src/main/java/telraam/station/WebsocketClient.java rename to src/main/java/telraam/station/websocket/WebsocketClient.java index e57564a..775e484 100644 --- a/src/main/java/telraam/station/WebsocketClient.java +++ b/src/main/java/telraam/station/websocket/WebsocketClient.java @@ -1,4 +1,4 @@ -package telraam.station; +package telraam.station.websocket; import jakarta.websocket.*; diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java new file mode 100644 index 0000000..3163dd8 --- /dev/null +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -0,0 +1,140 @@ +package telraam.station.websocket; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.jdbi.v3.core.Jdbi; +import com.fasterxml.jackson.databind.ObjectMapper; +import telraam.database.daos.BatonDAO; +import telraam.database.daos.DetectionDAO; +import telraam.database.daos.StationDAO; +import telraam.database.models.Detection; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.Fetcher; +import telraam.station.models.RonnyDetection; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.*; +import java.sql.Timestamp; +import java.util.*; +import java.util.logging.Logger; + +public class WebsocketFetcher implements Fetcher { + private final Set lappers; + private final Set positioners; + private Station station; + + private final BatonDAO batonDAO; + private final DetectionDAO detectionDAO; + private final StationDAO stationDAO; + + private final HttpClient client = HttpClient.newHttpClient(); + private final Logger logger = Logger.getLogger(WebsocketFetcher.class.getName()); + + public WebsocketFetcher(Jdbi database, Station station, Set lappers, Set positioners) { + this.batonDAO = database.onDemand(BatonDAO.class); + this.detectionDAO = database.onDemand(DetectionDAO.class); + this.stationDAO = database.onDemand(StationDAO.class); + this.lappers = lappers; + this.positioners = positioners; + + this.station = station; + } + + public void fetch() { + logger.info("Running Fetcher for station(" + this.station.getId() + ")"); + ObjectMapper mapper = new ObjectMapper(); + + //Update the station to account for possible changes in the database + this.stationDAO.getById(station.getId()).ifPresentOrElse( + station -> this.station = station, + () -> this.logger.severe("Can't update station from database.") + ); + + //Get last detection id + int lastDetectionId = 0; + Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); + if (lastDetection.isPresent()) { + lastDetectionId = lastDetection.get().getRemoteId(); + } + + InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); + String wsMessageEncoded; + try { + wsMessageEncoded = mapper.writeValueAsString(wsMessage); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException ex) { + logger.severe(ex.getMessage()); + } + this.fetch(); + return; + } + + //Create URL + URI url; + try { + URI stationUrl = URI.create(station.getUrl()); + url = new URI("ws", stationUrl.getHost(), "/detections", ""); + } catch (URISyntaxException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + return; + } + + WebsocketClient websocketClient = new WebsocketClient(url); + websocketClient.addOnOpenHandler(() -> { + websocketClient.sendMessage(wsMessageEncoded); + }); + websocketClient.addMessageHandler((String msg) -> { + //Insert detections + List new_detections = new ArrayList<>(); + List detection_mac_addresses = new ArrayList<>(); + logger.info("Received message on WS"); + + try { + List detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class)); + for (RonnyDetection detection : detections) { + new_detections.add(new Detection( + 0, + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + detection_mac_addresses.add(detection.mac); + } + if (!new_detections.isEmpty()) { + detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); + new_detections.forEach((detection) -> { + lappers.forEach((lapper) -> lapper.handle(detection)); + positioners.forEach(positioner -> positioner.handle(detection)); + }); + } + + logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + } + }); + } + + private class InitWSMessage { + public int lastId; + + public InitWSMessage(int lastId) { + this.lastId = lastId; + } + } +} From 05b6769b863c456f3f7d37bc10b46fcf30f8a206 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Tue, 2 Apr 2024 01:16:31 +0200 Subject: [PATCH 05/12] fix(ws-fetcher): catch client creation error --- .../station/websocket/WebsocketClient.java | 2 +- .../station/websocket/WebsocketFetcher.java | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/telraam/station/websocket/WebsocketClient.java b/src/main/java/telraam/station/websocket/WebsocketClient.java index 775e484..f3a063a 100644 --- a/src/main/java/telraam/station/websocket/WebsocketClient.java +++ b/src/main/java/telraam/station/websocket/WebsocketClient.java @@ -17,7 +17,7 @@ public interface OnOpenHandler { private MessageHandler messageHandler; private OnOpenHandler onOpenHandler; - public WebsocketClient(URI endpointURI) { + public WebsocketClient(URI endpointURI) throws RuntimeException { try { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, endpointURI); diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java index 3163dd8..79362d3 100644 --- a/src/main/java/telraam/station/websocket/WebsocketFetcher.java +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -90,7 +90,23 @@ public void fetch() { return; } - WebsocketClient websocketClient = new WebsocketClient(url); + + WebsocketClient websocketClient; + + try { + websocketClient = new WebsocketClient(url); + } catch (RuntimeException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + return; + } + + websocketClient.addOnOpenHandler(() -> { websocketClient.sendMessage(wsMessageEncoded); }); From c3c77b40a389ee0c0585b09478cc9082f71496f4 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Tue, 2 Apr 2024 18:20:33 +0200 Subject: [PATCH 06/12] feat(ws-fetcher): open WS after setting handlers --- .../station/websocket/WebsocketClient.java | 30 ++++++++++++---- .../station/websocket/WebsocketFetcher.java | 35 +++++++++++-------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/src/main/java/telraam/station/websocket/WebsocketClient.java b/src/main/java/telraam/station/websocket/WebsocketClient.java index f3a063a..bf9ad46 100644 --- a/src/main/java/telraam/station/websocket/WebsocketClient.java +++ b/src/main/java/telraam/station/websocket/WebsocketClient.java @@ -9,18 +9,24 @@ public class WebsocketClient { public interface MessageHandler { void handleMessage(String message); } - public interface OnOpenHandler { - void handleMsgOpen(); + public interface onStateChangeHandler { + void handleChange(); } - Session session = null; + private URI endpoint; + private Session session = null; private MessageHandler messageHandler; - private OnOpenHandler onOpenHandler; + private onStateChangeHandler onOpenHandler; + private onStateChangeHandler onCloseHandler; public WebsocketClient(URI endpointURI) throws RuntimeException { + this.endpoint = endpointURI; + } + + public void listen() throws RuntimeException { try { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); - container.connectToServer(this, endpointURI); + container.connectToServer(this, endpoint); } catch (Exception e) { throw new RuntimeException(e); } @@ -29,12 +35,17 @@ public WebsocketClient(URI endpointURI) throws RuntimeException { @OnOpen public void onOpen(Session session) { this.session = session; + if (this.onOpenHandler != null) { + this.onOpenHandler.handleChange(); + } } @OnClose public void onClose(Session userSession, CloseReason reason) { - System.out.println("closing websocket"); this.session = null; + if (this.onCloseHandler != null) { + this.onCloseHandler.handleChange(); + } } @OnMessage @@ -44,15 +55,20 @@ public void onMessage(String message) { } } - public void addOnOpenHandler(OnOpenHandler openHandler) { + public void addOnOpenHandler(onStateChangeHandler openHandler) { this.onOpenHandler = openHandler; } + public void addOnCloseHandler(onStateChangeHandler openHandler) { + this.onCloseHandler = openHandler; + } + public void addMessageHandler(MessageHandler msgHandler) { this.messageHandler = msgHandler; } public void sendMessage(String message) { + this.session.getAsyncRemote().sendText(message); } } diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java index 79362d3..76a4428 100644 --- a/src/main/java/telraam/station/websocket/WebsocketFetcher.java +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -77,8 +77,7 @@ public void fetch() { //Create URL URI url; try { - URI stationUrl = URI.create(station.getUrl()); - url = new URI("ws", stationUrl.getHost(), "/detections", ""); + url = new URI(station.getUrl()); } catch (URISyntaxException ex) { this.logger.severe(ex.getMessage()); try { @@ -91,30 +90,23 @@ public void fetch() { } - WebsocketClient websocketClient; - - try { - websocketClient = new WebsocketClient(url); - } catch (RuntimeException ex) { - this.logger.severe(ex.getMessage()); + WebsocketClient websocketClient = new WebsocketClient(url); + websocketClient.addOnOpenHandler(() -> { + websocketClient.sendMessage(wsMessageEncoded); + }); + websocketClient.addOnCloseHandler(() -> { + this.logger.severe(String.format("Websocket for station %s got closed", station.getName())); try { Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); } catch (InterruptedException e) { logger.severe(e.getMessage()); } this.fetch(); - return; - } - - - websocketClient.addOnOpenHandler(() -> { - websocketClient.sendMessage(wsMessageEncoded); }); websocketClient.addMessageHandler((String msg) -> { //Insert detections List new_detections = new ArrayList<>(); List detection_mac_addresses = new ArrayList<>(); - logger.info("Received message on WS"); try { List detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class)); @@ -144,6 +136,19 @@ public void fetch() { logger.severe(e.getMessage()); } }); + + try { + websocketClient.listen(); + } catch (RuntimeException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + return; + } } private class InitWSMessage { From affe45c2dbd2b4f4471613a01a554c40c73d353d Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Tue, 2 Apr 2024 18:21:21 +0200 Subject: [PATCH 07/12] feat(ws-fetcher): retrieve missing values for detections from DB & try to fix simplePositioner cursedness with threading --- .../java/telraam/database/daos/DetectionDAO.java | 7 ++++--- .../java/telraam/database/models/Detection.java | 4 +++- src/main/java/telraam/database/models/Team.java | 10 +++++++++- .../logic/positioner/simple/SimplePositioner.java | 15 ++++++++------- .../station/websocket/WebsocketFetcher.java | 12 +++++++++--- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/main/java/telraam/database/daos/DetectionDAO.java b/src/main/java/telraam/database/daos/DetectionDAO.java index fba506f..4e45e3c 100644 --- a/src/main/java/telraam/database/daos/DetectionDAO.java +++ b/src/main/java/telraam/database/daos/DetectionDAO.java @@ -35,10 +35,11 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id @SqlBatch(""" INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \ - VALUES (:stationId, (SELECT id FROM baton WHERE :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion) + VALUES (:stationId, (SELECT id FROM baton WHERE mac = :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion) """) - @GetGeneratedKeys({"id"}) - int insertAllWithoutBaton(@BindBean List detection, @Bind("batonMac") List batonMac); + @GetGeneratedKeys({"id", "baton_id"}) + @RegisterBeanMapper(Detection.class) + List insertAllWithoutBaton(@BindBean List detection, @Bind("batonMac") List batonMac); @SqlQuery("SELECT * FROM detection WHERE id = :id") @RegisterBeanMapper(Detection.class) diff --git a/src/main/java/telraam/database/models/Detection.java b/src/main/java/telraam/database/models/Detection.java index 23a4067..5cd050b 100644 --- a/src/main/java/telraam/database/models/Detection.java +++ b/src/main/java/telraam/database/models/Detection.java @@ -6,7 +6,9 @@ import java.sql.Timestamp; -@Setter @Getter @NoArgsConstructor +@Setter +@Getter +@NoArgsConstructor public class Detection { private Integer id; private Integer batonId; diff --git a/src/main/java/telraam/database/models/Team.java b/src/main/java/telraam/database/models/Team.java index eb5e086..317b2ed 100644 --- a/src/main/java/telraam/database/models/Team.java +++ b/src/main/java/telraam/database/models/Team.java @@ -4,7 +4,11 @@ import lombok.NoArgsConstructor; import lombok.Setter; -@Getter @Setter @NoArgsConstructor +import java.util.Objects; + +@Getter +@Setter +@NoArgsConstructor public class Team { private Integer id; private String name; @@ -19,4 +23,8 @@ public Team(String name, int batonId) { this.name = name; this.batonId = batonId; } + + public boolean equals(Team obj) { + return Objects.equals(id, obj.getId()); + } } diff --git a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java index f8a9022..90d0ec9 100644 --- a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java +++ b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; public class SimplePositioner implements Positioner { @@ -31,9 +32,9 @@ public class SimplePositioner implements Positioner { private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName()); private final PositionSender positionSender; private final Map batonIdToTeam; - private final Map> teamDetections; + private final Map> teamDetections; private final List stations; - private final Map teamPositions; + private final Map teamPositions; public SimplePositioner(Jdbi jdbi) { this.debounceScheduled = false; @@ -46,8 +47,8 @@ public SimplePositioner(Jdbi jdbi) { TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class); List teams = teamDAO.getAll(); for (Team team: teams) { - teamDetections.put(team, new CircularQueue<>(QUEUE_SIZE)); - teamPositions.put(team, new Position(team.getId())); + teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE)); + teamPositions.put(team.getId(), new Position(team.getId())); } List switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll(); switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp)); @@ -63,7 +64,7 @@ public SimplePositioner(Jdbi jdbi) { public void calculatePositions() { logger.info("SimplePositioner: Calculating positions..."); - for (Map.Entry> entry: teamDetections.entrySet()) { + for (Map.Entry> entry: teamDetections.entrySet()) { List detections = teamDetections.get(entry.getKey()); detections.sort(Comparator.comparing(Detection::getTimestamp)); @@ -86,7 +87,7 @@ public void calculatePositions() { public void handle(Detection detection) { Team team = batonIdToTeam.get(detection.getBatonId()); - teamDetections.get(team).add(detection); + teamDetections.get(team.getId()).add(detection); if (! debounceScheduled) { debounceScheduled = true; @@ -94,7 +95,7 @@ public void handle(Detection detection) { try { calculatePositions(); } catch (Exception e) { - logger.severe(e.getMessage()); + logger.log(Level.SEVERE, e.getMessage(), e); } debounceScheduled = false; }, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS); diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java index 76a4428..f0b910b 100644 --- a/src/main/java/telraam/station/websocket/WebsocketFetcher.java +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -124,11 +124,17 @@ public void fetch() { detection_mac_addresses.add(detection.mac); } if (!new_detections.isEmpty()) { - detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); - new_detections.forEach((detection) -> { + List db_detections = detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); + for(int i = 0; i < new_detections.size(); i++) { + Detection detection = new_detections.get(i); + Detection db_detection = db_detections.get(i); + + detection.setBatonId(db_detection.getBatonId()); + detection.setId(db_detection.getId()); + lappers.forEach((lapper) -> lapper.handle(detection)); positioners.forEach(positioner -> positioner.handle(detection)); - }); + } } logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); From 606bc40f7da1daeb523eb10c5b471cdaffa888a4 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Sat, 13 Apr 2024 14:40:53 +0200 Subject: [PATCH 08/12] fix(simplePositioner): make handle function synchronised --- .../positioner/simple/SimplePositioner.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java index 90d0ec9..9958d1e 100644 --- a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java +++ b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java @@ -13,23 +13,22 @@ import telraam.logic.positioner.PositionSender; import telraam.logic.positioner.Positioner; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; public class SimplePositioner implements Positioner { + private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName()); private final int QUEUE_SIZE = 50; private final int MIN_RSSI = -85; private final int DEBOUNCE_TIMEOUT = 1; private boolean debounceScheduled; private final ScheduledExecutorService scheduler; - private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName()); private final PositionSender positionSender; private final Map batonIdToTeam; private final Map> teamDetections; @@ -46,14 +45,14 @@ public SimplePositioner(Jdbi jdbi) { TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class); List teams = teamDAO.getAll(); - for (Team team: teams) { + for (Team team : teams) { teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE)); teamPositions.put(team.getId(), new Position(team.getId())); } List switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll(); switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp)); - for (BatonSwitchover switchover: switchovers) { + for (BatonSwitchover switchover : switchovers) { batonIdToTeam.put(switchover.getNewBatonId(), teamDAO.getById(switchover.getTeamId()).get()); } @@ -64,13 +63,13 @@ public SimplePositioner(Jdbi jdbi) { public void calculatePositions() { logger.info("SimplePositioner: Calculating positions..."); - for (Map.Entry> entry: teamDetections.entrySet()) { + for (Map.Entry> entry : teamDetections.entrySet()) { List detections = teamDetections.get(entry.getKey()); detections.sort(Comparator.comparing(Detection::getTimestamp)); int currentStationRssi = MIN_RSSI; int currentStationPosition = 0; - for (Detection detection: detections) { + for (Detection detection : detections) { if (detection.getRssi() > currentStationRssi) { currentStationRssi = detection.getRssi(); currentStationPosition = detection.getStationId(); @@ -85,11 +84,11 @@ public void calculatePositions() { logger.info("SimplePositioner: Done calculating positions"); } - public void handle(Detection detection) { + public synchronized void handle(Detection detection) { Team team = batonIdToTeam.get(detection.getBatonId()); teamDetections.get(team.getId()).add(detection); - if (! debounceScheduled) { + if (!debounceScheduled) { debounceScheduled = true; scheduler.schedule(() -> { try { @@ -101,5 +100,4 @@ public void handle(Detection detection) { }, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS); } } - } From cfb26f7b5f92c523239e8e88c1e0dbe0b14d4ba4 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Mon, 15 Apr 2024 16:17:29 +0200 Subject: [PATCH 09/12] fix(gradle): replace impl with jersey one is more in line with what we use for our server impl --- build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index ba9fd31..6dd0107 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,8 @@ dependencies { // Websocket client libs compileOnly 'jakarta.websocket:jakarta.websocket-client-api:2.2.0-M1' - implementation 'org.glassfish.tyrus.bundles:tyrus-standalone-client:2.2.0-M1' + // Impl for jakarta websocket clients + implementation 'org.eclipse.jetty.websocket:websocket-jakarta-client:11.0.20' // Database implementation('com.h2database:h2:2.2.220') From 1f525229003499c2ffcb9d21b7124c8359a964c2 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Mon, 15 Apr 2024 16:19:03 +0200 Subject: [PATCH 10/12] fix(ws-fetcher): use lombok annotation --- .../java/telraam/station/websocket/WebsocketFetcher.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java index f0b910b..88e4f77 100644 --- a/src/main/java/telraam/station/websocket/WebsocketFetcher.java +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -1,6 +1,7 @@ package telraam.station.websocket; import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.AllArgsConstructor; import org.jdbi.v3.core.Jdbi; import com.fasterxml.jackson.databind.ObjectMapper; import telraam.database.daos.BatonDAO; @@ -157,11 +158,8 @@ public void fetch() { } } - private class InitWSMessage { + @AllArgsConstructor + private static class InitWSMessage { public int lastId; - - public InitWSMessage(int lastId) { - this.lastId = lastId; - } } } From 3ee2e92a2236344f4eb50d6a36f836d8aeb67fe3 Mon Sep 17 00:00:00 2001 From: NuttyShrimp Date: Fri, 19 Apr 2024 10:00:59 +0200 Subject: [PATCH 11/12] fix(ws-fetcher): baton mac's to uppercase --- src/main/java/telraam/station/websocket/WebsocketFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java index 88e4f77..285ad15 100644 --- a/src/main/java/telraam/station/websocket/WebsocketFetcher.java +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -122,7 +122,7 @@ public void fetch() { new Timestamp((long) (detection.detectionTimestamp * 1000)), new Timestamp(System.currentTimeMillis()) )); - detection_mac_addresses.add(detection.mac); + detection_mac_addresses.add(detection.mac.toUpperCase()); } if (!new_detections.isEmpty()) { List db_detections = detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); From aa4547372a3961a1e89d2cb62c9a177808dd420e Mon Sep 17 00:00:00 2001 From: FKD13 <44001949+FKD13@users.noreply.github.com> Date: Sat, 20 Apr 2024 19:06:13 +0200 Subject: [PATCH 12/12] ingrease message size --- src/main/java/telraam/station/websocket/WebsocketClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/telraam/station/websocket/WebsocketClient.java b/src/main/java/telraam/station/websocket/WebsocketClient.java index bf9ad46..25b7bb4 100644 --- a/src/main/java/telraam/station/websocket/WebsocketClient.java +++ b/src/main/java/telraam/station/websocket/WebsocketClient.java @@ -26,6 +26,7 @@ public WebsocketClient(URI endpointURI) throws RuntimeException { public void listen() throws RuntimeException { try { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + container.setDefaultMaxTextMessageBufferSize(100 * 1048576); // 100Mb container.connectToServer(this, endpoint); } catch (Exception e) { throw new RuntimeException(e);