Skip to content

Commit

Permalink
Support identity placeholder in MQTT binding topics
Browse files Browse the repository at this point in the history
  • Loading branch information
epieffe committed Dec 27, 2024
1 parent 3e2a862 commit 3bd97af
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic))
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization))
.findFirst()
.orElse(null);
}
Expand All @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic))
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization))
.findFirst()
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttWithConfig;
import io.aklivity.zilla.runtime.engine.config.GuardedConfig;
import io.aklivity.zilla.runtime.engine.config.RouteConfig;

public final class MqttRouteConfig
Expand All @@ -32,6 +33,7 @@ public final class MqttRouteConfig
private final List<MqttConditionMatcher> when;
private final MqttWithConfig with;
private final LongPredicate authorized;
private final List<GuardedConfig> guarded;

public MqttRouteConfig(
RouteConfig route)
Expand All @@ -43,6 +45,7 @@ public MqttRouteConfig(
.collect(toList());
this.with = (MqttWithConfig) route.with;
this.authorized = route.authorized;
this.guarded = route.guarded;
}

public long compositeId()
Expand All @@ -63,14 +66,31 @@ boolean matchesSession(
}

boolean matchesSubscribe(
String topic)
String topic,
long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic));
String actualTopic = replaceTopicPlaceholders(topic, authorization);
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(actualTopic));
}

boolean matchesPublish(
String topic)
String topic,
long authorization)
{
String actualTopic = replaceTopicPlaceholders(topic, authorization);
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(actualTopic));
}

private String replaceTopicPlaceholders(String topic, long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic));
for (GuardedConfig g : guarded)
{
String identity = g.identity.apply(authorization);
if (identity != null)
{
topic = topic.replace(String.format("${guarded[%s].identity}", g.name), identity);
}
}
return topic;
}
}

0 comments on commit 3bd97af

Please sign in to comment.