From 3396065a3a62af3e3586700f0bbfcbff93716b48 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Tue, 12 Dec 2023 17:09:17 +0800 Subject: [PATCH] [fix][fn] Fix Deadlock in Functions Worker LeaderService (#21711) Fixes #21501 ### Motivation No need to `synchronized` the method `isLeader` in LeaderService See the deadlock stack : ``` "pulsar-external-listener-44525-1": at org.apache.pulsar.functions.worker.FunctionMetaDataManager.giveupLeadership(FunctionMetaDataManager.java) - waiting to lock <0x0000100013535c90> (a org.apache.pulsar.functions.worker.FunctionMetaDataManager) at org.apache.pulsar.functions.worker.LeaderService.becameInactive(LeaderService.java:167) - locked <0x000010001344c6d8> (a org.apache.pulsar.functions.worker.LeaderService) at org.apache.pulsar.client.impl.ConsumerImpl.lambda$activeConsumerChanged$27(ConsumerImpl.java:1136) at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$2606/0x00007f854ce9cb10.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.8.1/ThreadPoolExecutor.java:1136) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.8.1/ThreadPoolExecutor.java:635) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.8.1/Thread.java:833) "pulsar-web-44514-6": at org.apache.pulsar.functions.worker.LeaderService.isLeader(LeaderService.java) - waiting to lock <0x000010001344c6d8> (a org.apache.pulsar.functions.worker.LeaderService) at org.apache.pulsar.functions.worker.SchedulerManager.scheduleInternal(SchedulerManager.java:200) at org.apache.pulsar.functions.worker.SchedulerManager.schedule(SchedulerManager.java:229) at org.apache.pulsar.functions.worker.FunctionMetaDataManager.updateFunctionOnLeader(FunctionMetaDataManager.java:251) - locked <0x0000100013535c90> (a org.apache.pulsar.functions.worker.FunctionMetaDataManager) at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.internalProcessFunctionRequest(ComponentImpl.java:1775) at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.updateRequest(ComponentImpl.java:996) at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:222) at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:196) ``` --- .../functions/worker/PulsarFunctionTlsTest.java | 16 ++++++++++++++++ .../pulsar/functions/worker/LeaderService.java | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 1e8b26beee38a..9882b15450e40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -20,6 +20,8 @@ import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNotNull; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; @@ -33,6 +35,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -41,6 +44,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -242,6 +246,18 @@ public void testFunctionsCreation() throws Exception { log.info(" -------- Start test function : {}", functionName); + int finalI = i; + Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + final PulsarWorkerService workerService = ((PulsarWorkerService) fnWorkerServices[finalI]); + final LeaderService leaderService = workerService.getLeaderService(); + assertNotNull(leaderService); + if (leaderService.isLeader()) { + assertTrue(true); + } else { + final WorkerInfo workerInfo = workerService.getMembershipManager().getLeader(); + assertTrue(workerInfo != null && !workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId())); + } + }); pulsarAdmins[i].functions().createFunctionWithUrl( functionConfig, jarFilePathUrl ); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java index 7f035b5562f24..e7816f06aacc8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java @@ -41,7 +41,7 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener { private ConsumerImpl consumer; private final WorkerConfig workerConfig; private final PulsarClient pulsarClient; - private boolean isLeader = false; + private volatile boolean isLeader = false; static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants"; @@ -172,7 +172,7 @@ public synchronized void becameInactive(Consumer consumer, int partitionId) { } } - public synchronized boolean isLeader() { + public boolean isLeader() { return isLeader; }