Skip to content

Commit

Permalink
removed CONNECT message from client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian-Limpoeck committed Nov 7, 2024
1 parent 8f8f6f7 commit 582e3af
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 38 deletions.
17 changes: 9 additions & 8 deletions src/main/java/com/hivemq/bootstrap/ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.hivemq.mqtt.handler.publish.PublishFlushHandler;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.security.auth.SslClientCertificate;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class ClientConnection implements ClientConnectionContext {
private @NotNull String clientId;
private boolean cleanStart;
private @Nullable ModifiableDefaultPermissions authPermissions;
private @Nullable CONNECT connectMessage;
private @Nullable MqttWillPublish willPublish;
private @Nullable AtomicInteger inFlightMessageCount;
private @Nullable Integer clientReceiveMaximum;
private @Nullable Integer connectKeepAlive;
Expand Down Expand Up @@ -131,7 +132,7 @@ public class ClientConnection implements ClientConnectionContext {
context.cleanStart,
context.authPermissions,
context.connectedListener,
context.connectMessage,
context.willPublish,
context.clientReceiveMaximum,
context.connectKeepAlive,
context.queueSizeMaximum,
Expand Down Expand Up @@ -178,7 +179,7 @@ public ClientConnection(
final boolean cleanStart,
final @Nullable ModifiableDefaultPermissions authPermissions,
final @NotNull Listener connectedListener,
final @Nullable CONNECT connectMessage,
final @Nullable MqttWillPublish mqttWillPublish,
final @Nullable Integer clientReceiveMaximum,
final @Nullable Integer connectKeepAlive,
final @Nullable Long queueSizeMaximum,
Expand Down Expand Up @@ -219,7 +220,7 @@ public ClientConnection(
this.cleanStart = cleanStart;
this.authPermissions = authPermissions;
this.connectedListener = connectedListener;
this.connectMessage = connectMessage;
this.willPublish = mqttWillPublish;
this.clientReceiveMaximum = clientReceiveMaximum;
this.connectKeepAlive = connectKeepAlive;
this.queueSizeMaximum = queueSizeMaximum;
Expand Down Expand Up @@ -327,13 +328,13 @@ public void setAuthPermissions(final @NotNull ModifiableDefaultPermissions authP
return connectedListener;
}

public @Nullable CONNECT getConnectMessage() {
return connectMessage;
public @Nullable MqttWillPublish getWillPublish() {
return willPublish;
}

@Override
public void setConnectMessage(final @Nullable CONNECT connectMessage) {
this.connectMessage = connectMessage;
public void setWillPublish(final @Nullable MqttWillPublish willPublish) {
this.willPublish = willPublish;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.hivemq.extensions.events.client.parameters.ClientEventListeners;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.security.auth.SslClientCertificate;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -136,7 +137,7 @@ public interface ClientConnectionContext {

void setRequestProblemInformation(boolean problemInformationRequested);

void setConnectMessage(@Nullable CONNECT msg);
void setWillPublish(@Nullable MqttWillPublish willPublish);

@NotNull String @Nullable [] getTopicAliasMapping();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hivemq.mqtt.handler.publish.PublishFlushHandler;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.security.auth.SslClientCertificate;
import io.netty.channel.Channel;
Expand All @@ -53,7 +54,7 @@ public class UndefinedClientConnection implements ClientConnectionContext {
@Nullable String clientId;
boolean cleanStart;
@Nullable ModifiableDefaultPermissions authPermissions;
@Nullable CONNECT connectMessage;
@Nullable MqttWillPublish willPublish;
@Nullable Integer clientReceiveMaximum;
@Nullable Integer connectKeepAlive;
@Nullable Long queueSizeMaximum;
Expand Down Expand Up @@ -165,8 +166,8 @@ public void setAuthPermissions(final @NotNull ModifiableDefaultPermissions authP
}

@Override
public void setConnectMessage(final @Nullable CONNECT connectMessage) {
this.connectMessage = connectMessage;
public void setWillPublish(final @Nullable MqttWillPublish willPublish) {
this.willPublish = willPublish;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.hivemq.mqtt.handler.connack.MqttConnacker;
import com.hivemq.mqtt.handler.publish.DefaultPermissionsEvaluator;
import com.hivemq.mqtt.message.connack.CONNACK;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
Expand Down Expand Up @@ -124,9 +123,9 @@ private void fireInitialize(
if (pluginInitializerMap.isEmpty() && msg != null) {
clientConnection.setPreventLwt(false);
ctx.writeAndFlush(msg, promise);
// Prevent leaking the retained CONNECT message for any existing ClientConnection.
// The CONNECT message would otherwise be owned by the plugin initialization below outside this scope.
clientConnection.setConnectMessage(null);
// Prevent leaking the retained WILL message for any existing ClientConnection.
// The WILL message would otherwise be owned by the plugin initialization below outside this scope.
clientConnection.setWillPublish(null);
return;
}

Expand Down Expand Up @@ -175,14 +174,14 @@ private void fireInitialize(
@Override
public void onSuccess(@Nullable final Void result) {
authenticateWill(ctx, msg, promise);
clientConnection.setConnectMessage(null);
clientConnection.setWillPublish(null);
}

@Override
public void onFailure(final @NotNull Throwable t) {
Exceptions.rethrowError(t);
log.error("Calling initializer failed", t);
clientConnection.setConnectMessage(null);
clientConnection.setWillPublish(null);
ctx.writeAndFlush(msg, promise);
}
}, ctx.executor());
Expand All @@ -195,13 +194,12 @@ private void authenticateWill(

final ClientConnection clientConnection = ClientConnection.of(ctx.channel());

final CONNECT connect = clientConnection.getConnectMessage();
if (connect == null || connect.getWillPublish() == null) {
final MqttWillPublish willPublish = clientConnection.getWillPublish();
if (willPublish == null) {
ctx.writeAndFlush(msg, promise);
return;
}

final MqttWillPublish willPublish = connect.getWillPublish();
final ModifiableDefaultPermissions permissions = clientConnection.getAuthPermissions();
if (DefaultPermissionsEvaluator.checkWillPublish(permissions, willPublish)) {
clientConnection.setPreventLwt(false); //clear prevent flag, Will is authorized
Expand All @@ -213,7 +211,7 @@ private void authenticateWill(
clientConnection.setPreventLwt(true);
//We have already added the will to the session, so we need to remove it again
final ListenableFuture<Void> removeWillFuture =
clientSessionPersistence.deleteWill(connect.getClientIdentifier());
clientSessionPersistence.deleteWill(clientConnection.getClientId());
Futures.addCallback(removeWillFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable final Void result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ private void sendConnackSuccess(

final ChannelFuture connackSent;

// We retain the CONNECT message in memory during the initialization progress, e.g. for plugin initialization.
clientConnection.setConnectMessage(msg);
// We retain the WILL message in memory during the initialization progress, e.g. for plugin initialization.
clientConnection.setWillPublish(msg.getWillPublish());

if (msg.getProtocolVersion() == ProtocolVersion.MQTTv5) {
final CONNACK connack = buildMqtt5Connack(clientConnection, msg, sessionPresent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.connack.CONNACK;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
Expand Down Expand Up @@ -112,7 +111,7 @@ public void setUp() throws Exception {

channel = new EmbeddedChannel();
clientConnection = new DummyClientConnection(channel, publishFlushHandler);
clientConnection.setConnectMessage(mock(CONNECT.class));
clientConnection.setWillPublish(mock(MqttWillPublish.class));
clientConnection.setClientId("test_client");
clientConnection.setProtocolVersion(ProtocolVersion.MQTTv5);

Expand Down Expand Up @@ -163,7 +162,7 @@ public void test_write_connack_no_initializer() throws Exception {
verify(channelHandlerContext).writeAndFlush(any(Object.class), eq(channelPromise));

assertFalse(ClientConnection.of(channel).isPreventLwt());
assertNull(clientConnection.getConnectMessage());
assertNull(clientConnection.getWillPublish());
}

@Test(timeout = 10000)
Expand Down Expand Up @@ -193,7 +192,7 @@ public void test_write_connack_fire_initialize() throws Exception {
verify(initializers, timeout(5000).times(1)).getClientInitializerMap();
verify(channelHandlerContext, timeout(5000)).writeAndFlush(any(Object.class), eq(channelPromise));
verify(channelPipeline).remove(any(ChannelHandler.class));
assertNull(clientConnection.getConnectMessage());
assertNull(clientConnection.getWillPublish());
}

@Test(timeout = 10000)
Expand All @@ -215,10 +214,7 @@ public void test_write_will_publish_not_authorized() throws Exception {
.withPayload(new byte[]{1, 2, 3})
.build();

final CONNECT connect =
new CONNECT.Mqtt5Builder().withClientIdentifier("test-client").withWillPublish(willPublish).build();

ClientConnection.of(channel).setConnectMessage(connect);
ClientConnection.of(channel).setWillPublish(willPublish);

final ModifiableDefaultPermissionsImpl permissions = new ModifiableDefaultPermissionsImpl();
permissions.add(new TopicPermissionBuilderImpl(new TestConfigurationBootstrap().getFullConfigurationService()).topicFilter(
Expand All @@ -238,7 +234,7 @@ public void test_write_will_publish_not_authorized() throws Exception {

verify(channelPipeline).remove(any(ChannelHandler.class));
assertTrue(ClientConnection.of(channel).isPreventLwt());
assertNull(clientConnection.getConnectMessage());
assertNull(clientConnection.getWillPublish());
}

@Test(timeout = 10000)
Expand All @@ -251,10 +247,7 @@ public void test_write_will_publish_authorized() throws Exception {
.withPayload(new byte[]{1, 2, 3})
.build();

final CONNECT connect =
new CONNECT.Mqtt5Builder().withClientIdentifier("test-client").withWillPublish(willPublish).build();

ClientConnection.of(channel).setConnectMessage(connect);
ClientConnection.of(channel).setWillPublish(willPublish);

final ModifiableDefaultPermissionsImpl permissions = new ModifiableDefaultPermissionsImpl();
permissions.add(new TopicPermissionBuilderImpl(new TestConfigurationBootstrap().getFullConfigurationService()).topicFilter(
Expand All @@ -271,7 +264,7 @@ public void test_write_will_publish_authorized() throws Exception {

verify(channelPipeline).remove(any(ChannelHandler.class));
assertFalse(ClientConnection.of(channel).isPreventLwt());
assertNull(clientConnection.getConnectMessage());
assertNull(clientConnection.getWillPublish());
}

private Map<String, ClientInitializer> createClientInitializerMap() throws Exception {
Expand Down

0 comments on commit 582e3af

Please sign in to comment.