Skip to content

Commit

Permalink
Experiment with MQTT and $filter
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Mar 20, 2024
1 parent 1e8f734 commit 17ebe89
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import de.fraunhofer.iosb.ilt.frostserver.query.expression.constant.IntegerConstant;
import de.fraunhofer.iosb.ilt.frostserver.query.expression.constant.StringConstant;
import de.fraunhofer.iosb.ilt.frostserver.query.expression.function.comparison.Equal;
import de.fraunhofer.iosb.ilt.frostserver.query.expression.function.logical.And;
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,9 +97,9 @@ public boolean matches(PersistenceManager persistenceManager, Entity newEntity,
return true;
}

protected void generateFilter(int pathElementOffset) {
protected void generateFilter(int pathElementOffset, Expression extraFilter) {
final List<Property> properties = new ArrayList<>();
boolean direct = true;
boolean direct = extraFilter == null;
final int size = path.size();
final int startIdx = size - 1 - pathElementOffset;
if (startIdx < 0) {
Expand Down Expand Up @@ -134,7 +135,7 @@ protected void generateFilter(int pathElementOffset) {
properties.add(navProp);

if (id != null) {
createMatchExpression(properties, epe);
createMatchExpression(properties, epe, extraFilter);
// there should be at most two PathElements left, the EntitySetPath and the EntityPath now visiting
assert (i <= 1);
return;
Expand All @@ -160,7 +161,7 @@ private void createMatcher(final NavigationPropertyMain navProp, PkValue pkValue
};
}

private void createMatchExpression(List<Property> properties, final PathElementEntity epe) {
private void createMatchExpression(List<Property> properties, final PathElementEntity epe, Expression extraFilter) {
final PrimaryKey primaryKey = entityType.getPrimaryKey();
properties.addAll(primaryKey.getKeyProperties());
String epeId = UrlHelper.quoteForUrl(primaryKey, epe.getPkValues());
Expand All @@ -169,6 +170,9 @@ private void createMatchExpression(List<Property> properties, final PathElementE
} else {
matchExpression = new Equal(new Path(properties), new IntegerConstant(epeId));
}
if (extraFilter != null) {
matchExpression = new And(matchExpression, extraFilter);
}
query = new Query(modelRegistry, queryDefaults, path, ANONYMOUS_PRINCIPAL);
query.setFilter(matchExpression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import de.fraunhofer.iosb.ilt.frostserver.property.EntityPropertyMain;
import de.fraunhofer.iosb.ilt.frostserver.query.Expand;
import de.fraunhofer.iosb.ilt.frostserver.query.Query;
import de.fraunhofer.iosb.ilt.frostserver.query.expression.Expression;
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
import de.fraunhofer.iosb.ilt.frostserver.util.StringHelper;
import de.fraunhofer.iosb.ilt.frostserver.util.exception.IncorrectRequestException;
Expand Down Expand Up @@ -57,23 +58,30 @@ private void init() {

String queryString = SubscriptionFactory.getQueryFromTopic(topic);
query = parseQuery(queryString);
if (query != null && (query.getCount().isPresent()
|| query.getFilter() != null
|| !query.getOrderBy().isEmpty()
|| query.getSkip().isPresent()
|| query.getTop().isPresent())) {
throw new IllegalArgumentException("Invalid subscription to: '" + topic + "': only $select and $expand is allowed in query options.");
} else if (query != null && !query.getExpand().isEmpty()) {
Query queryCopy = parseQuery(queryString);
if (queryCopy != null) {
List<Expand> expandList = queryCopy.getExpand();
expandQuery = new Query(modelRegistry, queryDefaults, queryCopy.getPath())
.setExpand(expandList)
.addSelect(entityType.getPrimaryKey().getKeyProperties());
Expression filter = null;
if (query != null) {
if (query.getCount().isPresent()
|| query.getFilter() != null
|| !query.getOrderBy().isEmpty()
|| query.getSkip().isPresent()
|| query.getTop().isPresent()) {
throw new IllegalArgumentException("Invalid subscription to: '" + topic + "': only $select and $expand is allowed in query options.");
}
if (query.getFilter() != null && !settings.getMqttSettings().isAllowMqttFilter()) {
throw new IllegalArgumentException("Invalid subscription to: '" + topic + "': only $filter is not allowed in query options.");
}
filter = query.getFilter();
if (!query.getExpand().isEmpty()) {
Query queryCopy = parseQuery(queryString);
if (queryCopy != null) {
List<Expand> expandList = queryCopy.getExpand();
expandQuery = new Query(modelRegistry, queryDefaults, queryCopy.getPath())
.setExpand(expandList)
.addSelect(entityType.getPrimaryKey().getKeyProperties());
}
}
}

generateFilter(1);
generateFilter(1, filter);
}

private Query parseQuery(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private void init() {
PkValue id = ((PathElementEntity) path.getLastElement()).getPkValues();
matcher = x -> x.getPrimaryKeyValues().equals(id);
}
generateFilter(1);
generateFilter(1, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private void init() {
}
query = new Query(modelRegistry, queryDefaults, path, ANONYMOUS_PRINCIPAL);
query.addSelect(property);
generateFilter(2);
generateFilter(2, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class MqttSettings implements ConfigDefaults {
public static final String TAG_CREATE_THREAD_POOL_SIZE = "CreateThreadPoolSize";
@DefaultValue("")
public static final String TAG_EXPOSED_MQTT_ENDPOINTS = "exposedEndpoints";
@DefaultValueBoolean(false)
public static final String TAG_MQTT_ALLOW_FILTER = "allowFilter";

/**
* Constraints
Expand Down Expand Up @@ -140,6 +142,12 @@ public class MqttSettings implements ConfigDefaults {
* Number of threads used to process EntityCreateEvents.
*/
private int createThreadPoolSize;

/**
* Flag indicating if $filter is allowed on MQTT topics or not.
*/
private boolean allowMqttFilter;

/**
* Extension point for implementation specific settings.
*/
Expand All @@ -158,6 +166,7 @@ private void init(CoreSettings coreSettings, Settings customSettings) {
mqttServerImplementationClass = customSettings.get(TAG_IMPLEMENTATION_CLASS, getClass());
enableMqtt = customSettings.getBoolean(TAG_ENABLED, getClass());
port = customSettings.getInt(TAG_PORT, getClass());
allowMqttFilter = customSettings.getBoolean(TAG_MQTT_ALLOW_FILTER, getClass());
setHost(customSettings.get(TAG_HOST, getClass()));
setInternalHost(customSettings.get(TAG_HOST_INTERNAL, getClass()));
setSubscribeMessageQueueSize(customSettings.getInt(TAG_SUBSCRIBE_MESSAGE_QUEUE_SIZE, getClass()));
Expand Down Expand Up @@ -342,4 +351,8 @@ public void setCreateThreadPoolSize(int createThreadPoolSize) {
this.createThreadPoolSize = createThreadPoolSize;
}

public boolean isAllowMqttFilter() {
return allowMqttFilter;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsTaskingV11;
import de.fraunhofer.iosb.ilt.frostserver.plugin.actuation.ActuationModelSettings;
import de.fraunhofer.iosb.ilt.frostserver.plugin.multidatastream.MdsModelSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.MqttSettings;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -73,6 +74,7 @@ public abstract class AbstractTestClass {
defaultProperties.put(PREFIX_PLUGINS + ActuationModelSettings.TAG_ENABLE_ACTUATION, "true");
defaultProperties.put(PREFIX_PLUGINS + MdsModelSettings.TAG_ENABLE_MDS_MODEL, "true");
defaultProperties.put(TAG_FILTER_DELETE_ENABLE, "true");
defaultProperties.put(PREFIX_MQTT + MqttSettings.TAG_MQTT_ALLOW_FILTER, "true");
defaultProperties.put(PREFIX_MQTT + "session.timeout.seconds", "100");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,31 @@
*/
package de.fraunhofer.iosb.ilt.statests.c03filtering;

import static de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsSensingV11.EP_PARAMETERS;
import static de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsSensingV11.EP_PROPERTIES;
import static de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsSensingV11.EP_RESULTQUALITY;
import static de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsSensingV11.EP_VALIDTIME;
import static de.fraunhofer.iosb.ilt.frostclient.utils.CollectionsHelper.propertiesBuilder;
import static de.fraunhofer.iosb.ilt.frostclient.utils.ParserUtils.formatKeyValuesForUrl;
import static de.fraunhofer.iosb.ilt.statests.util.EntityUtils.createDatastream;
import static de.fraunhofer.iosb.ilt.statests.util.EntityUtils.createObservationSet;
import static de.fraunhofer.iosb.ilt.statests.util.EntityUtils.createObservedProperty;
import static de.fraunhofer.iosb.ilt.statests.util.EntityUtils.createSensor;
import static de.fraunhofer.iosb.ilt.statests.util.EntityUtils.testFilterResults;
import static de.fraunhofer.iosb.ilt.statests.util.Utils.getFromList;
import static org.junit.jupiter.api.Assertions.fail;

import de.fraunhofer.iosb.ilt.frostclient.dao.Dao;
import de.fraunhofer.iosb.ilt.frostclient.exception.ServiceFailureException;
import de.fraunhofer.iosb.ilt.frostclient.model.Entity;
import de.fraunhofer.iosb.ilt.frostclient.models.ext.MapValue;
import de.fraunhofer.iosb.ilt.frostclient.models.ext.TimeInterval;
import de.fraunhofer.iosb.ilt.frostclient.models.ext.UnitOfMeasurement;
import de.fraunhofer.iosb.ilt.frostclient.utils.CollectionsHelper;
import de.fraunhofer.iosb.ilt.statests.AbstractTestClass;
import de.fraunhofer.iosb.ilt.statests.ServerVersion;
import de.fraunhofer.iosb.ilt.statests.util.EntityUtils;
import de.fraunhofer.iosb.ilt.statests.util.HTTPMethods;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.geojson.LineString;
import org.geojson.LngLatAlt;
import org.geojson.Point;
Expand Down Expand Up @@ -192,104 +187,35 @@ private static void createEntities() throws ServiceFailureException, URISyntaxEx
sSrvc.create(location);
LOCATIONS.add(location);

createSensor("Sensor 0", "The sensor with idx 0.", "text", "Some metadata.");
createSensor("Sensor 1", "The sensor with idx 1.", "text", "Some metadata.");
createSensor("Sensor 2", "The sensor with idx 2.", "text", "Some metadata.");
createSensor("Sensor 3", "The sensor with idx 3.", "text", "Some metadata.");
createSensor(sSrvc, "Sensor 0", "The sensor with idx 0.", "text", "Some metadata.", SENSORS);
createSensor(sSrvc, "Sensor 1", "The sensor with idx 1.", "text", "Some metadata.", SENSORS);
createSensor(sSrvc, "Sensor 2", "The sensor with idx 2.", "text", "Some metadata.", SENSORS);
createSensor(sSrvc, "Sensor 3", "The sensor with idx 3.", "text", "Some metadata.", SENSORS);

createObservedProperty("ObservedProperty 0", "http://ucom.org/temperature", "ObservedProperty with index 0.");
createObservedProperty("ObservedProperty 1", "http://ucom.org/humidity", "ObservedProperty with index 1.");
createObservedProperty("ObservedProperty 2", "http://ucom.org/pressure", "ObservedProperty with index 2.");
createObservedProperty("ObservedProperty 3", "http://ucom.org/turbidity", "ObservedProperty with index 3.");
createObservedProperty(sSrvc, "ObservedProperty 0", "http://ucom.org/temperature", "ObservedProperty with index 0.", O_PROPS);
createObservedProperty(sSrvc, "ObservedProperty 1", "http://ucom.org/humidity", "ObservedProperty with index 1.", O_PROPS);
createObservedProperty(sSrvc, "ObservedProperty 2", "http://ucom.org/pressure", "ObservedProperty with index 2.", O_PROPS);
createObservedProperty(sSrvc, "ObservedProperty 3", "http://ucom.org/turbidity", "ObservedProperty with index 3.", O_PROPS);

UnitOfMeasurement uomTemp = new UnitOfMeasurement("degree celcius", "°C", "ucum:T");

createDatastream("Datastream 0", "Datastream 1 of thing 0, sensor 0.", "someType", uomTemp, THINGS.get(0), SENSORS.get(0), O_PROPS.get(0));
createDatastream("Datastream 1", "Datastream 2 of thing 0, sensor 1.", "someType", uomTemp, THINGS.get(0), SENSORS.get(1), O_PROPS.get(1));
createDatastream("Datastream 2", "Datastream 3 of thing 0, sensor 2.", "someType", uomTemp, THINGS.get(0), SENSORS.get(2), O_PROPS.get(2));
createDatastream("Datastream 3", "Datastream 1 of thing 1, sensor 0.", "someType", uomTemp, THINGS.get(1), SENSORS.get(0), O_PROPS.get(0));
createDatastream("Datastream 4", "Datastream 2 of thing 1, sensor 1.", "someType", uomTemp, THINGS.get(1), SENSORS.get(1), O_PROPS.get(1));
createDatastream("Datastream 5", "Datastream 3 of thing 1, sensor 3.", "someType", uomTemp, THINGS.get(1), SENSORS.get(3), O_PROPS.get(3));
createDatastream("Datastream 6", "Datastream 1 of thing 2, sensor 3.", "someType", uomTemp, THINGS.get(2), SENSORS.get(1), O_PROPS.get(0));
createDatastream(sSrvc, "Datastream 0", "Datastream 1 of thing 0, sensor 0.", "someType", uomTemp, THINGS.get(0), SENSORS.get(0), O_PROPS.get(0), DATASTREAMS);
createDatastream(sSrvc, "Datastream 1", "Datastream 2 of thing 0, sensor 1.", "someType", uomTemp, THINGS.get(0), SENSORS.get(1), O_PROPS.get(1), DATASTREAMS);
createDatastream(sSrvc, "Datastream 2", "Datastream 3 of thing 0, sensor 2.", "someType", uomTemp, THINGS.get(0), SENSORS.get(2), O_PROPS.get(2), DATASTREAMS);
createDatastream(sSrvc, "Datastream 3", "Datastream 1 of thing 1, sensor 0.", "someType", uomTemp, THINGS.get(1), SENSORS.get(0), O_PROPS.get(0), DATASTREAMS);
createDatastream(sSrvc, "Datastream 4", "Datastream 2 of thing 1, sensor 1.", "someType", uomTemp, THINGS.get(1), SENSORS.get(1), O_PROPS.get(1), DATASTREAMS);
createDatastream(sSrvc, "Datastream 5", "Datastream 3 of thing 1, sensor 3.", "someType", uomTemp, THINGS.get(1), SENSORS.get(3), O_PROPS.get(3), DATASTREAMS);
createDatastream(sSrvc, "Datastream 6", "Datastream 1 of thing 2, sensor 3.", "someType", uomTemp, THINGS.get(2), SENSORS.get(1), O_PROPS.get(0), DATASTREAMS);

ZonedDateTime startTime = ZonedDateTime.parse("2016-01-01T01:00:00.000Z");
TimeInterval startInterval = TimeInterval.create(Instant.parse("2016-01-01T01:00:00.000Z"), Instant.parse("2016-01-01T02:00:00.000Z"));

createObservationSet(DATASTREAMS.get(0), 0, startTime, startInterval, 6);
createObservationSet(DATASTREAMS.get(1), 3, startTime, startInterval, 6);
createObservationSet(DATASTREAMS.get(2), 6, startTime, startInterval, 6);
createObservationSet(DATASTREAMS.get(3), 9, startTime, startInterval, 6);
createObservationSet(DATASTREAMS.get(4), 12, startTime, startInterval, 6);
createObservationSet(DATASTREAMS.get(5), 15, startTime, startInterval, 6);

}

private static Entity createSensor(String name, String desc, String type, String metadata) throws ServiceFailureException {
int idx = SENSORS.size();
MapValue properties = CollectionsHelper.propertiesBuilder()
.addItem("idx", idx)
.build();

Entity sensor = sMdl.newSensor(name, desc, type, metadata)
.setProperty(EP_PROPERTIES, properties);
sSrvc.create(sensor);
SENSORS.add(sensor);
return sensor;
}

private static Entity createDatastream(String name, String desc, String type, UnitOfMeasurement uom, Entity thing, Entity sensor, Entity op) throws ServiceFailureException {
int idx = DATASTREAMS.size();
MapValue properties = CollectionsHelper.propertiesBuilder()
.addItem("idx", idx)
.build();

Entity ds = sMdl.newDatastream(name, desc, type, uom)
.setProperty(EP_PROPERTIES, properties)
.setProperty(sMdl.npDatastreamThing, thing)
.setProperty(sMdl.npDatastreamSensor, sensor)
.setProperty(sMdl.npDatastreamObservedproperty, op);
sSrvc.create(ds);
DATASTREAMS.add(ds);
return ds;
}

private static Entity createObservedProperty(String name, String definition, String description) throws ServiceFailureException {
int idx = O_PROPS.size();
MapValue properties = CollectionsHelper.propertiesBuilder()
.addItem("idx", idx)
.build();
Entity obsProp = sMdl.newObservedProperty(name, definition, description)
.setProperty(EP_PROPERTIES, properties);
sSrvc.create(obsProp);
O_PROPS.add(obsProp);
return obsProp;
}

private static void createObservationSet(Entity datastream, long resultStart, ZonedDateTime phenomenonTimeStart, TimeInterval validTimeStart, long count) throws ServiceFailureException {
for (int i = 0; i < count; i++) {
ZonedDateTime phenTime = phenomenonTimeStart.plus(i, ChronoUnit.HOURS);
TimeInterval validTime = TimeInterval.create(
validTimeStart.getStart().plus(count, TimeUnit.HOURS),
validTimeStart.getEnd().plus(count, TimeUnit.HOURS));
createObservation(datastream, resultStart + i, phenTime, validTime);
}
}

private static Entity createObservation(Entity datastream, long result, ZonedDateTime phenomenonTime, TimeInterval validTime) throws ServiceFailureException {
int idx = OBSERVATIONS.size();
Map<String, Object> parameters = new HashMap<>();
parameters.put("idx", idx);
Entity obs = sMdl.newObservation(result, phenomenonTime, datastream)
.setProperty(EP_VALIDTIME, validTime)
.setProperty(EP_PARAMETERS, parameters);
if (idx % 2 == 0) {
obs.setProperty(EP_RESULTQUALITY, idx);
} else {
obs.setProperty(EP_RESULTQUALITY, "number-" + idx);
}
sSrvc.create(obs);
OBSERVATIONS.add(obs);
return obs;
createObservationSet(sSrvc, DATASTREAMS.get(0), 0, startTime, startInterval, 6, OBSERVATIONS);
createObservationSet(sSrvc, DATASTREAMS.get(1), 3, startTime, startInterval, 6, OBSERVATIONS);
createObservationSet(sSrvc, DATASTREAMS.get(2), 6, startTime, startInterval, 6, OBSERVATIONS);
createObservationSet(sSrvc, DATASTREAMS.get(3), 9, startTime, startInterval, 6, OBSERVATIONS);
createObservationSet(sSrvc, DATASTREAMS.get(4), 12, startTime, startInterval, 6, OBSERVATIONS);
createObservationSet(sSrvc, DATASTREAMS.get(5), 15, startTime, startInterval, 6, OBSERVATIONS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,17 +544,6 @@ private void checkSubscribePatch(EntityType entityType, List<String> selectedPro
mqttHelper.getTopic(entityType, selectedProperties));
}

private JsonNode filterEntity(JsonNode entity, List<String> selectedProperties) {
Iterator iterator = entity.fieldNames();
while (iterator.hasNext()) {
String key = iterator.next().toString();
if (!selectedProperties.contains(key)) {
iterator.remove();
}
}
return entity;
}

private Callable<Object> getDeepInsertEntityAction(EntityType entityType) {
Callable<Object> trigger = () -> {
switch (entityType) {
Expand Down
Loading

0 comments on commit 17ebe89

Please sign in to comment.