From 31fe347b39dcc8ce4c1e8d69eeb04649506bd4c6 Mon Sep 17 00:00:00 2001 From: labuladong Date: Fri, 3 Feb 2023 07:40:17 +0800 Subject: [PATCH] [improve][cli] improve admin `set-backlog-quota` more clear (#19300) --- .../pulsar/admin/cli/PulsarAdminToolTest.java | 27 ++++++++---- .../pulsar/admin/cli/CmdNamespaces.java | 39 +++++++++-------- .../pulsar/admin/cli/CmdTopicPolicies.java | 43 +++++++++++-------- 3 files changed, 63 insertions(+), 46 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 37b8c33e114e5..ccae1b1176527 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -114,6 +114,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; @Slf4j @@ -437,6 +438,12 @@ public void namespaces() throws Exception { namespaces.run(split("unload myprop/clust/ns1")); verify(mockNamespaces).unload("myprop/clust/ns1"); + // message_age must have time limit, destination_storage must have size limit + Assert.assertFalse(namespaces.run( + split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age"))); + Assert.assertFalse(namespaces.run( + split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage"))); + mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); namespaces = new CmdNamespaces(() -> admin); @@ -498,23 +505,21 @@ public void namespaces() throws Exception { when(admin.namespaces()).thenReturn(mockNamespaces); namespaces = new CmdNamespaces(() -> admin); - namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m")); + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age")); verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", BacklogQuota.builder() - .limitSize(10 * 1024) .limitTime(10 * 60) .retentionPolicy(RetentionPolicy.consumer_backlog_eviction) .build(), - BacklogQuota.BacklogQuotaType.destination_storage); + BacklogQuota.BacklogQuotaType.message_age); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); namespaces = new CmdNamespaces(() -> admin); - namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age")); + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age")); verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", BacklogQuota.builder() - .limitSize(10l * 1024 * 1024 * 1024) .limitTime(10000) .retentionPolicy(RetentionPolicy.producer_exception) .build(), @@ -1216,24 +1221,28 @@ public void topicPolicies() throws Exception { cmdTopics = new CmdTopicPolicies(() -> admin); cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10h")); verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10 * 60 * 60); - cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction")); + cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction -t message_age")); verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.builder() - .limitSize(-1) .limitTime(60 * 60 * 24 * 7) .retentionPolicy(RetentionPolicy.consumer_backlog_eviction) .build(), - BacklogQuota.BacklogQuotaType.destination_storage); + BacklogQuota.BacklogQuotaType.message_age); //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopicPolicies(() -> admin); cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age")); verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.builder() - .limitSize(-1) .limitTime(1000) .retentionPolicy(RetentionPolicy.producer_request_hold) .build(), BacklogQuota.BacklogQuotaType.message_age); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopicPolicies(() -> admin); + Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 1000 -p producer_request_hold -t message_age"))); + cmdTopics = new CmdTopicPolicies(() -> admin); + Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 60 -p producer_request_hold -t destination_storage"))); + //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopicPolicies(() -> admin); cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index c92f1f8838c73..998591f8177d1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -1266,7 +1266,7 @@ private class SetBacklogQuota extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; - @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true) + @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)") private String limitStr; @Parameter(names = { "-lt", "--limitTime" }, @@ -1280,8 +1280,8 @@ private class SetBacklogQuota extends CliCommand { private String policyStr; @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: " - + "destination_storage and message_age. " - + "destination_storage limits backlog by size (in bytes). " + + "destination_storage (default) and message_age. " + + "destination_storage limits backlog by size. " + "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). " + "You can set size or time to control the backlog, or combine them together to control the backlog. ") private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name(); @@ -1289,7 +1289,6 @@ private class SetBacklogQuota extends CliCommand { @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; - long limit = validateSizeString(limitStr); BacklogQuota.BacklogQuotaType backlogQuotaType; try { @@ -1306,26 +1305,30 @@ void run() throws PulsarAdminException { backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values()))); } - long limitTimeInSec = -1; - if (limitTimeStr != null) { + String namespace = validateNamespace(params); + + BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy); + if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) { + // set quota by storage size + if (limitStr == null) { + throw new ParameterException("Quota type of 'destination_storage' needs a size limit"); + } + long limit = validateSizeString(limitStr); + builder.limitSize(limit); + } else { + // set quota by time + if (limitTimeStr == null) { + throw new ParameterException("Quota type of 'message_age' needs a time limit"); + } + long limitTimeInSec; try { limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr); } catch (IllegalArgumentException e) { throw new ParameterException(e.getMessage()); } + builder.limitTime((int) limitTimeInSec); } - if (limitTimeInSec > Integer.MAX_VALUE) { - throw new ParameterException( - String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE)); - } - - String namespace = validateNamespace(params); - getAdmin().namespaces().setBacklogQuota(namespace, - BacklogQuota.builder().limitSize(limit) - .limitTime((int) limitTimeInSec) - .retentionPolicy(policy) - .build(), - backlogQuotaType); + getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 7cd3b49796f96..d567d0b3671b5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -964,7 +964,7 @@ private class SetBacklogQuota extends CliCommand { private java.util.List params; @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)") - private String limitStr = "-1"; + private String limitStr = null; @Parameter(names = { "-lt", "--limitTime" }, description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), " @@ -977,8 +977,8 @@ private class SetBacklogQuota extends CliCommand { private String policyStr; @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: " - + "destination_storage and message_age. " - + "destination_storage limits backlog by size (in bytes). " + + "destination_storage (default) and message_age. " + + "destination_storage limits backlog by size. " + "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). " + "You can set size or time to control the backlog, or combine them together to control the backlog. ") private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name(); @@ -990,7 +990,6 @@ private class SetBacklogQuota extends CliCommand { @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; - long limit; BacklogQuota.BacklogQuotaType backlogQuotaType; try { @@ -999,35 +998,41 @@ void run() throws PulsarAdminException { throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s", policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values()))); } - - limit = validateSizeString(limitStr); - try { backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr); } catch (IllegalArgumentException e) { throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s", backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values()))); } + String persistentTopic = validatePersistentTopic(params); + BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy); - long limitTimeInSec = -1; - if (limitTimeStr != null) { + if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) { + // set quota by storage size + if (limitStr == null) { + throw new ParameterException("Quota type of 'destination_storage' needs a size limit"); + } + long limit = validateSizeString(limitStr); + builder.limitSize((int) limit); + } else { + // set quota by time + if (limitTimeStr == null) { + throw new ParameterException("Quota type of 'message_age' needs a time limit"); + } + long limitTimeInSec; try { limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr); } catch (IllegalArgumentException e) { throw new ParameterException(e.getMessage()); } + if (limitTimeInSec > Integer.MAX_VALUE) { + throw new ParameterException( + String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE)); + } + builder.limitTime((int) limitTimeInSec); } - if (limitTimeInSec > Integer.MAX_VALUE) { - throw new ParameterException( - String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE)); - } - - String persistentTopic = validatePersistentTopic(params); getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic, - BacklogQuota.builder().limitSize(limit) - .limitTime((int) limitTimeInSec) - .retentionPolicy(policy) - .build(), + builder.build(), backlogQuotaType); } }