diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 1e8b26beee38a..9882b15450e40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -20,6 +20,8 @@ import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNotNull; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; @@ -33,6 +35,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -41,6 +44,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -242,6 +246,18 @@ public void testFunctionsCreation() throws Exception { log.info(" -------- Start test function : {}", functionName); + int finalI = i; + Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + final PulsarWorkerService workerService = ((PulsarWorkerService) fnWorkerServices[finalI]); + final LeaderService leaderService = workerService.getLeaderService(); + assertNotNull(leaderService); + if (leaderService.isLeader()) { + assertTrue(true); + } else { + final WorkerInfo workerInfo = workerService.getMembershipManager().getLeader(); + assertTrue(workerInfo != null && !workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId())); + } + }); pulsarAdmins[i].functions().createFunctionWithUrl( functionConfig, jarFilePathUrl ); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java index 7f035b5562f24..e7816f06aacc8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java @@ -41,7 +41,7 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener { private ConsumerImpl consumer; private final WorkerConfig workerConfig; private final PulsarClient pulsarClient; - private boolean isLeader = false; + private volatile boolean isLeader = false; static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants"; @@ -172,7 +172,7 @@ public synchronized void becameInactive(Consumer consumer, int partitionId) { } } - public synchronized boolean isLeader() { + public boolean isLeader() { return isLeader; }