diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 2201ee3031c55..8aa678e9e9f0a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -630,6 +630,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case COMPACT: case OFFLOAD: case UNLOAD: + case DELETE_METADATA: case ADD_BUNDLE_RANGE: case GET_BUNDLE_RANGE: case DELETE_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index abe183baa7835..ad75e723707a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -709,6 +709,58 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) return future; } + protected CompletableFuture internalRemovePropertiesAsync(boolean authoritative, String key) { + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalRemoveNonPartitionedTopicProperties(key); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalRemoveNonPartitionedTopicProperties(key); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> { + if (p.properties != null) { + p.properties.remove(key); + } + return new PartitionedTopicMetadata(p.partitions, p.properties); + }); + }); + } + }).thenAccept(__ -> + log.info("[{}] remove [{}] properties success with key {}", + clientAppId(), topicName, key)); + } + + private CompletableFuture internalRemoveNonPartitionedTopicProperties(String key) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { validateTopicOwnershipAsync(topicName, authoritative) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 5102da5d36f52..26889da82d417 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -951,6 +951,41 @@ public void updateProperties( }); } + @DELETE + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Remove the key in properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Partitioned topic does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void removeProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("key") String key, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalRemovePropertiesAsync(authoritative, key) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to remove key {} in properties on topic {}", + clientAppId(), key, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 86e35c56d5628..cb186ce256af2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -906,6 +906,7 @@ public void testCreateAndGetTopicProperties() throws Exception { public void testUpdatePartitionedTopicProperties() throws Exception { final String namespace = "prop-xyz/ns2"; final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties"; + final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2"; admin.namespaces().createNamespace(namespace, 20); // create partitioned topic with properties @@ -935,6 +936,25 @@ public void testUpdatePartitionedTopicProperties() throws Exception { Assert.assertEquals(properties.size(), 2); Assert.assertEquals(properties.get("key1"), "value11"); Assert.assertEquals(properties.get("key2"), "value2"); + + // create topic without properties + admin.topics().createPartitionedTopic(topicNameTwo, 2); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + // remove key of properties on this topic + admin.topics().removeProperties(topicNameTwo, "key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + Map topicProp = new HashMap<>(); + topicProp.put("key1", "value1"); + topicProp.put("key2", "value2"); + admin.topics().updateProperties(topicNameTwo, topicProp); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + admin.topics().removeProperties(topicNameTwo, "key1"); + topicProp.remove("key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 7409f119c04d7..be706d048d686 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -697,6 +697,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal */ CompletableFuture updatePropertiesAsync(String topic, Map properties); + /** + * Remove the key in properties on a topic. + * + * @param topic + * @param key + * @throws PulsarAdminException + */ + void removeProperties(String topic, String key) throws PulsarAdminException; + + /** + * Remove the key in properties on a topic asynchronously. + * + * @param topic + * @param key + * @return + */ + CompletableFuture removePropertiesAsync(String topic, String key); + /** * Delete a partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9f9092076f967..b4be27e0770a4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -494,6 +494,19 @@ public void deletePartitionedTopic(String topic) throws PulsarAdminException { deletePartitionedTopic(topic, false); } + @Override + public void removeProperties(String topic, String key) throws PulsarAdminException { + sync(() -> removePropertiesAsync(topic, key)); + } + + @Override + public CompletableFuture removePropertiesAsync(String topic, String key) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties") + .queryParam("key", key); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture deletePartitionedTopicAsync(String topic) { return deletePartitionedTopicAsync(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index f7221aaf02831..5eaca80473d09 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1436,6 +1436,10 @@ public void topics() throws Exception { props.put("x", "y,z"); verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a")); + verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a"); + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 94774670f5a7f..eaa907726997d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -121,6 +121,7 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("get-properties", new GetPropertiesCmd()); jcommander.addCommand("update-properties", new UpdateProperties()); + jcommander.addCommand("remove-properties", new RemoveProperties()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); @@ -627,6 +628,21 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Remove the key in properties of a topic") + private class RemoveProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic") + private String key; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + getTopics().removeProperties(topic, key); + } + } + @Parameters(commandDescription = "Delete a partitioned topic. " + "It will also delete all the partitions of the topic if it exists.") private class DeletePartitionedCmd extends CliCommand { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index d4de706e607b7..0184e0efb82d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,6 +50,7 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + DELETE_METADATA, GET_BACKLOG_SIZE, SET_REPLICATED_SUBSCRIPTION_STATUS,