From 86a900487a736c9fcd43e25cc6f9378445b6c489 Mon Sep 17 00:00:00 2001 From: Bertrik Sikken Date: Tue, 27 Feb 2024 21:21:16 +0100 Subject: [PATCH] Log subscriptions better, shorten queue check interval, reduce queue size. --- .../verkeersdrukte/app/VerkeersDrukteResource.java | 6 ++++-- .../verkeersdrukte/traffic/TrafficHandler.java | 2 -- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/app/VerkeersDrukteResource.java b/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/app/VerkeersDrukteResource.java index c0361a3..8f90a10 100644 --- a/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/app/VerkeersDrukteResource.java +++ b/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/app/VerkeersDrukteResource.java @@ -115,14 +115,15 @@ public void getTrafficEvents(@Context Sse sse, @Context SseEventSink sseEventSin // get initial data String clientId = "client-" + atomicInteger.incrementAndGet(); - BlockingQueue queue = new ArrayBlockingQueue<>(10); + BlockingQueue queue = new ArrayBlockingQueue<>(3); eventCallback(clientId, queue, location); // subscribe to updates + LOG.info("Subscribing client '{}' for '{}'", clientId, location); handler.subscribe(clientId, () -> eventCallback(clientId, queue, location)); try (sseEventSink) { while (!sseEventSink.isClosed()) { - AggregateMeasurement measurement = queue.poll(5, TimeUnit.SECONDS); + AggregateMeasurement measurement = queue.poll(1, TimeUnit.SECONDS); if (measurement != null) { String id = String.valueOf(measurement.dateTime.getEpochSecond() / 60); String json = mapper.writeValueAsString(new MeasurementResult(measurement)); @@ -133,6 +134,7 @@ public void getTrafficEvents(@Context Sse sse, @Context SseEventSink sseEventSin } catch (InterruptedException | JsonProcessingException e) { LOG.warn("Error sending SSE: {}", e.getMessage()); } finally { + LOG.info("Unsubscribing client '{}' for '{}'", clientId, location); handler.unsubscribe(clientId); } } diff --git a/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/traffic/TrafficHandler.java b/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/traffic/TrafficHandler.java index cfdc69e..096f161 100644 --- a/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/traffic/TrafficHandler.java +++ b/verkeersdrukte/src/main/java/nl/bertriksikken/verkeersdrukte/traffic/TrafficHandler.java @@ -179,13 +179,11 @@ public FeatureCollection.Feature getStaticData(String location) { @Override public void subscribe(String clientId, INotifyData callback) { - LOG.info("Subscribe: {}", clientId); subscriptions.put(clientId, callback); } @Override public void unsubscribe(String clientId) { - LOG.info("Unsubscribe: {}", clientId); subscriptions.remove(clientId); }