Skip to content

Commit

Permalink
Initial working dynamic data.
Browse files Browse the repository at this point in the history
  • Loading branch information
bertrik committed Jan 25, 2024
1 parent c2fcb3f commit acf7617
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@ public D2LogicalModel(PayloadPublication payloadPublication) {
@JsonSubTypes({@Type(value = MeasuredDataPublication.class, name = "MeasuredDataPublication")})
public static abstract class PayloadPublication {
@JacksonXmlProperty(localName = "type", isAttribute = true)
public String type;
public final String type;

@JacksonXmlProperty(localName = "publicationTime")
public String publicationTime;

PayloadPublication(String type) {
this.type = type;
}

PayloadPublication(String type, Instant publicationDateTime) {
this(type);
publicationTime = publicationDateTime.truncatedTo(ChronoUnit.SECONDS).toString();
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -58,11 +53,13 @@ public static final class MeasuredDataPublication extends PayloadPublication {
public List<SiteMeasurements> siteMeasurementsList = new ArrayList<>();

public MeasuredDataPublication() {
// jackson no-arg constructor
super("MeasuredDataPublication");
}

public MeasuredDataPublication(Instant publicationTime) {
super("MeasuredDataPublication", publicationTime);
super("MeasuredDataPublication");
this.publicationTime = publicationTime.truncatedTo(ChronoUnit.SECONDS).toString();
}

public void addSiteMeasurements(SiteMeasurements measurements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public final class MeasuredValue {
int index;

@JacksonXmlProperty(localName = "type", isAttribute = true)
String type = null;
String type = ""; // sometimes "_SiteMeasurementsIndexMeasuredValue"

@JacksonXmlProperty(localName = "measuredValue")
MeasuredValueWrapper measuredValue;
public MeasuredValueWrapper measuredValue;

MeasuredValue() {
// jackson constructor
Expand All @@ -35,13 +35,13 @@ public MeasuredValue(int index, BasicData basicData) {

@Override
public String toString() {
return measuredValue.toString();
return String.format(Locale.ROOT, "[%d]=%s", index, measuredValue);
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static final class MeasuredValueWrapper {
@JacksonXmlProperty(localName = "basicData")
BasicData basicData;
public BasicData basicData;

MeasuredValueWrapper() {
// jackson constructor
Expand All @@ -58,13 +58,14 @@ public String toString() {
}
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
@JsonSubTypes({@Type(value = TrafficFlow.class, name = "TrafficFlow"), @Type(value = TrafficSpeed.class, name = "TrafficSpeed")})
public static abstract class BasicData {
String type;
@JacksonXmlProperty(localName = "type", isAttribute = true)
public final String type;

@JsonCreator
public BasicData(String type) {
BasicData(String type) {
this.type = type;
}
}
Expand All @@ -73,7 +74,7 @@ public BasicData(String type) {
@JsonInclude(Include.NON_NULL)
public static abstract class BasicDataValue {
@JacksonXmlProperty(localName = "dataError")
Boolean dataError;
public boolean dataError = false;
@JacksonXmlProperty(localName = "numberOfInputValuesUsed", isAttribute = true)
int numberOfInputValuesUsed;

Expand All @@ -85,7 +86,7 @@ public static abstract class BasicDataValue {
@JsonIgnoreProperties(ignoreUnknown = true)
public static final class TrafficFlow extends BasicData {
@JacksonXmlProperty(localName = "vehicleFlow")
VehicleFlow vehicleFlow;
public VehicleFlow vehicleFlow;

TrafficFlow() {
// jackson constructor
Expand All @@ -107,7 +108,7 @@ public String toString() {
*/
public static final class VehicleFlow extends BasicDataValue {
@JacksonXmlProperty(localName = "vehicleFlowRate")
int vehicleFlowRate;
public int vehicleFlowRate;

VehicleFlow() {
// jackson constructor
Expand All @@ -129,7 +130,7 @@ public String toString() {
@JsonIgnoreProperties(ignoreUnknown = true)
public static final class TrafficSpeed extends BasicData {
@JacksonXmlProperty(localName = "averageVehicleSpeed")
AverageVehicleSpeed averageVehicleSpeed;
public AverageVehicleSpeed averageVehicleSpeed;

TrafficSpeed() {
// jackson constructor
Expand All @@ -148,7 +149,7 @@ public String toString() {

public static final class AverageVehicleSpeed extends BasicDataValue {
@JacksonXmlProperty(localName = "speed")
Double speed;
public final Double speed;

AverageVehicleSpeed() {
// jackson constructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
public final class SiteMeasurements {

@JacksonXmlProperty(localName = "measurementSiteReference")
Reference reference;
public Reference reference;

@JacksonXmlProperty(localName = "measurementTimeDefault")
String measurementTimeDefault = "";
public String measurementTimeDefault = "";

@JacksonXmlElementWrapper(useWrapping = false)
@JacksonXmlProperty(localName = "measuredValue")
Expand All @@ -43,7 +43,7 @@ public void addMeasuredValue(MeasuredValue.BasicData basicData) {

public static final class Reference {
@JacksonXmlProperty(localName = "id", isAttribute = true)
String id = "";
public String id = "";

@JacksonXmlProperty(localName = "version", isAttribute = true)
String version = "";
Expand All @@ -60,6 +60,11 @@ public static final class Reference {
this.id = id;
this.version = version;
}

@Override
public String toString() {
return id;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;
import nl.bertriksikken.verkeersdrukte.traffic.AggregateMeasurement;
import nl.bertriksikken.verkeersdrukte.traffic.ITrafficHandler;
import nl.bertriksikken.verkeersdrukte.traffic.MeasurementResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

@Path("/traffic")
@Produces(MediaType.APPLICATION_JSON)
public final class VerkeersDrukteResource {
Expand Down Expand Up @@ -42,10 +46,13 @@ public String getStatic(@PathParam("location") String location) {

@GET
@Path("/dynamic/{location}")
public String getDynamic(@PathParam("location") String location) {
public Optional<MeasurementResult> getDynamic(@PathParam("location") String location) {
// return snapshot of most recent measurement for location
LOG.info("getDynamic() for location {}", location);
return location;
AggregateMeasurement aggregateMeasurement = handler.getDynamicData(location);
if (aggregateMeasurement == null) {
return Optional.empty();
}
return Optional.of(new MeasurementResult(location, aggregateMeasurement));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package nl.bertriksikken.verkeersdrukte.traffic;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Locale;

/**
* JSON serializable representation of a measurement for one location.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public final class AggregateMeasurement {
@JsonProperty("datetime")
final String dateTime;
@JsonProperty("flow")
final Double flow; // vehicles per hour
@JsonProperty("speed")
final Double speed; // km per hour

public AggregateMeasurement(String dateTime, double flow, double speed) {
this.dateTime = dateTime;
this.flow = Double.isFinite(flow) ? flow : null;
this.speed = Double.isFinite(speed) ? speed : null;
}

@Override
public String toString() {
return String.format(Locale.ROOT, "{flow=%.1f,speed=%.1f}", flow, speed);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

public interface ITrafficHandler {

public void subscribe(String clientId, INotifyData callback);

public void unsubscribe(String clientId);

boolean isHealthy();

AggregateMeasurement getDynamicData(String location);

void subscribe(String clientId, INotifyData callback);

void unsubscribe(String clientId);

public interface INotifyData {
void notifyUpdate();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package nl.bertriksikken.verkeersdrukte.traffic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class MeasurementCache {

private final String publishedDateTime;
private Map<String, AggregateMeasurement> measurementMap = new ConcurrentHashMap<>();

MeasurementCache(String publishedDateTime) {
this.publishedDateTime = publishedDateTime;
}

public void put(String location, AggregateMeasurement measurement) {
measurementMap.put(location, measurement);
}

public AggregateMeasurement get(String location) {
return measurementMap.get(location);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package nl.bertriksikken.verkeersdrukte.traffic;

import com.fasterxml.jackson.annotation.JsonProperty;

public final class MeasurementResult {

@JsonProperty("location")
public final String location;

@JsonProperty("measurement")
public final AggregateMeasurement measurement;

public MeasurementResult(String location, AggregateMeasurement measurement) {
this.location = location;
this.measurement = measurement;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@ public final class TrafficHandler implements ITrafficHandler, Managed {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final NdwClient ndwClient;
private final ObjectMapper mapper;
private MeasurementCache measurementCache = new MeasurementCache("");

public TrafficHandler(NdwConfig config) {
this.ndwClient = NdwClient.create(config);
Expand Down Expand Up @@ -63,7 +65,7 @@ private void downloadTrafficSpeed() {
Duration age = Duration.between(response.getLastModified(), Instant.now());
next = response.getLastModified().plusSeconds(65);
LOG.info("Got data, {} bytes, age {}", response.getContents().length, age);
decode(new ByteArrayInputStream(response.getContents()));
measurementCache = decode(new ByteArrayInputStream(response.getContents()));
} catch (IOException e) {
LOG.warn("Download failed", e);
next = Instant.now().plusSeconds(60);
Expand All @@ -80,7 +82,7 @@ private void downloadTrafficSpeed() {
notifyClients();
}

private void decode(ByteArrayInputStream inputStream) throws IOException {
private MeasurementCache decode(ByteArrayInputStream inputStream) throws IOException {
LOG.info("Parsing");
try (GZIPInputStream gzis = new GZIPInputStream(inputStream)) {
JsonNode node = mapper.readValue(gzis, JsonNode.class);
Expand All @@ -90,18 +92,58 @@ private void decode(ByteArrayInputStream inputStream) throws IOException {
LOG.info("Payload publication: type {}, time {}", payloadPublication.type, payloadPublication.publicationTime);

D2LogicalModel.MeasuredDataPublication measuredDataPublication = (D2LogicalModel.MeasuredDataPublication) payloadPublication;
int numMeasurements = 0;
int numSites = 0;
MeasurementCache snapshot = new MeasurementCache(measuredDataPublication.publicationTime);
for (SiteMeasurements measurements : measuredDataPublication.siteMeasurementsList) {
for (MeasuredValue value : measurements.measuredValueList) {
numMeasurements++;
}
numSites++;
AggregateMeasurement aggregateMeasurement = aggregateValues(measurements);
snapshot.put(measurements.reference.id, aggregateMeasurement);
}
LOG.info("Parsed {} measurements from {} sites", numMeasurements, numSites);
return snapshot;
}
}


private AggregateMeasurement aggregateValues(SiteMeasurements measurements) {
String dateTime = measurements.measurementTimeDefault;
// group by type
List<MeasuredValue.TrafficFlow> flows = new ArrayList<>();
List<MeasuredValue.TrafficSpeed> speeds = new ArrayList<>();
for (MeasuredValue value : measurements.measuredValueList) {
switch (value.measuredValue.basicData.type) {
case "TrafficFlow":
flows.add((MeasuredValue.TrafficFlow) value.measuredValue.basicData);
break;
case "TrafficSpeed":
speeds.add((MeasuredValue.TrafficSpeed) value.measuredValue.basicData);
break;
default:
break;
}
}
if (flows.isEmpty() || (flows.size() != speeds.size())) {
// cannot determine speed
return new AggregateMeasurement(dateTime, Double.NaN, Double.NaN);
}

// aggregate flow as simple sum, speed as flow-weighted sum
double sumFlowSpeed = 0.0;
double sumFlow = 0.0;
for (int i = 0; i < flows.size(); i++) {
MeasuredValue.TrafficFlow flow = flows.get(i);
MeasuredValue.TrafficSpeed speed = speeds.get(i);
double flowValue = flow.vehicleFlow.dataError ? Double.NaN : flow.vehicleFlow.vehicleFlowRate;
double speedValue = speed.averageVehicleSpeed.dataError ? Double.NaN : speed.averageVehicleSpeed.speed;
sumFlowSpeed += flow.vehicleFlow.vehicleFlowRate * speed.averageVehicleSpeed.speed;
sumFlow += flow.vehicleFlow.vehicleFlowRate;
}
double aggregateSpeed = sumFlowSpeed / sumFlow;
return new AggregateMeasurement(dateTime, sumFlow, aggregateSpeed);
}

@Override
public AggregateMeasurement getDynamicData(String location) {
return measurementCache.get(location);
}

@Override
public void subscribe(String clientId, INotifyData callback) {
Subscription subscription = new Subscription(clientId, callback);
Expand Down

0 comments on commit acf7617

Please sign in to comment.