From ee3081d27424bf9dbb5a255af28b9172427ebe81 Mon Sep 17 00:00:00 2001 From: Xiaoyu Hou Date: Thu, 25 Aug 2022 11:30:10 +0800 Subject: [PATCH 1/3] [broker][admin]Add api for update topic properties (#17238) (cherry picked from commit b21f72886dc20362efe6808d690ddca32c5abd1e) Signed-off-by: Zixuan Liu --- .../admin/impl/PersistentTopicsBase.java | 55 +++++++++++++++ .../broker/admin/v2/PersistentTopics.java | 36 ++++++++++ .../pulsar/broker/admin/AdminApi2Test.java | 70 +++++++++++++++++++ .../apache/pulsar/client/admin/Topics.java | 18 +++++ .../client/admin/internal/TopicsImpl.java | 15 ++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++ .../apache/pulsar/admin/cli/CmdTopics.java | 22 ++++++ 7 files changed, 223 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1ec1c3d83554c..abe183baa7835 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -654,6 +655,60 @@ private CompletableFuture> getPropertiesAsync() { }); } + protected CompletableFuture internalUpdatePropertiesAsync(boolean authoritative, + Map properties) { + if (properties == null || properties.isEmpty()) { + log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName); + return CompletableFuture.completedFuture(null); + } + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalUpdateNonPartitionedTopicProperties(properties); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalUpdateNonPartitionedTopicProperties(properties); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> new PartitionedTopicMetadata(p.partitions, + MapUtils.putAll(p.properties, properties.entrySet().toArray()))); + }); + } + }).thenAccept(__ -> + log.info("[{}] [{}] update properties success with properties {}", + clientAppId(), topicName, properties)); + } + + private CompletableFuture internalUpdateNonPartitionedTopicProperties(Map properties) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + managedLedger.getConfig().getProperties().putAll(properties); + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { validateTopicOwnershipAsync(topicName, authoritative) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index af5120fa9f91e..5102da5d36f52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -915,6 +915,42 @@ public void getProperties( }); } + @PUT + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Update the properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Method Not Allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void updateProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Key value pair properties for the topic metadata") Map properties){ + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalUpdatePropertiesAsync(authoritative, properties) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 5b43aa7145e81..86e35c56d5628 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -902,6 +902,76 @@ public void testCreateAndGetTopicProperties() throws Exception { Assert.assertEquals(properties22.get("key2"), "value2"); } + @Test + public void testUpdatePartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties"; + admin.namespaces().createNamespace(namespace, 20); + + // create partitioned topic with properties + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().createPartitionedTopic(topicName, 2, topicProperties); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + } + + @Test + public void testUpdateNonPartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties"; + admin.namespaces().createNamespace(namespace, 20); + + // create non-partitioned topic with properties + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().createNonPartitionedTopic(topicName, topicProperties); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + } + @Test public void testNonPersistentTopics() throws Exception { final String namespace = "prop-xyz/ns2"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 73f9a199a1b12..7409f119c04d7 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -679,6 +679,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal */ CompletableFuture> getPropertiesAsync(String topic); + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @throws PulsarAdminException + */ + void updateProperties(String topic, Map properties) throws PulsarAdminException; + + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @return + */ + CompletableFuture updatePropertiesAsync(String topic, Map properties); + /** * Delete a partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index dbdaffd9e5c5f..9f9092076f967 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -474,6 +474,21 @@ public void failed(Throwable throwable) { return future; } + @Override + public void updateProperties(String topic, Map properties) throws PulsarAdminException { + sync(() -> updatePropertiesAsync(topic, properties)); + } + + @Override + public CompletableFuture updatePropertiesAsync(String topic, Map properties) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties"); + if (properties == null) { + properties = new HashMap<>(); + } + return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON)); + } + @Override public void deletePartitionedTopic(String topic) throws PulsarAdminException { deletePartitionedTopic(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8e7e339b536cc..f7221aaf02831 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1429,6 +1429,13 @@ public void topics() throws Exception { cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d")); verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("update-properties persistent://myprop/clust/ns1/ds1 --property a=b -p x=y,z")); + props = new HashMap<>(); + props.put("a", "b"); + props.put("x", "y,z"); + verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props); + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 056847c7583a1..94774670f5a7f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.cli.NoSplitter; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -119,6 +120,7 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("get-properties", new GetPropertiesCmd()); + jcommander.addCommand("update-properties", new UpdateProperties()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); @@ -605,6 +607,26 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Update the properties of on a topic") + private class UpdateProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false, splitter = NoSplitter.class) + private java.util.List properties; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map map = parseListKeyValueMap(properties); + if (map == null) { + map = Collections.emptyMap(); + } + getTopics().updateProperties(topic, map); + } + } + @Parameters(commandDescription = "Delete a partitioned topic. " + "It will also delete all the partitions of the topic if it exists.") private class DeletePartitionedCmd extends CliCommand { From 09d445c962e0c6b134d69ad518179fe2f62c1a91 Mon Sep 17 00:00:00 2001 From: Ruguo Yu Date: Fri, 2 Sep 2022 00:05:53 +0800 Subject: [PATCH 2/3] [broker][admin] Add cmd to remove topic properties (#17337) * [broker][admin] Add cmd to remove topic properties * address comment * address comment (cherry picked from commit 7075a5ce0d4a70f52625ac8c3d0c48894442b72a) Signed-off-by: Zixuan Liu --- .../PulsarAuthorizationProvider.java | 1 + .../admin/impl/PersistentTopicsBase.java | 52 +++++++++++++++++++ .../broker/admin/v2/PersistentTopics.java | 35 +++++++++++++ .../pulsar/broker/admin/AdminApi2Test.java | 20 +++++++ .../apache/pulsar/client/admin/Topics.java | 18 +++++++ .../client/admin/internal/TopicsImpl.java | 13 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++ .../apache/pulsar/admin/cli/CmdTopics.java | 16 ++++++ .../common/policies/data/TopicOperation.java | 1 + 9 files changed, 160 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 2201ee3031c55..8aa678e9e9f0a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -630,6 +630,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case COMPACT: case OFFLOAD: case UNLOAD: + case DELETE_METADATA: case ADD_BUNDLE_RANGE: case GET_BUNDLE_RANGE: case DELETE_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index abe183baa7835..ad75e723707a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -709,6 +709,58 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) return future; } + protected CompletableFuture internalRemovePropertiesAsync(boolean authoritative, String key) { + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalRemoveNonPartitionedTopicProperties(key); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalRemoveNonPartitionedTopicProperties(key); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> { + if (p.properties != null) { + p.properties.remove(key); + } + return new PartitionedTopicMetadata(p.partitions, p.properties); + }); + }); + } + }).thenAccept(__ -> + log.info("[{}] remove [{}] properties success with key {}", + clientAppId(), topicName, key)); + } + + private CompletableFuture internalRemoveNonPartitionedTopicProperties(String key) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { validateTopicOwnershipAsync(topicName, authoritative) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 5102da5d36f52..26889da82d417 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -951,6 +951,41 @@ public void updateProperties( }); } + @DELETE + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Remove the key in properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Partitioned topic does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void removeProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("key") String key, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalRemovePropertiesAsync(authoritative, key) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to remove key {} in properties on topic {}", + clientAppId(), key, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 86e35c56d5628..cb186ce256af2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -906,6 +906,7 @@ public void testCreateAndGetTopicProperties() throws Exception { public void testUpdatePartitionedTopicProperties() throws Exception { final String namespace = "prop-xyz/ns2"; final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties"; + final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2"; admin.namespaces().createNamespace(namespace, 20); // create partitioned topic with properties @@ -935,6 +936,25 @@ public void testUpdatePartitionedTopicProperties() throws Exception { Assert.assertEquals(properties.size(), 2); Assert.assertEquals(properties.get("key1"), "value11"); Assert.assertEquals(properties.get("key2"), "value2"); + + // create topic without properties + admin.topics().createPartitionedTopic(topicNameTwo, 2); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + // remove key of properties on this topic + admin.topics().removeProperties(topicNameTwo, "key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + Map topicProp = new HashMap<>(); + topicProp.put("key1", "value1"); + topicProp.put("key2", "value2"); + admin.topics().updateProperties(topicNameTwo, topicProp); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + admin.topics().removeProperties(topicNameTwo, "key1"); + topicProp.remove("key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 7409f119c04d7..be706d048d686 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -697,6 +697,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal */ CompletableFuture updatePropertiesAsync(String topic, Map properties); + /** + * Remove the key in properties on a topic. + * + * @param topic + * @param key + * @throws PulsarAdminException + */ + void removeProperties(String topic, String key) throws PulsarAdminException; + + /** + * Remove the key in properties on a topic asynchronously. + * + * @param topic + * @param key + * @return + */ + CompletableFuture removePropertiesAsync(String topic, String key); + /** * Delete a partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9f9092076f967..b4be27e0770a4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -494,6 +494,19 @@ public void deletePartitionedTopic(String topic) throws PulsarAdminException { deletePartitionedTopic(topic, false); } + @Override + public void removeProperties(String topic, String key) throws PulsarAdminException { + sync(() -> removePropertiesAsync(topic, key)); + } + + @Override + public CompletableFuture removePropertiesAsync(String topic, String key) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties") + .queryParam("key", key); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture deletePartitionedTopicAsync(String topic) { return deletePartitionedTopicAsync(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index f7221aaf02831..5eaca80473d09 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1436,6 +1436,10 @@ public void topics() throws Exception { props.put("x", "y,z"); verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a")); + verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a"); + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 94774670f5a7f..eaa907726997d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -121,6 +121,7 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("get-properties", new GetPropertiesCmd()); jcommander.addCommand("update-properties", new UpdateProperties()); + jcommander.addCommand("remove-properties", new RemoveProperties()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); @@ -627,6 +628,21 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Remove the key in properties of a topic") + private class RemoveProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic") + private String key; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + getTopics().removeProperties(topic, key); + } + } + @Parameters(commandDescription = "Delete a partitioned topic. " + "It will also delete all the partitions of the topic if it exists.") private class DeletePartitionedCmd extends CliCommand { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index d4de706e607b7..0184e0efb82d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,6 +50,7 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + DELETE_METADATA, GET_BACKLOG_SIZE, SET_REPLICATED_SUBSCRIPTION_STATUS, From 537679f894f868abbe07b1e494d6b6829b0519d6 Mon Sep 17 00:00:00 2001 From: Flowermin <48741608+Flowermin@users.noreply.github.com> Date: Wed, 31 Aug 2022 15:38:40 +0800 Subject: [PATCH 3/3] [fix][broker] Fix NPE when updating topic's properties (#17352) Co-authored-by: bjhuxiaohua (cherry picked from commit f1d11586e96344c40cab923f0ed89961d673cd5c) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 3 ++- .../org/apache/pulsar/broker/admin/AdminApi2Test.java | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ad75e723707a6..26170bb56f8c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -675,7 +675,8 @@ protected CompletableFuture internalUpdatePropertiesAsync(boolean authorit return namespaceResources() .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(p.partitions, - MapUtils.putAll(p.properties, properties.entrySet().toArray()))); + p.properties == null ? properties + : MapUtils.putAll(p.properties, properties.entrySet().toArray()))); }); } }).thenAccept(__ -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index cb186ce256af2..d84fab7137f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -909,11 +909,14 @@ public void testUpdatePartitionedTopicProperties() throws Exception { final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2"; admin.namespaces().createNamespace(namespace, 20); - // create partitioned topic with properties + // create partitioned topic without properties + admin.topics().createPartitionedTopic(topicName, 2); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNull(properties); Map topicProperties = new HashMap<>(); topicProperties.put("key1", "value1"); - admin.topics().createPartitionedTopic(topicName, 2, topicProperties); - Map properties = admin.topics().getProperties(topicName); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); Assert.assertNotNull(properties); Assert.assertEquals(properties.get("key1"), "value1");