Skip to content

Commit

Permalink
Decode xml gz payload in TrafficHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
bertrik committed Jan 23, 2024
1 parent bf35761 commit c2fcb3f
Showing 1 changed file with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package nl.bertriksikken.verkeersdrukte.traffic;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.google.common.util.concurrent.Runnables;
import io.dropwizard.lifecycle.Managed;
import nl.bertriksikken.datex2.D2LogicalModel;
import nl.bertriksikken.datex2.MeasuredValue;
import nl.bertriksikken.datex2.SiteMeasurements;
import nl.bertriksikken.verkeersdrukte.ndw.FileResponse;
import nl.bertriksikken.verkeersdrukte.ndw.NdwClient;
import nl.bertriksikken.verkeersdrukte.ndw.NdwConfig;
import nl.bertriksikken.verkeersdrukte.ndw.FileResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;

public final class TrafficHandler implements ITrafficHandler, Managed {

Expand All @@ -24,10 +32,11 @@ public final class TrafficHandler implements ITrafficHandler, Managed {

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final NdwClient ndwClient;
private final ObjectMapper mapper;

public TrafficHandler(NdwConfig config) {
this.ndwClient = NdwClient.create(config);

this.mapper = new XmlMapper();
}

@Override
Expand All @@ -54,21 +63,45 @@ private void downloadTrafficSpeed() {
Duration age = Duration.between(response.getLastModified(), Instant.now());
next = response.getLastModified().plusSeconds(65);
LOG.info("Got data, {} bytes, age {}", response.getContents().length, age);
decode(new ByteArrayInputStream(response.getContents()));
} catch (IOException e) {
LOG.warn("Download failed", e);
next = Instant.now().plusSeconds(60);
}

// schedule next
Duration interval = Duration.between(Instant.now(), next);
while (interval.isNegative()) {
interval = interval.plusSeconds(60);
}
LOG.info("Scheduling next download in {}, at {}", interval, next);
LOG.info("Scheduling next download in {}", interval);
executor.schedule(this::downloadTrafficSpeed, interval.toMillis(), TimeUnit.MILLISECONDS);

notifyClients();
}

private void decode(ByteArrayInputStream inputStream) throws IOException {
LOG.info("Parsing");
try (GZIPInputStream gzis = new GZIPInputStream(inputStream)) {
JsonNode node = mapper.readValue(gzis, JsonNode.class);
JsonNode d2LogicalModel = node.at("/Body/d2LogicalModel");
D2LogicalModel model = mapper.treeToValue(d2LogicalModel, D2LogicalModel.class);
D2LogicalModel.PayloadPublication payloadPublication = model.payloadPublication;
LOG.info("Payload publication: type {}, time {}", payloadPublication.type, payloadPublication.publicationTime);

D2LogicalModel.MeasuredDataPublication measuredDataPublication = (D2LogicalModel.MeasuredDataPublication) payloadPublication;
int numMeasurements = 0;
int numSites = 0;
for (SiteMeasurements measurements : measuredDataPublication.siteMeasurementsList) {
for (MeasuredValue value : measurements.measuredValueList) {
numMeasurements++;
}
numSites++;
}
LOG.info("Parsed {} measurements from {} sites", numMeasurements, numSites);
}
}

@Override
public void subscribe(String clientId, INotifyData callback) {
Subscription subscription = new Subscription(clientId, callback);
Expand Down

0 comments on commit c2fcb3f

Please sign in to comment.