Skip to content

Commit

Permalink
[fix][fn] Fix Deadlock in Functions Worker LeaderService (apache#21711)
Browse files Browse the repository at this point in the history
Fixes apache#21501

### Motivation

No need to `synchronized` the method `isLeader` in LeaderService

See the deadlock stack :
```
"pulsar-external-listener-44525-1":
	at org.apache.pulsar.functions.worker.FunctionMetaDataManager.giveupLeadership(FunctionMetaDataManager.java)
	- waiting to lock <0x0000100013535c90> (a org.apache.pulsar.functions.worker.FunctionMetaDataManager)
	at org.apache.pulsar.functions.worker.LeaderService.becameInactive(LeaderService.java:167)
	- locked <0x000010001344c6d8> (a org.apache.pulsar.functions.worker.LeaderService)
	at org.apache.pulsar.client.impl.ConsumerImpl.lambda$activeConsumerChanged$27(ConsumerImpl.java:1136)
	at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$2606/0x00007f854ce9cb10.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run([email protected]/Thread.java:833)
"pulsar-web-44514-6":
	at org.apache.pulsar.functions.worker.LeaderService.isLeader(LeaderService.java)
	- waiting to lock <0x000010001344c6d8> (a org.apache.pulsar.functions.worker.LeaderService)
	at org.apache.pulsar.functions.worker.SchedulerManager.scheduleInternal(SchedulerManager.java:200)
	at org.apache.pulsar.functions.worker.SchedulerManager.schedule(SchedulerManager.java:229)
	at org.apache.pulsar.functions.worker.FunctionMetaDataManager.updateFunctionOnLeader(FunctionMetaDataManager.java:251)
	- locked <0x0000100013535c90> (a org.apache.pulsar.functions.worker.FunctionMetaDataManager)
	at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.internalProcessFunctionRequest(ComponentImpl.java:1775)
	at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.updateRequest(ComponentImpl.java:996)
	at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:222)
	at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:196)
```
  • Loading branch information
Technoboy- authored Dec 12, 2023
1 parent 19b9344 commit 3396065
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
Expand All @@ -33,6 +35,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -41,6 +44,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -242,6 +246,18 @@ public void testFunctionsCreation() throws Exception {

log.info(" -------- Start test function : {}", functionName);

int finalI = i;
Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
final PulsarWorkerService workerService = ((PulsarWorkerService) fnWorkerServices[finalI]);
final LeaderService leaderService = workerService.getLeaderService();
assertNotNull(leaderService);
if (leaderService.isLeader()) {
assertTrue(true);
} else {
final WorkerInfo workerInfo = workerService.getMembershipManager().getLeader();
assertTrue(workerInfo != null && !workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
}
});
pulsarAdmins[i].functions().createFunctionWithUrl(
functionConfig, jarFilePathUrl
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
private ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
private boolean isLeader = false;
private volatile boolean isLeader = false;

static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";

Expand Down Expand Up @@ -172,7 +172,7 @@ public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
}
}

public synchronized boolean isLeader() {
public boolean isLeader() {
return isLeader;
}

Expand Down

0 comments on commit 3396065

Please sign in to comment.