Skip to content

Commit

Permalink
CURATOR-710: Fix leaking watch in EnsembleTracker (#508)
Browse files Browse the repository at this point in the history
CURATOR-667(#474) fixes asynchronous event path for `getConfig` to
"/zookeeper/config" by using `CuratorFramework::usingNamespace(null)` to
fetch data.

It causes watcher not registering to possible `WatcherRemovalManager`,
so leaking in `WatcherRemoveCuratorFramework::removeWatchers`.

Signed-off-by: tison <[email protected]>
Co-authored-by: tison <[email protected]>
  • Loading branch information
kezhuw and tisonkun authored Jan 19, 2025
1 parent 8eb6f9a commit fb78e23
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ public class GetConfigBuilderImpl
private Stat stat;

public GetConfigBuilderImpl(CuratorFrameworkImpl client) {
this.client = (CuratorFrameworkImpl) client.usingNamespace(null);
backgrounding = new Backgrounding();
watching = new Watching(this.client);
this(client, new Backgrounding(), null, null);
}

public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
private final WatcherRemovalManager removalManager;

WatcherRemovalFacade(CuratorFrameworkImpl client) {
this(client, new WatcherRemovalManager(client));
}

private WatcherRemovalFacade(CuratorFrameworkImpl client, WatcherRemovalManager removalManager) {
super(client);
this.client = client;
removalManager = new WatcherRemovalManager(client);
this.removalManager = removalManager;
}

@Override
Expand Down Expand Up @@ -73,7 +77,8 @@ public CuratorFramework nonNamespaceView() {

@Override
public CuratorFramework usingNamespace(String newNamespace) {
return client.usingNamespace(newNamespace);
final CuratorFrameworkImpl newClient = (CuratorFrameworkImpl) client.usingNamespace(newNamespace);
return new WatcherRemovalFacade(newClient, removalManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ void commitWatcher(int rc, boolean isExists) {
doCommit = (rc == KeeperException.Code.OK.intValue());
}

if (doCommit && (namespaceWatcher != null)) {
if (client.getWatcherRemovalManager() != null) {
client.getWatcherRemovalManager().add(namespaceWatcher);
}
if (doCommit && namespaceWatcher != null && client.getWatcherRemovalManager() != null) {
client.getWatcherRemovalManager().add(namespaceWatcher);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand All @@ -34,12 +35,27 @@
import org.apache.curator.test.Timing;
import org.apache.curator.test.WatchersDebug;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.DebugUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestWatcherRemovalManager extends CuratorTestBase {
private static final String superUserPasswordDigest = "curator-test:zghsj3JfJqK7DbWf0RQ1BgbJH9w="; // ran from
private static final String superUserPassword = "curator-test";

@BeforeEach
@Override
public void setup() throws Exception {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superUserPasswordDigest);
super.setup();
}

@Test
public void testSameWatcherDifferentPaths1Triggered() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
Expand Down Expand Up @@ -302,6 +318,54 @@ public void testBasicNamespace3() throws Exception {
}
}

@Test
public void testEnsembleTracker() throws Exception {
// given: client with ensemble tracker
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.namespace("hey")
.ensembleTracker(true)
.authorization("digest", superUserPassword.getBytes())
.build();
try {
client.start();

// We are using standalone, so "/zookeeper/config" will be empty.
// So let's set it directly.
QuorumMaj quorumMaj = new QuorumMaj(Collections.singletonMap(
1L,
new QuorumPeer.QuorumServer(1, "127.0.0.1:2182:2183:participant;" + server.getConnectString())));
quorumMaj.setVersion(1);
client.usingNamespace(null)
.setData()
.forPath(ZooDefs.CONFIG_NODE, quorumMaj.toString().getBytes());

// when: zookeeper config node data fetched
while (client.getCurrentConfig().getVersion() == 0) {
Thread.sleep(100);
}

// then: the watcher must be attached
assertEquals(
1,
WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper())
.size());

// when: ensemble tracker closed
System.setProperty(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
((CuratorFrameworkImpl) client).getEnsembleTracker().close();

// then: the watcher must be removed
assertEquals(
0,
WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper())
.size());
} finally {
TestCleanState.closeAndTestClean(client);
}
}

@Test
public void testSameWatcher() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
Expand Down

0 comments on commit fb78e23

Please sign in to comment.