Skip to content

Commit

Permalink
[fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong…
Browse files Browse the repository at this point in the history
… port if TLS is enabled (apache#21015)
  • Loading branch information
Technoboy- authored Sep 18, 2023
1 parent 62a88f9 commit 1363777
Show file tree
Hide file tree
Showing 24 changed files with 101 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1729,18 +1729,18 @@ public static String webAddressTls(String host, int port) {
}

public String getSafeWebServiceAddress() {
return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
return webServiceAddressTls != null ? webServiceAddressTls : webServiceAddress;
}

@Deprecated
public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
}

public String getLookupServiceAddress() {
return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent()
? config.getWebServicePort().get()
: config.getWebServicePortTls().orElseThrow());
return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
? config.getWebServicePortTls().get()
: config.getWebServicePort().orElseThrow());
}

public TopicPoliciesService getTopicPoliciesService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,10 +1591,10 @@ public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, Service

public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) {
Integer port = null;
if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
} else if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
}
return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ protected void cleanup() throws Exception {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,19 +439,13 @@ public void testTopicPoliciesWithMultiBroker() throws Exception {
String tenantName = newUniqueName("prop-xyz2");
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
conf.setBrokerServicePort(Optional.of(1024));
conf.setBrokerServicePortTls(Optional.of(1025));
conf.setWebServicePort(Optional.of(1026));
conf.setWebServicePortTls(Optional.of(1027));
ServiceConfiguration config2 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf);
PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
conf.setBrokerServicePort(Optional.of(2048));
conf.setBrokerServicePortTls(Optional.of(2049));
conf.setWebServicePort(Optional.of(2050));
conf.setWebServicePortTls(Optional.of(2051));
ServiceConfiguration config3 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(conf);
PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
PulsarService pulsar3 = pulsarTestContext.getPulsarService();
@Cleanup
PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ public void persistentTopics(String topicName) throws Exception {
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getOwnerBroker(),
pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get());
pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePortTls().get());

PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Set.of(Codec.encode(subName)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,8 @@ protected void doInitConf() throws Exception {
this.conf.setBrokerShutdownTimeoutMs(0L);
this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
}
Expand Down Expand Up @@ -470,9 +468,7 @@ protected ServiceConfiguration getDefaultConf() {
configuration.setBrokerShutdownTimeoutMs(0L);
configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
configuration.setBrokerServicePort(Optional.of(0));
configuration.setBrokerServicePortTls(Optional.of(0));
configuration.setWebServicePort(Optional.of(0));
configuration.setWebServicePortTls(Optional.of(0));
configuration.setBookkeeperClientExposeStatsToPrometheus(true);
configuration.setNumExecutorThreadPoolSize(5);
configuration.setBrokerMaxConnections(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
",public_https:https://localhost:" + httpsPort);
conf.setBrokerServicePort(Optional.of(pulsarPort));
conf.setWebServicePort(Optional.of(httpPort));
conf.setWebServicePortTls(Optional.of(httpsPort));
}

@Test
Expand All @@ -101,7 +100,6 @@ public void testLookup() throws Exception {

assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");


// Produce data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ void setup() throws Exception {
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ void setup() throws Exception {
config1.setBrokerServicePort(Optional.of(0));
config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setAdvertisedAddress("localhost");
pulsar1 = new PulsarService(config1);
pulsar1.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ void setup() throws Exception {
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
pulsar1 = new PulsarService(config1);
pulsar1.start();

Expand All @@ -189,8 +187,6 @@ void setup() throws Exception {
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
pulsar2.start();

Expand All @@ -204,8 +200,6 @@ void setup() throws Exception {
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);

secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ protected void startBroker() throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
serviceConfigurationList.add(conf);

PulsarTestContext.Builder testContextBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,4 +1786,13 @@ public void testUnsubscribeNonDurableSub() throws Exception {
fail("Unsubscribe failed");
}
}

@Test
public void testGetLookupServiceAddress() throws Exception {
cleanup();
setup();
conf.setWebServicePortTls(Optional.of(8081));
assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
resetState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.lang.reflect.Method;
import java.net.URL;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.BrokerTestUtil;
Expand Down Expand Up @@ -507,6 +508,14 @@ protected void setup() throws Exception {
super.setupWithClusterName(clusterName);
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
}


public PulsarService getPulsarService() {
return pulsar;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void activeBrokerParse() throws Exception {
-> admin2.clusters().getCluster(cluster2) != null);

List<String> list = admin1.brokers().getActiveBrokers(cluster2);
assertEquals(list.get(0), url2.toString().replace("http://", ""));
assertEquals(list.get(0), urlTls2.toString().replace("https://", ""));
//restore configuration
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ protected void startBroker() throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTransactionCoordinatorEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public void testWebserviceServiceTls() throws Exception {
// request [3]
doReturn(true).when(loadManager1).isCentralized();
doReturn(true).when(loadManager2).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddressTls(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));

Expand Down Expand Up @@ -507,6 +507,9 @@ public void testWebserviceServiceTls() throws Exception {

loadManager1 = null;
loadManager2 = null;

conf.setBrokerServicePortTls(Optional.empty());
conf.setWebServicePortTls(Optional.empty());
}

/**
Expand Down Expand Up @@ -840,6 +843,8 @@ public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {
admin.topics().createPartitionedTopic(dest.toString(), totalPartitions);

stopBroker();
conf.setBrokerServicePortTls(Optional.empty());
conf.setWebServicePortTls(Optional.empty());
conf.setClientLibraryVersionCheckEnabled(true);
startBroker();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -50,7 +51,8 @@ protected void doInitConf() throws Exception {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);

conf.setWebServicePortTls(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected void doInitConf() throws Exception {
conf.setAdvertisedAddress(null);
conf.setAuthenticateOriginalAuthData(true);
conf.setBrokerServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.of(0));

Set<String> superUserRoles = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ protected void setup() throws Exception {

protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
public static final String NAME = "pulsar-broker";

public BrokerContainer(String clusterName, String hostName) {
super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_PORT_TLS,
BROKER_HTTP_PORT, BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
this(clusterName, hostName, false);
}

public BrokerContainer(String clusterName, String hostName, boolean enableTls) {
super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT,
enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
tailContainerLog();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ public class ProxyContainer extends PulsarContainer<ProxyContainer> {
public static final String NAME = "pulsar-proxy";

public ProxyContainer(String clusterName, String hostName) {
super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_PORT_TLS, BROKER_HTTP_PORT,
BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
this(clusterName, hostName, false);
}

public ProxyContainer(String clusterName, String hostName, boolean enableTls) {
super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT,
enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -41,6 +42,14 @@ private static String loadCertificateAuthorityFile(String name) {
return Resources.getResource("certificate-authority/" + name).getPath();
}

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.enableTls(true);
return specBuilder;
}

@DataProvider(name = "adminUrls")
public Object[][] adminUrls() {
return new Object[][]{
Expand Down
Loading

0 comments on commit 1363777

Please sign in to comment.