From dfd4510f4ba7796b840063dd49e4a996393934d5 Mon Sep 17 00:00:00 2001 From: Jan Lecoutere Date: Mon, 22 Apr 2024 19:35:18 +0200 Subject: [PATCH] feat: add WS station fetcher (#135) * refactor(station): replace timed http request with websockets * feat(station): send InitMessage on open connection * refactor: Use jakarta websockets * feat: re-add http fetcher & move WS to own package * fix(ws-fetcher): catch client creation error * feat(ws-fetcher): open WS after setting handlers * feat(ws-fetcher): retrieve missing values for detections from DB & try to fix simplePositioner cursedness with threading * fix(simplePositioner): make handle function synchronised * fix(gradle): replace impl with jersey one is more in line with what we use for our server impl * fix(ws-fetcher): use lombok annotation * fix(ws-fetcher): baton mac's to uppercase * ingrease message size --------- Co-authored-by: FKD13 <44001949+FKD13@users.noreply.github.com> --- build.gradle | 6 + src/main/java/telraam/App.java | 6 +- .../telraam/database/daos/DetectionDAO.java | 8 + .../telraam/database/models/Detection.java | 4 +- .../java/telraam/database/models/Team.java | 10 +- .../positioner/simple/SimplePositioner.java | 35 ++-- src/main/java/telraam/station/Fetcher.java | 182 +----------------- .../java/telraam/station/FetcherFactory.java | 42 ++++ .../telraam/station/http/HTTPFetcher.java | 174 +++++++++++++++++ .../station/{ => http}/JsonBodyHandler.java | 15 +- .../telraam/station/models/RonnyResponse.java | 2 +- .../station/websocket/WebsocketClient.java | 75 ++++++++ .../station/websocket/WebsocketFetcher.java | 165 ++++++++++++++++ 13 files changed, 518 insertions(+), 206 deletions(-) create mode 100644 src/main/java/telraam/station/FetcherFactory.java create mode 100644 src/main/java/telraam/station/http/HTTPFetcher.java rename src/main/java/telraam/station/{ => http}/JsonBodyHandler.java (97%) create mode 100644 src/main/java/telraam/station/websocket/WebsocketClient.java create mode 100644 src/main/java/telraam/station/websocket/WebsocketFetcher.java diff --git a/build.gradle b/build.gradle index 6800130..6dd0107 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,12 @@ dependencies { 'org.eclipse.jetty.websocket:websocket-jetty-api:' + jettyVersion, 'org.eclipse.jetty.websocket:websocket-jetty-server:' + jettyVersion, ) + + // Websocket client libs + compileOnly 'jakarta.websocket:jakarta.websocket-client-api:2.2.0-M1' + // Impl for jakarta websocket clients + implementation 'org.eclipse.jetty.websocket:websocket-jakarta-client:11.0.20' + // Database implementation('com.h2database:h2:2.2.220') implementation('org.postgresql:postgresql:42.7.3') diff --git a/src/main/java/telraam/App.java b/src/main/java/telraam/App.java index 4075bbd..08a3847 100644 --- a/src/main/java/telraam/App.java +++ b/src/main/java/telraam/App.java @@ -25,7 +25,8 @@ import telraam.logic.lapper.slapper.Slapper; import telraam.logic.positioner.Positioner; import telraam.logic.positioner.simple.SimplePositioner; -import telraam.station.Fetcher; +import telraam.station.FetcherFactory; +import telraam.station.websocket.WebsocketFetcher; import telraam.util.AcceptedLapsUtil; import telraam.websocket.WebSocketConnection; @@ -144,9 +145,10 @@ public void run(AppConfiguration configuration, Environment environment) { positioners.add(new SimplePositioner(this.database)); // Start fetch thread for each station + FetcherFactory fetcherFactory = new FetcherFactory(this.database, lappers, positioners); StationDAO stationDAO = this.database.onDemand(StationDAO.class); for (Station station : stationDAO.getAll()) { - new Thread(() -> new Fetcher(this.database, station, lappers, positioners).fetch()).start(); + new Thread(() -> fetcherFactory.create(station).fetch()).start(); } } diff --git a/src/main/java/telraam/database/daos/DetectionDAO.java b/src/main/java/telraam/database/daos/DetectionDAO.java index c50452e..4e45e3c 100644 --- a/src/main/java/telraam/database/daos/DetectionDAO.java +++ b/src/main/java/telraam/database/daos/DetectionDAO.java @@ -33,6 +33,14 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id @GetGeneratedKeys({"id"}) int insertAll(@BindBean List detection); + @SqlBatch(""" + INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \ + VALUES (:stationId, (SELECT id FROM baton WHERE mac = :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion) + """) + @GetGeneratedKeys({"id", "baton_id"}) + @RegisterBeanMapper(Detection.class) + List insertAllWithoutBaton(@BindBean List detection, @Bind("batonMac") List batonMac); + @SqlQuery("SELECT * FROM detection WHERE id = :id") @RegisterBeanMapper(Detection.class) Optional getById(@Bind("id") int id); diff --git a/src/main/java/telraam/database/models/Detection.java b/src/main/java/telraam/database/models/Detection.java index 23a4067..5cd050b 100644 --- a/src/main/java/telraam/database/models/Detection.java +++ b/src/main/java/telraam/database/models/Detection.java @@ -6,7 +6,9 @@ import java.sql.Timestamp; -@Setter @Getter @NoArgsConstructor +@Setter +@Getter +@NoArgsConstructor public class Detection { private Integer id; private Integer batonId; diff --git a/src/main/java/telraam/database/models/Team.java b/src/main/java/telraam/database/models/Team.java index eb5e086..317b2ed 100644 --- a/src/main/java/telraam/database/models/Team.java +++ b/src/main/java/telraam/database/models/Team.java @@ -4,7 +4,11 @@ import lombok.NoArgsConstructor; import lombok.Setter; -@Getter @Setter @NoArgsConstructor +import java.util.Objects; + +@Getter +@Setter +@NoArgsConstructor public class Team { private Integer id; private String name; @@ -19,4 +23,8 @@ public Team(String name, int batonId) { this.name = name; this.batonId = batonId; } + + public boolean equals(Team obj) { + return Objects.equals(id, obj.getId()); + } } diff --git a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java index f8a9022..9958d1e 100644 --- a/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java +++ b/src/main/java/telraam/logic/positioner/simple/SimplePositioner.java @@ -13,27 +13,27 @@ import telraam.logic.positioner.PositionSender; import telraam.logic.positioner.Positioner; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; import java.util.logging.Logger; public class SimplePositioner implements Positioner { + private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName()); private final int QUEUE_SIZE = 50; private final int MIN_RSSI = -85; private final int DEBOUNCE_TIMEOUT = 1; private boolean debounceScheduled; private final ScheduledExecutorService scheduler; - private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName()); private final PositionSender positionSender; private final Map batonIdToTeam; - private final Map> teamDetections; + private final Map> teamDetections; private final List stations; - private final Map teamPositions; + private final Map teamPositions; public SimplePositioner(Jdbi jdbi) { this.debounceScheduled = false; @@ -45,14 +45,14 @@ public SimplePositioner(Jdbi jdbi) { TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class); List teams = teamDAO.getAll(); - for (Team team: teams) { - teamDetections.put(team, new CircularQueue<>(QUEUE_SIZE)); - teamPositions.put(team, new Position(team.getId())); + for (Team team : teams) { + teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE)); + teamPositions.put(team.getId(), new Position(team.getId())); } List switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll(); switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp)); - for (BatonSwitchover switchover: switchovers) { + for (BatonSwitchover switchover : switchovers) { batonIdToTeam.put(switchover.getNewBatonId(), teamDAO.getById(switchover.getTeamId()).get()); } @@ -63,13 +63,13 @@ public SimplePositioner(Jdbi jdbi) { public void calculatePositions() { logger.info("SimplePositioner: Calculating positions..."); - for (Map.Entry> entry: teamDetections.entrySet()) { + for (Map.Entry> entry : teamDetections.entrySet()) { List detections = teamDetections.get(entry.getKey()); detections.sort(Comparator.comparing(Detection::getTimestamp)); int currentStationRssi = MIN_RSSI; int currentStationPosition = 0; - for (Detection detection: detections) { + for (Detection detection : detections) { if (detection.getRssi() > currentStationRssi) { currentStationRssi = detection.getRssi(); currentStationPosition = detection.getStationId(); @@ -84,21 +84,20 @@ public void calculatePositions() { logger.info("SimplePositioner: Done calculating positions"); } - public void handle(Detection detection) { + public synchronized void handle(Detection detection) { Team team = batonIdToTeam.get(detection.getBatonId()); - teamDetections.get(team).add(detection); + teamDetections.get(team.getId()).add(detection); - if (! debounceScheduled) { + if (!debounceScheduled) { debounceScheduled = true; scheduler.schedule(() -> { try { calculatePositions(); } catch (Exception e) { - logger.severe(e.getMessage()); + logger.log(Level.SEVERE, e.getMessage(), e); } debounceScheduled = false; }, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS); } } - } diff --git a/src/main/java/telraam/station/Fetcher.java b/src/main/java/telraam/station/Fetcher.java index fbd51b4..753474a 100644 --- a/src/main/java/telraam/station/Fetcher.java +++ b/src/main/java/telraam/station/Fetcher.java @@ -1,182 +1,14 @@ package telraam.station; -import org.jdbi.v3.core.Jdbi; -import telraam.database.daos.BatonDAO; -import telraam.database.daos.DetectionDAO; -import telraam.database.daos.StationDAO; -import telraam.database.models.Baton; -import telraam.database.models.Detection; -import telraam.database.models.Station; -import telraam.logic.lapper.Lapper; -import telraam.logic.positioner.Positioner; -import telraam.station.models.RonnyDetection; -import telraam.station.models.RonnyResponse; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpConnectTimeoutException; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.sql.Timestamp; -import java.time.Duration; -import java.util.*; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -public class Fetcher { - private final Set lappers; - private final Set positioners; - private Station station; - - private final BatonDAO batonDAO; - private final DetectionDAO detectionDAO; - private final StationDAO stationDAO; - - private final HttpClient client = HttpClient.newHttpClient(); - private final Logger logger = Logger.getLogger(Fetcher.class.getName()); - +public interface Fetcher { //Timeout to wait for before sending the next request after an error. - private final static int ERROR_TIMEOUT_MS = 2000; + int ERROR_TIMEOUT_MS = 2000; //Timeout for a request to a station. - private final static int REQUEST_TIMEOUT_S = 10; + int REQUEST_TIMEOUT_S = 10; //Full batch size, if this number of detections is reached, more are probably available immediately. - private final static int FULL_BATCH_SIZE = 1000; + int FULL_BATCH_SIZE = 1000; //Timeout when result has less than a full batch of detections. - private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds - - - public Fetcher(Jdbi database, Station station, Set lappers, Set positioners) { - this.batonDAO = database.onDemand(BatonDAO.class); - this.detectionDAO = database.onDemand(DetectionDAO.class); - this.stationDAO = database.onDemand(StationDAO.class); - - this.lappers = lappers; - this.positioners = positioners; - this.station = station; - } - - public void fetch() { - logger.info("Running Fetcher for station(" + this.station.getId() + ")"); - JsonBodyHandler bodyHandler = new JsonBodyHandler<>(RonnyResponse.class); - - while (true) { - //Update the station to account for possible changes in the database - this.stationDAO.getById(station.getId()).ifPresentOrElse( - station -> this.station = station, - () -> this.logger.severe("Can't update station from database.") - ); - - //Get last detection id - int lastDetectionId = 0; - Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); - if (lastDetection.isPresent()) { - lastDetectionId = lastDetection.get().getRemoteId(); - } - - //Create URL - URI url; - try { - url = new URI(station.getUrl() + "/detections/" + lastDetectionId); - } catch (URISyntaxException ex) { - this.logger.severe(ex.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - } - continue; - } - - //Create request - HttpRequest request; - try { - request = HttpRequest.newBuilder() - .uri(url) - .version(HttpClient.Version.HTTP_1_1) - .timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S)) - .build(); - } catch (IllegalArgumentException e) { - logger.severe(e.getMessage()); - try { - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - } catch (InterruptedException ex) { - logger.severe(ex.getMessage()); - } - continue; - } - - //Do request - HttpResponse> response; - try { - try { - response = this.client.send(request, bodyHandler); - } catch (ConnectException | HttpConnectTimeoutException ex) { - this.logger.severe("Could not connect to " + request.uri()); - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - continue; - } catch (IOException e) { - logger.severe(e.getMessage()); - Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); - continue; - } - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - continue; - } - - //Check response state - if (response.statusCode() != 200) { - this.logger.warning( - "Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")" - ); - continue; - } - - //Fetch all batons and create a map by batonMAC - Map baton_mac_map = batonDAO.getAll().stream() - .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); - - //Insert detections - List new_detections = new ArrayList<>(); - List detections = response.body().get().detections; - for (RonnyDetection detection : detections) { - if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { - var baton = baton_mac_map.get(detection.mac.toUpperCase()); - new_detections.add(new Detection( - baton.getId(), - station.getId(), - detection.rssi, - detection.battery, - detection.uptimeMs, - detection.id, - new Timestamp((long) (detection.detectionTimestamp * 1000)), - new Timestamp(System.currentTimeMillis()) - )); - } - } - if (!new_detections.isEmpty()) { - detectionDAO.insertAll(new_detections); - new_detections.forEach((detection) -> { - lappers.forEach((lapper) -> lapper.handle(detection)); - positioners.forEach((positioner) -> positioner.handle(detection)); - }); - } - - this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds - //If few detections are retrieved from the station, wait for some time. - if (detections.size() < Fetcher.FULL_BATCH_SIZE) { - try { - Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); - } catch (InterruptedException e) { - logger.severe(e.getMessage()); - } - } - } - } -} \ No newline at end of file + void fetch(); +} diff --git a/src/main/java/telraam/station/FetcherFactory.java b/src/main/java/telraam/station/FetcherFactory.java new file mode 100644 index 0000000..b3cb487 --- /dev/null +++ b/src/main/java/telraam/station/FetcherFactory.java @@ -0,0 +1,42 @@ +package telraam.station; + +import org.jdbi.v3.core.Jdbi; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.http.HTTPFetcher; +import telraam.station.websocket.WebsocketFetcher; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Set; +import java.util.logging.Logger; + +public class FetcherFactory { + private final Logger logger = Logger.getLogger(FetcherFactory.class.getName()); + private final Jdbi database; + private final Set lappers; + private final Set positioners; + public FetcherFactory(Jdbi database, Set lappers, Set positioners) { + this.database = database; + this.lappers = lappers; + this.positioners = positioners; + } + + public Fetcher create(Station station) { + try { + URI stationURI = new URI(station.getUrl()); + return switch (stationURI.getScheme()) { + case "ws" -> new WebsocketFetcher(database, station, lappers, positioners); + case "http" -> new HTTPFetcher(database, station, lappers, positioners); + default -> { + logger.severe(String.format("%s is not a valid scheme for a station", stationURI.getScheme())); + yield null; + } + }; + } catch (URISyntaxException e) { + logger.severe(String.format("Failed to parse station URI: %s", e.getMessage())); + } + return null; + } +} diff --git a/src/main/java/telraam/station/http/HTTPFetcher.java b/src/main/java/telraam/station/http/HTTPFetcher.java new file mode 100644 index 0000000..c296efd --- /dev/null +++ b/src/main/java/telraam/station/http/HTTPFetcher.java @@ -0,0 +1,174 @@ +package telraam.station.http; + +import org.jdbi.v3.core.Jdbi; +import telraam.database.daos.BatonDAO; +import telraam.database.daos.DetectionDAO; +import telraam.database.daos.StationDAO; +import telraam.database.models.Baton; +import telraam.database.models.Detection; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.Fetcher; +import telraam.station.models.RonnyDetection; +import telraam.station.models.RonnyResponse; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpConnectTimeoutException; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.sql.Timestamp; +import java.time.Duration; +import java.util.*; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class HTTPFetcher implements Fetcher { + private final Set lappers; + private final Set positioners; + private Station station; + + private final BatonDAO batonDAO; + private final DetectionDAO detectionDAO; + private final StationDAO stationDAO; + + private final HttpClient client = HttpClient.newHttpClient(); + private final Logger logger = Logger.getLogger(Fetcher.class.getName()); + + + public HTTPFetcher(Jdbi database, Station station, Set lappers, Set positioners) { + this.batonDAO = database.onDemand(BatonDAO.class); + this.detectionDAO = database.onDemand(DetectionDAO.class); + this.stationDAO = database.onDemand(StationDAO.class); + + this.lappers = lappers; + this.positioners = positioners; + this.station = station; + } + + public void fetch() { + logger.info("Running Fetcher for station(" + this.station.getId() + ")"); + JsonBodyHandler bodyHandler = new JsonBodyHandler<>(RonnyResponse.class); + + while (true) { + //Update the station to account for possible changes in the database + this.stationDAO.getById(station.getId()).ifPresentOrElse( + station -> this.station = station, + () -> this.logger.severe("Can't update station from database.") + ); + + //Get last detection id + int lastDetectionId = 0; + Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); + if (lastDetection.isPresent()) { + lastDetectionId = lastDetection.get().getRemoteId(); + } + + //Create URL + URI url; + try { + url = new URI(station.getUrl() + "/detections/" + lastDetectionId); + } catch (URISyntaxException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + continue; + } + + //Create request + HttpRequest request; + try { + request = HttpRequest.newBuilder() + .uri(url) + .version(HttpClient.Version.HTTP_1_1) + .timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S)) + .build(); + } catch (IllegalArgumentException e) { + logger.severe(e.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException ex) { + logger.severe(ex.getMessage()); + } + continue; + } + + //Do request + HttpResponse> response; + try { + try { + response = this.client.send(request, bodyHandler); + } catch (ConnectException | HttpConnectTimeoutException ex) { + this.logger.severe("Could not connect to " + request.uri()); + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + continue; + } catch (IOException e) { + logger.severe(e.getMessage()); + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + continue; + } + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + continue; + } + + //Check response state + if (response.statusCode() != 200) { + this.logger.warning( + "Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")" + ); + continue; + } + + //Fetch all batons and create a map by batonMAC + Map baton_mac_map = batonDAO.getAll().stream() + .collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); + + //Insert detections + List new_detections = new ArrayList<>(); + List detections = response.body().get().detections; + for (RonnyDetection detection : detections) { + if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { + var baton = baton_mac_map.get(detection.mac.toUpperCase()); + new_detections.add(new Detection( + baton.getId(), + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + } + } + if (!new_detections.isEmpty()) { + detectionDAO.insertAll(new_detections); + new_detections.forEach((detection) -> { + lappers.forEach((lapper) -> lapper.handle(detection)); + positioners.forEach((positioner) -> positioner.handle(detection)); + }); + } + + this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + + //If few detections are retrieved from the station, wait for some time. + if (detections.size() < Fetcher.FULL_BATCH_SIZE) { + try { + Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/telraam/station/JsonBodyHandler.java b/src/main/java/telraam/station/http/JsonBodyHandler.java similarity index 97% rename from src/main/java/telraam/station/JsonBodyHandler.java rename to src/main/java/telraam/station/http/JsonBodyHandler.java index 2d28e7f..462039f 100644 --- a/src/main/java/telraam/station/JsonBodyHandler.java +++ b/src/main/java/telraam/station/http/JsonBodyHandler.java @@ -1,4 +1,4 @@ -package telraam.station; +package telraam.station.http; import com.fasterxml.jackson.databind.ObjectMapper; @@ -17,12 +17,6 @@ public JsonBodyHandler(Class targetClass) { this.targetClass = targetClass; } - @Override - public HttpResponse.BodySubscriber> apply(HttpResponse.ResponseInfo responseInfo) { - return asJSON(this.targetClass); - } - - public static HttpResponse.BodySubscriber> asJSON(Class targetType) { HttpResponse.BodySubscriber upstream = HttpResponse.BodySubscribers.ofInputStream(); @@ -40,4 +34,9 @@ public static Supplier toSupplierOfType(InputStream inputStream, Class } }; } -} \ No newline at end of file + + @Override + public HttpResponse.BodySubscriber> apply(HttpResponse.ResponseInfo responseInfo) { + return asJSON(this.targetClass); + } +} diff --git a/src/main/java/telraam/station/models/RonnyResponse.java b/src/main/java/telraam/station/models/RonnyResponse.java index 726cafc..63e1382 100644 --- a/src/main/java/telraam/station/models/RonnyResponse.java +++ b/src/main/java/telraam/station/models/RonnyResponse.java @@ -9,4 +9,4 @@ public class RonnyResponse { @JsonProperty("station_id") public String stationRonnyName; -} +} \ No newline at end of file diff --git a/src/main/java/telraam/station/websocket/WebsocketClient.java b/src/main/java/telraam/station/websocket/WebsocketClient.java new file mode 100644 index 0000000..25b7bb4 --- /dev/null +++ b/src/main/java/telraam/station/websocket/WebsocketClient.java @@ -0,0 +1,75 @@ +package telraam.station.websocket; + +import jakarta.websocket.*; + +import java.net.URI; + +@ClientEndpoint +public class WebsocketClient { + public interface MessageHandler { + void handleMessage(String message); + } + public interface onStateChangeHandler { + void handleChange(); + } + + private URI endpoint; + private Session session = null; + private MessageHandler messageHandler; + private onStateChangeHandler onOpenHandler; + private onStateChangeHandler onCloseHandler; + + public WebsocketClient(URI endpointURI) throws RuntimeException { + this.endpoint = endpointURI; + } + + public void listen() throws RuntimeException { + try { + WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + container.setDefaultMaxTextMessageBufferSize(100 * 1048576); // 100Mb + container.connectToServer(this, endpoint); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @OnOpen + public void onOpen(Session session) { + this.session = session; + if (this.onOpenHandler != null) { + this.onOpenHandler.handleChange(); + } + } + + @OnClose + public void onClose(Session userSession, CloseReason reason) { + this.session = null; + if (this.onCloseHandler != null) { + this.onCloseHandler.handleChange(); + } + } + + @OnMessage + public void onMessage(String message) { + if (this.messageHandler != null) { + this.messageHandler.handleMessage(message); + } + } + + public void addOnOpenHandler(onStateChangeHandler openHandler) { + this.onOpenHandler = openHandler; + } + + public void addOnCloseHandler(onStateChangeHandler openHandler) { + this.onCloseHandler = openHandler; + } + + public void addMessageHandler(MessageHandler msgHandler) { + this.messageHandler = msgHandler; + } + + public void sendMessage(String message) { + + this.session.getAsyncRemote().sendText(message); + } +} diff --git a/src/main/java/telraam/station/websocket/WebsocketFetcher.java b/src/main/java/telraam/station/websocket/WebsocketFetcher.java new file mode 100644 index 0000000..285ad15 --- /dev/null +++ b/src/main/java/telraam/station/websocket/WebsocketFetcher.java @@ -0,0 +1,165 @@ +package telraam.station.websocket; + +import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.AllArgsConstructor; +import org.jdbi.v3.core.Jdbi; +import com.fasterxml.jackson.databind.ObjectMapper; +import telraam.database.daos.BatonDAO; +import telraam.database.daos.DetectionDAO; +import telraam.database.daos.StationDAO; +import telraam.database.models.Detection; +import telraam.database.models.Station; +import telraam.logic.lapper.Lapper; +import telraam.logic.positioner.Positioner; +import telraam.station.Fetcher; +import telraam.station.models.RonnyDetection; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.*; +import java.sql.Timestamp; +import java.util.*; +import java.util.logging.Logger; + +public class WebsocketFetcher implements Fetcher { + private final Set lappers; + private final Set positioners; + private Station station; + + private final BatonDAO batonDAO; + private final DetectionDAO detectionDAO; + private final StationDAO stationDAO; + + private final HttpClient client = HttpClient.newHttpClient(); + private final Logger logger = Logger.getLogger(WebsocketFetcher.class.getName()); + + public WebsocketFetcher(Jdbi database, Station station, Set lappers, Set positioners) { + this.batonDAO = database.onDemand(BatonDAO.class); + this.detectionDAO = database.onDemand(DetectionDAO.class); + this.stationDAO = database.onDemand(StationDAO.class); + this.lappers = lappers; + this.positioners = positioners; + + this.station = station; + } + + public void fetch() { + logger.info("Running Fetcher for station(" + this.station.getId() + ")"); + ObjectMapper mapper = new ObjectMapper(); + + //Update the station to account for possible changes in the database + this.stationDAO.getById(station.getId()).ifPresentOrElse( + station -> this.station = station, + () -> this.logger.severe("Can't update station from database.") + ); + + //Get last detection id + int lastDetectionId = 0; + Optional lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); + if (lastDetection.isPresent()) { + lastDetectionId = lastDetection.get().getRemoteId(); + } + + InitWSMessage wsMessage = new InitWSMessage(lastDetectionId); + String wsMessageEncoded; + try { + wsMessageEncoded = mapper.writeValueAsString(wsMessage); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException ex) { + logger.severe(ex.getMessage()); + } + this.fetch(); + return; + } + + //Create URL + URI url; + try { + url = new URI(station.getUrl()); + } catch (URISyntaxException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + return; + } + + + WebsocketClient websocketClient = new WebsocketClient(url); + websocketClient.addOnOpenHandler(() -> { + websocketClient.sendMessage(wsMessageEncoded); + }); + websocketClient.addOnCloseHandler(() -> { + this.logger.severe(String.format("Websocket for station %s got closed", station.getName())); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + }); + websocketClient.addMessageHandler((String msg) -> { + //Insert detections + List new_detections = new ArrayList<>(); + List detection_mac_addresses = new ArrayList<>(); + + try { + List detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class)); + for (RonnyDetection detection : detections) { + new_detections.add(new Detection( + 0, + station.getId(), + detection.rssi, + detection.battery, + detection.uptimeMs, + detection.id, + new Timestamp((long) (detection.detectionTimestamp * 1000)), + new Timestamp(System.currentTimeMillis()) + )); + detection_mac_addresses.add(detection.mac.toUpperCase()); + } + if (!new_detections.isEmpty()) { + List db_detections = detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses); + for(int i = 0; i < new_detections.size(); i++) { + Detection detection = new_detections.get(i); + Detection db_detection = db_detections.get(i); + + detection.setBatonId(db_detection.getBatonId()); + detection.setId(db_detection.getId()); + + lappers.forEach((lapper) -> lapper.handle(detection)); + positioners.forEach(positioner -> positioner.handle(detection)); + } + } + + logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); + } catch (JsonProcessingException e) { + logger.severe(e.getMessage()); + } + }); + + try { + websocketClient.listen(); + } catch (RuntimeException ex) { + this.logger.severe(ex.getMessage()); + try { + Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); + } catch (InterruptedException e) { + logger.severe(e.getMessage()); + } + this.fetch(); + return; + } + } + + @AllArgsConstructor + private static class InitWSMessage { + public int lastId; + } +}