Skip to content

Commit

Permalink
[fix][broker] Fix currently client retries until operation timeout if…
Browse files Browse the repository at this point in the history
… the topic does not exist (apache#23530)
  • Loading branch information
BewareMyPower authored Nov 6, 2024
1 parent c4ee362 commit 8eeb0e2
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof WebApplicationException restException) {
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
"Tenant or namespace or topic does not exist: " + topicName.getNamespace() ,
requestId));
lookupSemaphore.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception {
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -56,31 +58,35 @@
public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);

private final static String ADMIN_ROLE = "admin";
private final String ADMIN_TOKEN;
private final String USER_TOKEN;
private final String TOKEN_PUBLIC_KEY;
private final KeyPair kp;

TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException {
KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
KeyPair kp = kpg.generateKeyPair();
kp = kpg.generateKeyPair();

byte[] encodedPublicKey = kp.getPublic().getEncoded();
TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey);
ADMIN_TOKEN = generateToken(kp);
ADMIN_TOKEN = generateToken(ADMIN_ROLE);
USER_TOKEN = generateToken("user");
}

private String generateToken(KeyPair kp) {
private String generateToken(String subject) {
PrivateKey pkey = kp.getPrivate();
long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis();
Date exp = new Date(expMillis);

return Jwts.builder()
.setSubject("admin")
.setSubject(subject)
.setExpiration(exp)
.signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
.compact();
}

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
Expand Down Expand Up @@ -118,7 +124,7 @@ protected final void clientSetup() throws Exception {
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -172,4 +178,53 @@ public void testTokenProducerAndConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@DataProvider
public static Object[][] provider() {
// The 1st element specifies whether to use TCP service URL
// The 2nd element specifies whether to use a token with correct permission
return new Object[][] {
{ true, true },
{ true, false },
{ false, true },
{ false, false },
};
}

@Test(dataProvider = "provider")
public void testTopicNotFound(boolean useTcpServiceUrl, boolean useCorrectToken) throws Exception {
final var operationTimeoutMs = 10000;
final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() : pulsar.getWebServiceAddress();
final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN;
@Cleanup final var client = PulsarClient.builder().serviceUrl(url)
.operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS)
.authentication(AuthenticationFactory.token(token)).build();
final var topic = "my-property/not-exist/tp"; // the namespace does not exist
var start = System.currentTimeMillis();
try {
client.newProducer().topic(topic).create();
Assert.fail();
} catch (PulsarClientException e) {
final var elapsedMs = System.currentTimeMillis() - start;
log.info("Failed to create producer after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage());
Assert.assertTrue(elapsedMs < operationTimeoutMs);
if (useTcpServiceUrl) {
Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException);
} else {
Assert.assertTrue(e instanceof PulsarClientException.NotFoundException);
}
}
start = System.currentTimeMillis();
try {
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
} catch (PulsarClientException e) {
final var elapsedMs = System.currentTimeMillis() - start;
log.info("Failed to subscribe after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage());
Assert.assertTrue(elapsedMs < operationTimeoutMs);
if (useTcpServiceUrl) {
Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException);
} else {
Assert.assertTrue(e instanceof PulsarClientException.NotFoundException);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNo
}
}).exceptionally(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (forceNoPartitioned && actEx instanceof PulsarClientException.NotFoundException
if (forceNoPartitioned && (actEx instanceof PulsarClientException.NotFoundException
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
|| actEx instanceof PulsarAdminException.NotFoundException)) {
checkPartitions.complete(0);
} else {
checkPartitions.completeExceptionally(ex);
Expand Down

0 comments on commit 8eeb0e2

Please sign in to comment.