diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8b5a4ef270b0e..0f7ae00713dce 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3535,6 +3535,12 @@ public double getLoadBalancerBandwidthOutResourceWeight() { ) private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory"; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Opt-out of topic-existence check when setting permissions" + ) + private boolean allowAclChangesOnNonExistentTopics = false; + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 6070093cc3585..9a306f6b4fff7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -206,8 +206,16 @@ protected CompletableFuture> internalGetPartitionedTopicListAsync() protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - return validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture validateAccessForTenantCf = + validateAdminAccessForTenantAsync(namespaceName.getTenant()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + return validateAccessForTenantCf .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -258,9 +266,16 @@ private CompletableFuture grantPermissionsAsync(TopicName topicUri, String protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role, Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + validateAccessForTenantCf .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -273,8 +288,16 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture validateAccessForTenantCf = + validateAdminAccessForTenantAsync(namespaceName.getTenant()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + validateAccessForTenantCf .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 70c2b343ec584..cea43cc9345d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -3668,4 +3669,23 @@ public void testPermissions() { assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce))); assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject)); } + + @Test + @SneakyThrows + public void testPermissionsAllowAclChangesOnNonExistentTopics() { + pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true); + try { + String namespace = "prop-xyz/ns1/"; + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://" + namespace + random; + final String subject = UUID.randomUUID().toString(); + admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)); + assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce); + admin.topics().revokePermissions(topic, subject); + assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty(); + } finally { + // reset config + pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false); + } + } }