Skip to content

Commit

Permalink
[improve][broker] Make cluster metadata teardown command support meta…
Browse files Browse the repository at this point in the history
…data config path (apache#23520)

(cherry picked from commit 69ca0cb)
(cherry picked from commit 271fee2)
  • Loading branch information
Demogorgon314 authored and nikhil-ctds committed Nov 20, 2024
1 parent f110dec commit 143e139
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,9 +50,17 @@ public class PulsarClusterMetadataTeardown {

private static class Arguments {
@Parameter(names = { "-zk",
"--zookeeper"}, description = "Local ZooKeeper quorum connection string", required = true)
"--zookeeper"}, description = "Local ZooKeeper quorum connection string")
private String zookeeper;

@Parameter(names = {
"--metadata-store"}, description = "Metadata Store service url. eg: zk:my-zk:2181")
private String metadataStoreUrl;

@Parameter(names = {"-mscp",
"--metadata-store-config-path"}, description = "Metadata Store config path")
private String metadataStoreConfigPath;

@Parameter(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
Expand All @@ -63,6 +72,11 @@ private static class Arguments {
@Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string")
private String configurationStore;

@Parameter(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Parameter(names = { "--bookkeeper-metadata-service-uri" }, description = "Metadata service uri of BookKeeper")
private String bkMetadataServiceUri;

Expand Down Expand Up @@ -97,11 +111,21 @@ public static void main(String[] args) throws Exception {
throw e;
}

if (arguments.metadataStoreUrl == null && arguments.zookeeper == null) {
jcommander.usage();
throw new IllegalArgumentException("Metadata store address argument is required (--metadata-store)");
}

if (arguments.metadataStoreUrl == null) {
arguments.metadataStoreUrl = ZKMetadataStore.ZK_SCHEME_IDENTIFIER + arguments.zookeeper;
}

@Cleanup
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.metadataStoreUrl,
MetadataStoreConfig.builder()
.sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.configFilePath(arguments.metadataStoreConfigPath)
.build());

if (arguments.bkMetadataServiceUri != null) {
Expand All @@ -125,6 +149,7 @@ public static void main(String[] args) throws Exception {
@Cleanup
MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
.configFilePath(arguments.configurationStoreConfigPath)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public void start() throws IOException {
log.info("ZooKeeper started at {}", hostPort);
}

private void clear() {
void clear() {
zks.getZKDatabase().clear();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.zookeeper;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import java.util.SortedMap;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.PulsarClusterMetadataTeardown;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class ClusterMetadataTeardownTest {

private ClusterMetadataSetupTest.ZookeeperServerTest localZkS;

@BeforeClass
void setup() throws Exception {
localZkS = new ClusterMetadataSetupTest.ZookeeperServerTest(0);
localZkS.start();
}

@AfterClass
void teardown() throws Exception {
localZkS.close();
}

@AfterMethod(alwaysRun = true)
void cleanup() {
localZkS.clear();
}

@Test
public void testSetupClusterMetadataAndTeardown() throws Exception {
String[] args1 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args1);
SortedMap<String, String> data1 = localZkS.dumpData();
String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster");
assertNotNull(clusterDataJson);
ClusterData clusterData = ObjectMapperFactory
.getMapper()
.reader()
.readValue(clusterDataJson, ClusterData.class);
assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650");
assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651");
assertFalse(clusterData.isBrokerClientTlsEnabled());

String[] args2 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
};
PulsarClusterMetadataTeardown.main(args2);
SortedMap<String, String> data2 = localZkS.dumpData();
assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster"));
}
}

0 comments on commit 143e139

Please sign in to comment.