Skip to content

Commit

Permalink
Log subscriptions better, shorten queue check interval, reduce queue …
Browse files Browse the repository at this point in the history
…size.
  • Loading branch information
bertrik committed Feb 27, 2024
1 parent 03363bc commit 86a9004
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,15 @@ public void getTrafficEvents(@Context Sse sse, @Context SseEventSink sseEventSin

// get initial data
String clientId = "client-" + atomicInteger.incrementAndGet();
BlockingQueue<AggregateMeasurement> queue = new ArrayBlockingQueue<>(10);
BlockingQueue<AggregateMeasurement> queue = new ArrayBlockingQueue<>(3);
eventCallback(clientId, queue, location);

// subscribe to updates
LOG.info("Subscribing client '{}' for '{}'", clientId, location);
handler.subscribe(clientId, () -> eventCallback(clientId, queue, location));
try (sseEventSink) {
while (!sseEventSink.isClosed()) {
AggregateMeasurement measurement = queue.poll(5, TimeUnit.SECONDS);
AggregateMeasurement measurement = queue.poll(1, TimeUnit.SECONDS);
if (measurement != null) {
String id = String.valueOf(measurement.dateTime.getEpochSecond() / 60);
String json = mapper.writeValueAsString(new MeasurementResult(measurement));
Expand All @@ -133,6 +134,7 @@ public void getTrafficEvents(@Context Sse sse, @Context SseEventSink sseEventSin
} catch (InterruptedException | JsonProcessingException e) {
LOG.warn("Error sending SSE: {}", e.getMessage());
} finally {
LOG.info("Unsubscribing client '{}' for '{}'", clientId, location);
handler.unsubscribe(clientId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,11 @@ public FeatureCollection.Feature getStaticData(String location) {

@Override
public void subscribe(String clientId, INotifyData callback) {
LOG.info("Subscribe: {}", clientId);
subscriptions.put(clientId, callback);
}

@Override
public void unsubscribe(String clientId) {
LOG.info("Unsubscribe: {}", clientId);
subscriptions.remove(clientId);
}

Expand Down

0 comments on commit 86a9004

Please sign in to comment.