From 8f9f7016c78a05557935e0ca1506372fcfb62dda Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Mon, 9 Sep 2024 17:22:08 +0800 Subject: [PATCH] [improve][broker] Make cluster metadata init command support metadata config path (#23269) (cherry picked from commit 46f99b91145ec8f71e38b8cb9671d39628ed53de) (cherry picked from commit fa2e7d8f08313c07225c71fb963317c89185a8d9) --- .../pulsar/PulsarClusterMetadataSetup.java | 38 ++++++++++++++++--- .../pulsar/PulsarInitialNamespaceSetup.java | 11 +++++- ...arTransactionCoordinatorMetadataSetup.java | 11 +++++- .../zookeeper/ClusterMetadataSetupTest.java | 9 +++-- .../resources/conf/zk_client_enable_sasl.conf | 20 ++++++++++ 5 files changed, 76 insertions(+), 13 deletions(-) create mode 100644 pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 624f8f9236c5e..fede660c2c716 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -137,8 +137,17 @@ private static class Arguments { hidden = false) private String configurationMetadataStore; + @Parameter(names = {"-mscp", + "--metadata-store-config-path"}, description = "Metadata Store config path", hidden = false) + private String metadataStoreConfigPath; + + @Parameter(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Parameter(names = { - "--initial-num-stream-storage-containers" + "--initial-num-stream-storage-containers" }, description = "Num storage containers of BookKeeper stream storage") private int numStreamStorageContainers = 16; @@ -281,9 +290,11 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe log.info("Setting up cluster {} with metadata-store={} configuration-metadata-store={}", arguments.cluster, arguments.metadataStoreUrl, arguments.configurationMetadataStore); - MetadataStoreExtended localStore = - initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis); + MetadataStoreExtended localStore = initLocalMetadataStore(arguments.metadataStoreUrl, + arguments.metadataStoreConfigPath, + arguments.zkSessionTimeoutMillis); MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore, + arguments.configurationStoreConfigPath, arguments.zkSessionTimeoutMillis); final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl @@ -462,9 +473,17 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam } } - public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception { + public static MetadataStoreExtended initLocalMetadataStore(String connection, + int sessionTimeout) throws Exception { + return initLocalMetadataStore(connection, null, sessionTimeout); + } + + public static MetadataStoreExtended initLocalMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { @@ -473,10 +492,19 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, in return store; } - public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout) + public static MetadataStoreExtended initConfigMetadataStore(String connection, + int sessionTimeout) + throws Exception { + return initConfigMetadataStore(connection, null, sessionTimeout); + } + + public static MetadataStoreExtended initConfigMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java index 22b38e59676ec..ac7ef9961d338 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java @@ -40,6 +40,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Parameter(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Parameter(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -82,8 +87,10 @@ public static int doMain(String[] args) throws Exception { return 1; } - try (MetadataStore configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStore configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); for (String namespace : arguments.namespaces) { NamespaceName namespaceName = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java index 6aedfe13a5b50..ec4c7f237ba3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java @@ -41,6 +41,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Parameter(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Parameter(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -90,8 +95,10 @@ public static void main(String[] args) throws Exception { System.exit(1); } - try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); // Create system tenant PulsarClusterMetadataSetup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index 4267c7564fa6f..6a7d54dbf7cae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -74,10 +74,11 @@ public void testReSetupClusterMetadata() throws Exception { "--cluster", "testReSetupClusterMetadata-cluster", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", "--web-service-url", "http://127.0.0.1:8080", "--web-service-url-tls", "https://127.0.0.1:8443", "--broker-service-url", "pulsar://127.0.0.1:6650", - "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651" + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" }; PulsarClusterMetadataSetup.main(args); SortedMap data1 = localZkS.dumpData(); @@ -505,11 +506,9 @@ static class ZookeeperServerTest implements Closeable { private ZooKeeperServer zks; private NIOServerCnxnFactory serverFactory; private final int zkPort; - private final String hostPort; public ZookeeperServerTest(int zkPort) throws IOException { this.zkPort = zkPort; - this.hostPort = "127.0.0.1:" + zkPort; this.zkTmpDir = File.createTempFile("zookeeper", "test"); log.info("**** Start GZK on {} ****", zkTmpDir); if (!zkTmpDir.delete() || !zkTmpDir.mkdir()) { @@ -519,15 +518,17 @@ public ZookeeperServerTest(int zkPort) throws IOException { public void start() throws IOException { try { + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME); zks.setMaxSessionTimeout(20000); serverFactory = new NIOServerCnxnFactory(); - serverFactory.configure(new InetSocketAddress(zkPort), 1000); + serverFactory.configure(new InetSocketAddress("127.0.0.1", zkPort), 1000); serverFactory.startup(zks); } catch (Exception e) { log.error("Exception while instantiating ZooKeeper", e); } + String hostPort = "127.0.0.1:" + serverFactory.getLocalPort(); LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000); log.info("ZooKeeper started at {}", hostPort); } diff --git a/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf new file mode 100644 index 0000000000000..c59e093450d39 --- /dev/null +++ b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +zookeeper.sasl.client=true