From a8d09ce3aeaa3440c079c554417b1c192a581526 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Sat, 4 Nov 2023 07:26:07 +0200 Subject: [PATCH] [fix][test] Fix multiple thread leaks in tests, part 2 (#21513) --- .../tests/ThreadLeakDetectorListener.java | 4 +++ .../broker/admin/v1/V1_AdminApiTest.java | 1 + .../pulsar/admin/cli/PulsarAdminToolTest.java | 16 ++++++---- .../pulsar/metadata/MetadataCacheTest.java | 1 + .../pulsar/metadata/MetadataStoreTest.java | 1 + .../PulsarLedgerAuditorManagerTest.java | 2 ++ .../impl/RocksdbMetadataStoreTest.java | 1 + .../pulsar/proxy/server/ProxyService.java | 29 +++++++++++++++++-- .../server/ProxyWithAuthorizationTest.java | 3 ++ .../impl/TxnLogBufferedWriterTest.java | 4 ++- 10 files changed, 52 insertions(+), 10 deletions(-) diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java index ef6296a5f6cc9..98ac0ae2c2776 100644 --- a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java +++ b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java @@ -218,6 +218,10 @@ private static boolean shouldSkipThread(Thread thread) { if (threadName.startsWith("testcontainers-wait-")) { return true; } + // org.rnorth.ducttape.timeouts.Timeouts.EXECUTOR_SERVICE thread pool, used by Testcontainers + if (threadName.startsWith("ducttape-")) { + return true; + } } Runnable target = extractRunnableTarget(thread); if (target != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 3d195b5167949..d9f2d41a30b6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -173,6 +173,7 @@ public void setup() throws Exception { @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { + pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0); adminTls.close(); otheradmin.close(); super.internalCleanup(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a722abe19df81..07efa5165e5fd 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -54,6 +54,7 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.admin.cli.extensions.CustomCommandFactory; import org.apache.pulsar.admin.cli.utils.SchemaExtractor; @@ -2254,7 +2255,9 @@ public void requestTimeout() throws Exception { //Ok } - ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get()).getClientConfigData(); + @Cleanup + PulsarAdminImpl pulsarAdmin = (PulsarAdminImpl) tool.getPulsarAdminSupplier().get(); + ClientConfigurationData conf = pulsarAdmin.getClientConfigData(); assertEquals(1000, conf.getRequestTimeoutMs()); } @@ -2264,7 +2267,6 @@ public void testSourceCreateMissingSourceConfigFileFaileWithExitCode1() throws E Properties properties = new Properties(); properties.put("webServiceUrl", "http://localhost:2181"); PulsarAdminTool tool = new PulsarAdminTool(properties); - assertFalse(tool.run("sources create --source-config-file doesnotexist.yaml".split(" "))); } @@ -2298,8 +2300,9 @@ public void testAuthTlsWithJsonParam() throws Exception { } // validate Authentication-tls has been configured - ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get()) - .getClientConfigData(); + @Cleanup + PulsarAdminImpl pulsarAdmin = (PulsarAdminImpl) tool.getPulsarAdminSupplier().get(); + ClientConfigurationData conf = pulsarAdmin.getClientConfigData(); AuthenticationTls atuh = (AuthenticationTls) conf.getAuthentication(); assertEquals(atuh.getCertFilePath(), certFilePath); assertEquals(atuh.getKeyFilePath(), keyFilePath); @@ -2312,8 +2315,9 @@ public void testAuthTlsWithJsonParam() throws Exception { // Ok } - conf = conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get()) - .getClientConfigData(); + @Cleanup + PulsarAdminImpl pulsarAdmin2 = (PulsarAdminImpl) tool.getPulsarAdminSupplier().get(); + conf = pulsarAdmin2.getClientConfigData(); atuh = (AuthenticationTls) conf.getAuthentication(); assertEquals(atuh.getCertFilePath(), certFilePath); assertEquals(atuh.getKeyFilePath(), keyFilePath); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 0c30b238049c0..df59d25bdcc0e 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -486,6 +486,7 @@ public void readModifyUpdateBadVersionRetry() throws Exception { String url = zks.getConnectionString(); @Cleanup MetadataStore sourceStore1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + @Cleanup MetadataStore sourceStore2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); MetadataCache objCache1 = sourceStore1.getMetadataCache(MyClass.class); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 246661edc43ee..244ed025e3ed9 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -515,6 +515,7 @@ public void testPersistent(String provider, Supplier urlSupplier) throws @Test(dataProvider = "impl") public void testConcurrentPutGetOneKey(String provider, Supplier urlSupplier) throws Exception { + @Cleanup MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().fsyncEnable(false).build()); byte[] data = new byte[]{0}; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManagerTest.java index b5232f49bf44a..bb3d157b3c5c8 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManagerTest.java @@ -77,6 +77,7 @@ public void testSimple(String provider, Supplier urlSupplier) throws Exc methodSetup(urlSupplier); + @Cleanup LedgerAuditorManager lam1 = new PulsarLedgerAuditorManager(store1, ledgersRootPath); assertNull(lam1.getCurrentAuditor()); @@ -89,6 +90,7 @@ public void testSimple(String provider, Supplier urlSupplier) throws Exc @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); + @Cleanup LedgerAuditorManager lam2 = new PulsarLedgerAuditorManager(store2, ledgersRootPath); assertEquals(lam2.getCurrentAuditor(), BookieId.parse("bookie-1:3181")); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStoreTest.java index 7700fb3654d7e..e0f509cbce9d7 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStoreTest.java @@ -132,6 +132,7 @@ public void testMultipleInstances() throws Exception { store1.close(); store2.put("/test-2", new byte[0], Optional.empty()).join(); Assert.assertTrue(store2.exists("/test-2").join()); + store2.close(); FileUtils.deleteQuietly(tempDir.toFile()); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 216d9ea308539..719c7c2cbdade 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -34,6 +34,7 @@ import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Future; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import java.io.Closeable; @@ -151,6 +152,8 @@ public class ProxyService implements Closeable { @Getter private final ConnectionController connectionController; + private boolean gracefulShutdown = true; + public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws Exception { requireNonNull(proxyConfig); @@ -373,7 +376,7 @@ public void close() throws IOException { // Don't accept any new connections try { - acceptorGroup.shutdownGracefully().sync(); + shutdownEventLoop(acceptorGroup).sync(); } catch (InterruptedException e) { LOG.info("Shutdown of acceptorGroup interrupted"); Thread.currentThread().interrupt(); @@ -413,14 +416,14 @@ public void close() throws IOException { } } try { - workerGroup.shutdownGracefully().sync(); + shutdownEventLoop(workerGroup).sync(); } catch (InterruptedException e) { LOG.info("Shutdown of workerGroup interrupted"); Thread.currentThread().interrupt(); } for (EventLoopGroup group : extensionsWorkerGroups) { try { - group.shutdownGracefully().sync(); + shutdownEventLoop(group).sync(); } catch (InterruptedException e) { LOG.info("Shutdown of {} interrupted", group); Thread.currentThread().interrupt(); @@ -532,4 +535,24 @@ public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsPro protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnection) { return new LookupProxyHandler(this, proxyConnection); } + + // Shutdown the event loop. + // If graceful is true, will wait for the current requests to be completed, up to 15 seconds. + // Graceful shutdown can be disabled by setting the gracefulShutdown flag to false. This is used in tests + // to speed up the shutdown process. + private Future shutdownEventLoop(EventLoopGroup eventLoop) { + if (gracefulShutdown) { + return eventLoop.shutdownGracefully(); + } else { + return eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + } + + public boolean isGracefulShutdown() { + return gracefulShutdown; + } + + public void setGracefulShutdown(boolean gracefulShutdown) { + this.gracefulShutdown = gracefulShutdown; + } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index fc01dee3da01d..4e4c3c550cfd6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -229,6 +229,7 @@ protected void setup() throws Exception { AuthenticationService authService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); + proxyService.setGracefulShutdown(false); webServer = new WebServer(proxyConfig, authService); } @@ -455,9 +456,11 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc proxyConfig.setTlsProtocols(tlsProtocols); proxyConfig.setTlsCiphers(tlsCiphers); + @Cleanup ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService.setGracefulShutdown(false); try { proxyService.start(); } catch (Exception ex) { diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index 3c6b2e382e311..8b496af6aa718 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -927,6 +927,7 @@ private void releaseTxnLogBufferedWriterContext(TxnLogBufferedWriterContext cont context.txnLogBufferedWriter.close().get(); context.metrics.close(); context.timer.stop(); + context.orderedExecutor.shutdownNow(); CollectorRegistry.defaultRegistry.clear(); } @@ -936,6 +937,7 @@ private static class TxnLogBufferedWriterContext{ MockedManagedLedger mockedManagedLedger; Timer timer; TxnLogBufferedWriterMetricsStats metrics; + OrderedExecutor orderedExecutor; } @AllArgsConstructor @@ -970,7 +972,7 @@ private TxnLogBufferedWriterContext createTxnBufferedWriterContextWithMetrics( dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, batchedWriteMaxDelayInMillis, true, metricsStats); return new TxnLogBufferedWriterContext(txnLogBufferedWriter, mockedManagedLedger, transactionTimer, - metricsStats); + metricsStats, orderedExecutor); } private void verifyTheCounterMetrics(int triggeredByRecordCount, int triggeredByMaxSize, int triggeredByMaxDelay,