diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 2201ee3031c55..8aa678e9e9f0a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -630,6 +630,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case COMPACT: case OFFLOAD: case UNLOAD: + case DELETE_METADATA: case ADD_BUNDLE_RANGE: case GET_BUNDLE_RANGE: case DELETE_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1ec1c3d83554c..26170bb56f8c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -654,6 +655,113 @@ private CompletableFuture> getPropertiesAsync() { }); } + protected CompletableFuture internalUpdatePropertiesAsync(boolean authoritative, + Map properties) { + if (properties == null || properties.isEmpty()) { + log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName); + return CompletableFuture.completedFuture(null); + } + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalUpdateNonPartitionedTopicProperties(properties); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalUpdateNonPartitionedTopicProperties(properties); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> new PartitionedTopicMetadata(p.partitions, + p.properties == null ? properties + : MapUtils.putAll(p.properties, properties.entrySet().toArray()))); + }); + } + }).thenAccept(__ -> + log.info("[{}] [{}] update properties success with properties {}", + clientAppId(), topicName, properties)); + } + + private CompletableFuture internalUpdateNonPartitionedTopicProperties(Map properties) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + managedLedger.getConfig().getProperties().putAll(properties); + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + + protected CompletableFuture internalRemovePropertiesAsync(boolean authoritative, String key) { + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalRemoveNonPartitionedTopicProperties(key); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalRemoveNonPartitionedTopicProperties(key); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> { + if (p.properties != null) { + p.properties.remove(key); + } + return new PartitionedTopicMetadata(p.partitions, p.properties); + }); + }); + } + }).thenAccept(__ -> + log.info("[{}] remove [{}] properties success with key {}", + clientAppId(), topicName, key)); + } + + private CompletableFuture internalRemoveNonPartitionedTopicProperties(String key) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { validateTopicOwnershipAsync(topicName, authoritative) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index af5120fa9f91e..26889da82d417 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -915,6 +915,77 @@ public void getProperties( }); } + @PUT + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Update the properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Method Not Allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void updateProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Key value pair properties for the topic metadata") Map properties){ + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalUpdatePropertiesAsync(authoritative, properties) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Remove the key in properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Partitioned topic does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void removeProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("key") String key, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalRemovePropertiesAsync(authoritative, key) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to remove key {} in properties on topic {}", + clientAppId(), key, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 5b43aa7145e81..d84fab7137f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception { Assert.assertEquals(properties22.get("key2"), "value2"); } + @Test + public void testUpdatePartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties"; + final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2"; + admin.namespaces().createNamespace(namespace, 20); + + // create partitioned topic without properties + admin.topics().createPartitionedTopic(topicName, 2); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNull(properties); + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // create topic without properties + admin.topics().createPartitionedTopic(topicNameTwo, 2); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + // remove key of properties on this topic + admin.topics().removeProperties(topicNameTwo, "key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + Map topicProp = new HashMap<>(); + topicProp.put("key1", "value1"); + topicProp.put("key2", "value2"); + admin.topics().updateProperties(topicNameTwo, topicProp); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + admin.topics().removeProperties(topicNameTwo, "key1"); + topicProp.remove("key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + } + + @Test + public void testUpdateNonPartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties"; + admin.namespaces().createNamespace(namespace, 20); + + // create non-partitioned topic with properties + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().createNonPartitionedTopic(topicName, topicProperties); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + } + @Test public void testNonPersistentTopics() throws Exception { final String namespace = "prop-xyz/ns2"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 73f9a199a1b12..be706d048d686 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal */ CompletableFuture> getPropertiesAsync(String topic); + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @throws PulsarAdminException + */ + void updateProperties(String topic, Map properties) throws PulsarAdminException; + + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @return + */ + CompletableFuture updatePropertiesAsync(String topic, Map properties); + + /** + * Remove the key in properties on a topic. + * + * @param topic + * @param key + * @throws PulsarAdminException + */ + void removeProperties(String topic, String key) throws PulsarAdminException; + + /** + * Remove the key in properties on a topic asynchronously. + * + * @param topic + * @param key + * @return + */ + CompletableFuture removePropertiesAsync(String topic, String key); + /** * Delete a partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index dbdaffd9e5c5f..b4be27e0770a4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -474,11 +474,39 @@ public void failed(Throwable throwable) { return future; } + @Override + public void updateProperties(String topic, Map properties) throws PulsarAdminException { + sync(() -> updatePropertiesAsync(topic, properties)); + } + + @Override + public CompletableFuture updatePropertiesAsync(String topic, Map properties) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties"); + if (properties == null) { + properties = new HashMap<>(); + } + return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON)); + } + @Override public void deletePartitionedTopic(String topic) throws PulsarAdminException { deletePartitionedTopic(topic, false); } + @Override + public void removeProperties(String topic, String key) throws PulsarAdminException { + sync(() -> removePropertiesAsync(topic, key)); + } + + @Override + public CompletableFuture removePropertiesAsync(String topic, String key) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties") + .queryParam("key", key); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture deletePartitionedTopicAsync(String topic) { return deletePartitionedTopicAsync(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8e7e339b536cc..5eaca80473d09 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1429,6 +1429,17 @@ public void topics() throws Exception { cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d")); verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("update-properties persistent://myprop/clust/ns1/ds1 --property a=b -p x=y,z")); + props = new HashMap<>(); + props.put("a", "b"); + props.put("x", "y,z"); + verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a")); + verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a"); + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 056847c7583a1..eaa907726997d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.cli.NoSplitter; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -119,6 +120,8 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("get-properties", new GetPropertiesCmd()); + jcommander.addCommand("update-properties", new UpdateProperties()); + jcommander.addCommand("remove-properties", new RemoveProperties()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); @@ -605,6 +608,41 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Update the properties of on a topic") + private class UpdateProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false, splitter = NoSplitter.class) + private java.util.List properties; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map map = parseListKeyValueMap(properties); + if (map == null) { + map = Collections.emptyMap(); + } + getTopics().updateProperties(topic, map); + } + } + + @Parameters(commandDescription = "Remove the key in properties of a topic") + private class RemoveProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic") + private String key; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + getTopics().removeProperties(topic, key); + } + } + @Parameters(commandDescription = "Delete a partitioned topic. " + "It will also delete all the partitions of the topic if it exists.") private class DeletePartitionedCmd extends CliCommand { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index d4de706e607b7..0184e0efb82d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,6 +50,7 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + DELETE_METADATA, GET_BACKLOG_SIZE, SET_REPLICATED_SUBSCRIPTION_STATUS,