diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c14602bfca507..f9e593345d85f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -713,7 +713,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa Throwable actEx = FutureUtil.unwrapCompletionException(ex); if (actEx instanceof WebApplicationException restException) { if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) { - writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, + writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound, "Tenant or namespace or topic does not exist: " + topicName.getNamespace() , requestId)); lookupSemaphore.release(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index d06f6ea1c56aa..f3237676c8630 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -599,8 +599,7 @@ public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception { fail("Expected a not found ex"); } catch (Exception ex) { Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); - assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException || - unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); } } // Verify: lookup semaphore has been releases. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java index f8ae0279e08b7..9da3fcbd0edc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java @@ -35,6 +35,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.auth.AuthenticationToken; @@ -42,8 +43,9 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -56,31 +58,35 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class); + private final static String ADMIN_ROLE = "admin"; private final String ADMIN_TOKEN; + private final String USER_TOKEN; private final String TOKEN_PUBLIC_KEY; + private final KeyPair kp; TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException { KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA"); - KeyPair kp = kpg.generateKeyPair(); + kp = kpg.generateKeyPair(); byte[] encodedPublicKey = kp.getPublic().getEncoded(); TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey); - ADMIN_TOKEN = generateToken(kp); + ADMIN_TOKEN = generateToken(ADMIN_ROLE); + USER_TOKEN = generateToken("user"); } - private String generateToken(KeyPair kp) { + private String generateToken(String subject) { PrivateKey pkey = kp.getPrivate(); long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); Date exp = new Date(expMillis); return Jwts.builder() - .setSubject("admin") + .setSubject(subject) .setExpiration(exp) .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey)) .compact(); } - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { conf.setAuthenticationEnabled(true); @@ -118,7 +124,7 @@ protected final void clientSetup() throws Exception { .authentication(AuthenticationFactory.token(ADMIN_TOKEN))); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -172,4 +178,53 @@ public void testTokenProducerAndConsumer() throws Exception { log.info("-- Exiting {} test --", methodName); } + @DataProvider + public static Object[][] provider() { + // The 1st element specifies whether to use TCP service URL + // The 2nd element specifies whether to use a token with correct permission + return new Object[][] { + { true, true }, + { true, false }, + { false, true }, + { false, false }, + }; + } + + @Test(dataProvider = "provider") + public void testTopicNotFound(boolean useTcpServiceUrl, boolean useCorrectToken) throws Exception { + final var operationTimeoutMs = 10000; + final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() : pulsar.getWebServiceAddress(); + final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN; + @Cleanup final var client = PulsarClient.builder().serviceUrl(url) + .operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS) + .authentication(AuthenticationFactory.token(token)).build(); + final var topic = "my-property/not-exist/tp"; // the namespace does not exist + var start = System.currentTimeMillis(); + try { + client.newProducer().topic(topic).create(); + Assert.fail(); + } catch (PulsarClientException e) { + final var elapsedMs = System.currentTimeMillis() - start; + log.info("Failed to create producer after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage()); + Assert.assertTrue(elapsedMs < operationTimeoutMs); + if (useTcpServiceUrl) { + Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException); + } else { + Assert.assertTrue(e instanceof PulsarClientException.NotFoundException); + } + } + start = System.currentTimeMillis(); + try { + client.newConsumer().topic(topic).subscriptionName("sub").subscribe(); + } catch (PulsarClientException e) { + final var elapsedMs = System.currentTimeMillis() - start; + log.info("Failed to subscribe after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage()); + Assert.assertTrue(elapsedMs < operationTimeoutMs); + if (useTcpServiceUrl) { + Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException); + } else { + Assert.assertTrue(e instanceof PulsarClientException.NotFoundException); + } + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 603844eeb786e..871666620b7b4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -418,9 +418,9 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo } }).exceptionally(ex -> { Throwable actEx = FutureUtil.unwrapCompletionException(ex); - if (forceNoPartitioned && actEx instanceof PulsarClientException.NotFoundException + if (forceNoPartitioned && (actEx instanceof PulsarClientException.NotFoundException || actEx instanceof PulsarClientException.TopicDoesNotExistException - || actEx instanceof PulsarAdminException.NotFoundException) { + || actEx instanceof PulsarAdminException.NotFoundException)) { checkPartitions.complete(0); } else { checkPartitions.completeExceptionally(ex);