diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2d356bea4012c..bc528f6a07202 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -236,6 +236,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider brokerClientSharedExternalExecutorProvider; private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; + private final ExecutorProvider brokerClientSharedLookupExecutorProvider; private MetricsGenerator metricsGenerator; @@ -342,6 +343,8 @@ public PulsarService(ServiceConfiguration config, new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + this.brokerClientSharedLookupExecutorProvider = + new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); } public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException { @@ -551,6 +554,7 @@ public CompletableFuture closeAsync() { brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); + brokerClientSharedLookupExecutorProvider.shutdownNow(); brokerClientSharedTimer.stop(); ioEventLoopGroup.shutdownGracefully(); @@ -1376,6 +1380,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9405fe3c0607e..8c6f8c87ab6c5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -20,6 +20,7 @@ import static java.lang.String.format; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; @@ -28,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -53,9 +55,11 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; + private final ExecutorService lookupPinnedExecutor; + private final boolean createdLookupPinnedExecutor; private final ConcurrentHashMap>> lookupInProgress = new ConcurrentHashMap<>(); @@ -63,27 +67,55 @@ public class BinaryProtoLookupService implements LookupService { private final ConcurrentHashMap> partitionedMetadataInProgress = new ConcurrentHashMap<>(); + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, null, useTls, scheduleExecutor); } + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null); + } + + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + String listenerName, + boolean useTls, + ExecutorService scheduleExecutor, + ExecutorService lookupPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; updateServiceUrl(serviceUrl); + if (lookupPinnedExecutor == null) { + this.createdLookupPinnedExecutor = true; + this.lookupPinnedExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); + } else { + this.createdLookupPinnedExecutor = false; + this.lookupPinnedExecutor = lookupPinnedExecutor; + } } @Override @@ -148,7 +180,7 @@ private CompletableFuture> findBroker return addressFuture; } - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { @@ -213,7 +245,7 @@ private CompletableFuture> findBroker } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -225,7 +257,7 @@ private CompletableFuture getPartitionedTopicMetadata( CompletableFuture partitionFuture = new CompletableFuture(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { @@ -246,7 +278,7 @@ private CompletableFuture getPartitionedTopicMetadata( } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -268,7 +300,7 @@ public CompletableFuture> getSchema(TopicName topicName, by return schemaFuture; } InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(version))); @@ -282,7 +314,7 @@ public CompletableFuture> getSchema(TopicName topicName, by } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(ex -> { + }, lookupPinnedExecutor).exceptionally(ex -> { schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); @@ -314,7 +346,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, AtomicLong remainingTime, CompletableFuture> topicsFuture, Mode mode) { - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( namespace.toString(), requestId, mode); @@ -341,7 +373,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally((e) -> { + }, lookupPinnedExecutor).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { topicsFuture.completeExceptionally( @@ -351,7 +383,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -364,7 +396,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + lookupPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 4c1a301fe3a49..ce33b91f0b980 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -98,6 +98,7 @@ public class PulsarClientImpl implements PulsarClient { private final boolean createdExecutorProviders; private final boolean createdScheduledProviders; + private final boolean createdLookupProviders; private LookupService lookup; private final ConnectionPool cnxPool; @Getter @@ -105,6 +106,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private final ExecutorProvider lookupExecutorProvider; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -144,29 +146,39 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); + } + + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) + throws PulsarClientException { + this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, + scheduledExecutorProvider, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, - ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + ScheduledExecutorProvider scheduledExecutorProvider, + ExecutorProvider lookupExecutorProvider) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; try { @@ -178,6 +190,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG } this.createdExecutorProviders = externalExecutorProvider == null; this.createdScheduledProviders = scheduledExecutorProvider == null; + this.createdLookupProviders = lookupExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; if (conf == null || isBlank(conf.getServiceUrl())) { @@ -193,13 +206,17 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); + this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : + new ExecutorProvider(1, "pulsar-client-lookup"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), + this.lookupExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -858,6 +875,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) { + try { + lookupExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown lookupExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 6c94d56fc7ce9..b9d91cfef64b1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -25,28 +25,42 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; - +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; - +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class BinaryProtoLookupServiceTest { private BinaryProtoLookupService lookup; private TopicName topicName; + private ExecutorService internalExecutor; + + @AfterMethod + public void cleanup() throws Exception { + internalExecutor.shutdown(); + lookup.close(); + } @BeforeMethod public void setup() throws Exception { @@ -72,9 +86,15 @@ public void setup() throws Exception { doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); + ClientConfigurationData data = new ClientConfigurationData(); + doReturn(data).when(client).getConfiguration(); + internalExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); + doReturn(internalExecutor).when(client).getInternalExecutorService(); + + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); topicName = TopicName.get("persistent://tenant1/ns1/t1"); } @@ -116,6 +136,37 @@ public void maxLookupRedirectsTest3() throws Exception { } } + @Test + public void testCommandUnChangedInDifferentThread() throws Exception { + BaseCommand successCommand = Commands.newSuccessCommand(10000); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + lookup.getPartitionedTopicMetadata(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + } + + @Test + public void testCommandChangedInSameThread() throws Exception { + AtomicReference successCommand = new AtomicReference<>(); + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.get().getType(), Type.LOOKUP); + + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getPartitionedTopicMetadata(topicName).get(); + assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA); + } + private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception { LookupDataResult lookupResult = new LookupDataResult(-1);