From 9a30a179064067b1dd418923e4d6640c8e137256 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Jun 2024 17:29:07 +0800 Subject: [PATCH] [fix][broker] Fix NPE after publishing a tombstone to the service unit channel ### Motivation NPE will happen in `UnloadManager#handleEvent` after https://github.com/apache/pulsar/pull/22743. It's because the `Init` state is always associated with a null `ServiceUnitStateData`. ``` java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.force()" because "data" is null at org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager.handleEvent(UnloadManager.java:204) ~[classes/:?] at org.apache.pulsar.broker.loadbalance.extensions.channel.StateChangeListeners.lambda$notify$3(StateChangeListeners.java:74) ~[classes/:?] at java.base/java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807) ~[?:?] at org.apache.pulsar.broker.loadbalance.extensions.channel.StateChangeListeners.notify(StateChangeListeners.java:72) ~[classes/:?] at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.handleInitEvent(ServiceUnitStateChannelImpl.java:902) ~[classes/:?] at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.handle(ServiceUnitStateChannelImpl.java:715) ~[classes/:?] ``` ### Modifications In `UnloadManager#handleEvent`, assume `data` is null and call `complete` directly. Fix `UnloadManagerTest`, which passes a non-null `ServiceUnitStateData` and `Init` to `handleEvent`. --- .../loadbalance/extensions/manager/UnloadManager.java | 6 +++--- .../loadbalance/extensions/manager/UnloadManagerTest.java | 6 +----- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index 6b745345c0a43..413565247419f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; +import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import io.prometheus.client.Histogram; import java.util.Map; @@ -201,9 +202,8 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } } case Init -> { - if (data.force()) { - complete(serviceUnit, t); - } + checkArgument(data == null, "Init state must be associated with null data"); + complete(serviceUnit, t); } case Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index f7deb072688c5..be78cfcb595c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -126,11 +126,7 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); // Success with Init state. - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null); - assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null); + manager.handleEvent(bundle, null, null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);