diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 2b075675d900f..e47923563c638 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -21,8 +21,10 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; @@ -31,12 +33,18 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -151,12 +159,45 @@ public static void main(String[] args) throws Exception { MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) .configFilePath(arguments.configurationStoreConfigPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); - deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join(); + PulsarResources resources = new PulsarResources(metadataStore, configMetadataStore); + // Cleanup replication cluster from all tenants and namespaces + TenantResources tenantResources = resources.getTenantResources(); + NamespaceResources namespaceResources = resources.getNamespaceResources(); + List tenants = tenantResources.listTenants(); + for (String tenant : tenants) { + List namespaces = namespaceResources.listNamespacesAsync(tenant).get(); + for (String namespace : namespaces) { + namespaceResources.setPolicies(NamespaceName.get(tenant, namespace), policies -> { + policies.replication_clusters.remove(arguments.cluster); + return policies; + }); + } + removeCurrentClusterFromAllowedClusters(tenantResources, tenant, arguments.cluster); + } + try { + resources.getClusterResources().deleteCluster(arguments.cluster); + } catch (MetadataStoreException.NotFoundException ex) { + // Ignore if the cluster does not exist + log.info("Cluster metadata for '{}' does not exist.", arguments.cluster); + } } log.info("Cluster metadata for '{}' teardown.", arguments.cluster); } + private static void removeCurrentClusterFromAllowedClusters( + TenantResources tenantResources, String tenant, String curCluster) + throws MetadataStoreException, InterruptedException, ExecutionException { + Optional tenantInfoOptional = tenantResources.getTenant(tenant); + if (tenantInfoOptional.isEmpty()) { + return; + } + tenantResources.updateTenantAsync(tenant, ti -> { + ti.getAllowedClusters().remove(curCluster); + return ti; + }).get(); + } + private static CompletableFuture deleteRecursively(MetadataStore metadataStore, String path) { return metadataStore.getChildren(path) .thenCompose(children -> FutureUtil.waitForAll( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java index 5184afade9c85..c689bb60fedf7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import java.util.SortedMap; import org.apache.pulsar.PulsarClusterMetadataSetup; import org.apache.pulsar.PulsarClusterMetadataTeardown; @@ -54,7 +55,7 @@ void cleanup() { @Test public void testSetupClusterMetadataAndTeardown() throws Exception { String[] args1 = { - "--cluster", "testReSetupClusterMetadata-cluster", + "--cluster", "cluster1", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", @@ -65,7 +66,7 @@ public void testSetupClusterMetadataAndTeardown() throws Exception { }; PulsarClusterMetadataSetup.main(args1); SortedMap data1 = localZkS.dumpData(); - String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster"); + String clusterDataJson = data1.get("/admin/clusters/cluster1"); assertNotNull(clusterDataJson); ClusterData clusterData = ObjectMapperFactory .getMapper() @@ -78,13 +79,78 @@ public void testSetupClusterMetadataAndTeardown() throws Exception { assertFalse(clusterData.isBrokerClientTlsEnabled()); String[] args2 = { - "--cluster", "testReSetupClusterMetadata-cluster", + "--cluster", "cluster1", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", }; PulsarClusterMetadataTeardown.main(args2); SortedMap data2 = localZkS.dumpData(); - assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster")); + assertFalse(data2.containsKey("/admin/clusters/cluster1")); + } + + @Test + public void testSetupMultipleClusterMetadataAndTeardown() throws Exception { + String[] cluster1Args = { + "--cluster", "cluster1", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + "--web-service-url", "http://127.0.0.1:8080", + "--web-service-url-tls", "https://127.0.0.1:8443", + "--broker-service-url", "pulsar://127.0.0.1:6650", + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" + }; + PulsarClusterMetadataSetup.main(cluster1Args); + String[] cluster2Args = { + "--cluster", "cluster2", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + "--web-service-url", "http://127.0.0.1:8081", + "--web-service-url-tls", "https://127.0.0.1:8445", + "--broker-service-url", "pulsar://127.0.0.1:6651", + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6652" + }; + PulsarClusterMetadataSetup.main(cluster2Args); + SortedMap data1 = localZkS.dumpData(); + String clusterDataJson = data1.get("/admin/clusters/cluster1"); + assertNotNull(clusterDataJson); + ClusterData clusterData = ObjectMapperFactory + .getMapper() + .reader() + .readValue(clusterDataJson, ClusterData.class); + assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080"); + assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443"); + assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650"); + assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651"); + assertFalse(clusterData.isBrokerClientTlsEnabled()); + + String[] args2 = { + "--cluster", "cluster1", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + }; + PulsarClusterMetadataTeardown.main(args2); + SortedMap data2 = localZkS.dumpData(); + assertFalse(data2.containsKey("/admin/clusters/cluster1")); + assertTrue(data2.containsKey("/admin/clusters/cluster2")); + + assertTrue(data2.containsKey("/admin/policies/public")); + assertFalse(data2.get("/admin/policies/public").contains("cluster1")); + assertTrue(data2.get("/admin/policies/public").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/pulsar")); + assertFalse(data2.get("/admin/policies/pulsar").contains("cluster1")); + assertTrue(data2.get("/admin/policies/pulsar").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/public/default")); + assertFalse(data2.get("/admin/policies/public/default").contains("cluster1")); + assertTrue(data2.get("/admin/policies/public/default").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/pulsar/system")); + assertFalse(data2.get("/admin/policies/pulsar/system").contains("cluster1")); + assertTrue(data2.get("/admin/policies/pulsar/system").contains("cluster2")); } }