diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 01a9eedcca357..2b075675d900f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -39,6 +39,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,9 +50,17 @@ public class PulsarClusterMetadataTeardown { private static class Arguments { @Parameter(names = { "-zk", - "--zookeeper"}, description = "Local ZooKeeper quorum connection string", required = true) + "--zookeeper"}, description = "Local ZooKeeper quorum connection string") private String zookeeper; + @Parameter(names = { + "--metadata-store"}, description = "Metadata Store service url. eg: zk:my-zk:2181") + private String metadataStoreUrl; + + @Parameter(names = {"-mscp", + "--metadata-store-config-path"}, description = "Metadata Store config path") + private String metadataStoreConfigPath; + @Parameter(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -63,6 +72,11 @@ private static class Arguments { @Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string") private String configurationStore; + @Parameter(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Parameter(names = { "--bookkeeper-metadata-service-uri" }, description = "Metadata service uri of BookKeeper") private String bkMetadataServiceUri; @@ -97,11 +111,21 @@ public static void main(String[] args) throws Exception { throw e; } + if (arguments.metadataStoreUrl == null && arguments.zookeeper == null) { + jcommander.usage(); + throw new IllegalArgumentException("Metadata store address argument is required (--metadata-store)"); + } + + if (arguments.metadataStoreUrl == null) { + arguments.metadataStoreUrl = ZKMetadataStore.ZK_SCHEME_IDENTIFIER + arguments.zookeeper; + } + @Cleanup - MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper, + MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.metadataStoreUrl, MetadataStoreConfig.builder() .sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .configFilePath(arguments.metadataStoreConfigPath) .build()); if (arguments.bkMetadataServiceUri != null) { @@ -125,6 +149,7 @@ public static void main(String[] args) throws Exception { @Cleanup MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore, MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) + .configFilePath(arguments.configurationStoreConfigPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index 6a7d54dbf7cae..cc25cd95c645a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -533,7 +533,7 @@ public void start() throws IOException { log.info("ZooKeeper started at {}", hostPort); } - private void clear() { + void clear() { zks.getZKDatabase().clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java new file mode 100644 index 0000000000000..5184afade9c85 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.zookeeper; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import java.util.SortedMap; +import org.apache.pulsar.PulsarClusterMetadataSetup; +import org.apache.pulsar.PulsarClusterMetadataTeardown; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ClusterMetadataTeardownTest { + + private ClusterMetadataSetupTest.ZookeeperServerTest localZkS; + + @BeforeClass + void setup() throws Exception { + localZkS = new ClusterMetadataSetupTest.ZookeeperServerTest(0); + localZkS.start(); + } + + @AfterClass + void teardown() throws Exception { + localZkS.close(); + } + + @AfterMethod(alwaysRun = true) + void cleanup() { + localZkS.clear(); + } + + @Test + public void testSetupClusterMetadataAndTeardown() throws Exception { + String[] args1 = { + "--cluster", "testReSetupClusterMetadata-cluster", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + "--web-service-url", "http://127.0.0.1:8080", + "--web-service-url-tls", "https://127.0.0.1:8443", + "--broker-service-url", "pulsar://127.0.0.1:6650", + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" + }; + PulsarClusterMetadataSetup.main(args1); + SortedMap data1 = localZkS.dumpData(); + String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster"); + assertNotNull(clusterDataJson); + ClusterData clusterData = ObjectMapperFactory + .getMapper() + .reader() + .readValue(clusterDataJson, ClusterData.class); + assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080"); + assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443"); + assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650"); + assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651"); + assertFalse(clusterData.isBrokerClientTlsEnabled()); + + String[] args2 = { + "--cluster", "testReSetupClusterMetadata-cluster", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + }; + PulsarClusterMetadataTeardown.main(args2); + SortedMap data2 = localZkS.dumpData(); + assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster")); + } +}