From e54498aa952b486484b4c244cbe62a49e5cf1eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Fri, 13 Dec 2024 11:59:47 +0100 Subject: [PATCH] Adjust for zero token nodes Adds ZeroTokenNodesIT that checks the behaviour of the driver when zero-token nodes are involved. Adds two options that control if peers that do not own any tokens are considered valid and if peers that do not own any tokens are considered for query planning. Query planning setting does not by default override LBP behavior. DCAwareRoundRobinPolicy, RackAwareRoundRobinPolicy and RoundRobinPolicy were adjusted to follow this setting. Driver behavior with those options disabled, which is the default option, should remain unchanged. --- .../driver/core/ControlConnection.java | 12 +- .../java/com/datastax/driver/core/Host.java | 13 + .../datastax/driver/core/QueryOptions.java | 32 +++ .../policies/DCAwareRoundRobinPolicy.java | 9 + .../policies/RackAwareRoundRobinPolicy.java | 9 + .../core/policies/RoundRobinPolicy.java | 13 +- .../driver/core/ZeroTokenNodesIT.java | 240 ++++++++++++++++++ 7 files changed, 325 insertions(+), 3 deletions(-) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index 7f3c3bde636..780670ac692 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -681,6 +681,10 @@ private static void updateInfo( host.setHostId(row.getUUID("host_id")); host.setSchemaVersion(row.getUUID("schema_version")); + if (row.getColumnDefinitions().contains("tokens")) { + host.setZeroToken(row.isNull("tokens")); + } + EndPoint endPoint = cluster.configuration.getPolicies().getEndPointFactory().create(row); if (endPoint != null) { host.setEndPoint(endPoint); @@ -950,8 +954,12 @@ private void refreshNodeListAndTokenMap( peerHost.setSchemaVersion(schemaVersions.get(i)); } - if (metadataEnabled && factory != null && allTokens.get(i) != null) + if (metadataEnabled && factory != null && allTokens.get(i) != null) { tokenMap.put(peerHost, allTokens.get(i)); + } + if (allTokens.get(i) == null) { + peerHost.setZeroToken(true); + } if (!isNew && isInitialConnection) { // If we're at init and the node already existed, it means it was a contact point, so we @@ -1008,7 +1016,7 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) { && peerRow.getColumnDefinitions().contains("rack") && !peerRow.isNull("rack") && peerRow.getColumnDefinitions().contains("tokens") - && !peerRow.isNull("tokens"); + && (!peerRow.isNull("tokens") || cluster.configuration.getQueryOptions().shouldConsiderZeroTokenNodesValidPeers()); } if (!isValid && logIfInvalid) logger.warn( diff --git a/driver-core/src/main/java/com/datastax/driver/core/Host.java b/driver-core/src/main/java/com/datastax/driver/core/Host.java index 603a5dc1010..4086085862d 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Host.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Host.java @@ -73,6 +73,9 @@ public class Host { // Whether host supports TABLETS_ROUTING_V1 private volatile TabletInfo tabletInfo = null; + // Whether at the time of the last refresh system.peers/local row had column 'tokens' with 'null' value. + private volatile boolean zeroToken = false; + enum State { ADDED, DOWN, @@ -461,6 +464,16 @@ public void setTabletInfo(TabletInfo tabletInfo) { this.tabletInfo = tabletInfo; } + public boolean setZeroToken(boolean newValue) { + boolean previousValue = this.zeroToken; + this.zeroToken = newValue; + return previousValue; + } + + public boolean isZeroToken() { + return this.zeroToken; + } + /** * Returns whether the host is considered up by the driver. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index 137c56aa747..204b3d40eda 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -74,6 +74,10 @@ public class QueryOptions { private volatile boolean addOriginalContactsToReconnectionPlan = false; + private volatile boolean excludeZeroTokenNodesFromQueryPlan = false; + + private volatile boolean considerZeroTokenNodesValidPeers = false; + /** * Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL}, * {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}. @@ -521,6 +525,34 @@ public boolean shouldAddOriginalContactsToReconnectionPlan() { return this.addOriginalContactsToReconnectionPlan; } + /** + * Whether the load balancing policies should consider zero-token nodes as valid targets for regular queries. + * Decision ultimately lies in LBP implementation. Config should be generally honored by current implementations. + */ + public QueryOptions setExcludeZeroTokenNodesFromQueryPlan(boolean enabled) { + this.excludeZeroTokenNodesFromQueryPlan = enabled; + return this; + } + + public boolean shouldExcludeZeroTokenNodesFromQueryPlan() { + return this.excludeZeroTokenNodesFromQueryPlan; + } + + /** + * Scylla zero-token nodes have null value in "tokens" column in their system.peers rows. + * By default extended peer check considers such rows as invalid. + * Enabling this option will exclude this field from the check, and allow such rows + * from system.peers queries to be used when refreshing metadata. + */ + public QueryOptions setConsiderZeroTokenNodesValidPeers(boolean enabled) { + this.considerZeroTokenNodesValidPeers = enabled; + return this; + } + + public boolean shouldConsiderZeroTokenNodesValidPeers() { + return this.considerZeroTokenNodesValidPeers; + } + @Override public boolean equals(Object that) { if (that == null || !(that instanceof QueryOptions)) { diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.java index a1274f6b458..bbc329a513f 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.java @@ -102,6 +102,9 @@ public void init(Cluster cluster, Collection hosts) { ArrayList notInLocalDC = new ArrayList(); for (Host host : hosts) { + if (host.isZeroToken() && cluster.getConfiguration().getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + continue; + } String dc = dc(host); // If the localDC was in "auto-discover" mode and it's the first host for which we have a DC, @@ -156,6 +159,9 @@ private static CopyOnWriteArrayList cloneList(CopyOnWriteArrayList l */ @Override public HostDistance distance(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return HostDistance.IGNORED; + } String dc = dc(host); if (dc == UNSET || dc.equals(localDc)) return HostDistance.LOCAL; @@ -252,6 +258,9 @@ protected Host computeNext() { @Override public void onUp(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return; + } String dc = dc(host); // If the localDC was in "auto-discover" mode and it's the first host for which we have a DC, diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java index 60761c65afa..0e65466d37d 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java @@ -119,6 +119,9 @@ public void init(Cluster cluster, Collection hosts) { ArrayList notInLocalRack = new ArrayList(); for (Host host : hosts) { + if (host.isZeroToken() && cluster.getConfiguration().getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + continue; + } String dc = dc(host); String rack = rack(host); @@ -204,6 +207,9 @@ private static CopyOnWriteArrayList cloneList(CopyOnWriteArrayList l */ @Override public HostDistance distance(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return HostDistance.IGNORED; + } String dc = dc(host); if (dc == UNSET || dc.equals(localDc)) return HostDistance.LOCAL; @@ -313,6 +319,9 @@ protected Host computeNext() { @Override public void onUp(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return; + } String dc = dc(host); String rack = rack(host); diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RoundRobinPolicy.java index 49b24f63e3f..47ce4564bde 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/RoundRobinPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RoundRobinPolicy.java @@ -59,8 +59,13 @@ public RoundRobinPolicy() {} @Override public void init(Cluster cluster, Collection hosts) { - this.liveHosts.addAll(hosts); this.configuration = cluster.getConfiguration(); + for (Host host : hosts) { + if (host.isZeroToken() && cluster.getConfiguration().getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + continue; + } + this.liveHosts.add(host); + } this.index.set(new Random().nextInt(Math.max(hosts.size(), 1))); } @@ -76,6 +81,9 @@ public void init(Cluster cluster, Collection hosts) { */ @Override public HostDistance distance(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return HostDistance.IGNORED; + } return HostDistance.LOCAL; } @@ -139,6 +147,9 @@ protected Host computeNext() { @Override public void onUp(Host host) { + if (host.isZeroToken() && configuration.getQueryOptions().shouldExcludeZeroTokenNodesFromQueryPlan()) { + return; + } liveHosts.addIfAbsent(host); } diff --git a/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java b/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java new file mode 100644 index 00000000000..8067a5f91f9 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java @@ -0,0 +1,240 @@ +package com.datastax.driver.core; + +import static org.apache.log4j.Level.WARN; +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.core.utils.ScyllaVersion; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ZeroTokenNodesIT { + private Logger logger = Logger.getLogger(ControlConnection.class); + private MemoryAppender appender; + private Level originalLevel; + + @DataProvider(name = "loadBalancingPolicies") + public static Object[][] loadBalancingPolicies() { + return new Object[][] { + {DCAwareRoundRobinPolicy.builder().build()}, + {new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())}, + {new TokenAwarePolicy(new RoundRobinPolicy())} + }; + } + + @BeforeMethod(groups = "short") + public void startCapturingLogs() { + originalLevel = logger.getLevel(); + logger.setLevel(WARN); + logger.addAppender(appender = new MemoryAppender()); + } + + @AfterMethod(groups = "short") + public void stopCapturingLogs() { + logger.setLevel(originalLevel); + logger.removeAppender(appender); + } + + @Test(groups = "short") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_not_ignore_zero_token_peer() { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + try { + ccmBridge = + CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setExcludeZeroTokenNodesFromQueryPlan(true).setConsiderZeroTokenNodesValidPeers(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(1)); + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_discover_but_exclude_zero_token_DC() { + // Should discover the nodes, but not use them in query planning + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(2, 3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.add(2, 4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(3); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setExcludeZeroTokenNodesFromQueryPlan(true).setConsiderZeroTokenNodesValidPeers(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(1)); + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short", dataProvider = "loadBalancingPolicies") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.waitForUp(3); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setExcludeZeroTokenNodesFromQueryPlan(true).setConsiderZeroTokenNodesValidPeers(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042)) + .withPort(9042) + .withLoadBalancingPolicy(loadBalancingPolicy) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(3)); + + session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT"); + session.execute( + "CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};"); + session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)"); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")"); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isNotEqualTo(ccmBridge.addressOfNode(3)); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2)); + } + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short", dataProvider = "loadBalancingPolicies") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(2, 3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.add(2, 4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(3); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setExcludeZeroTokenNodesFromQueryPlan(true).setConsiderZeroTokenNodesValidPeers(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042)) + .withPort(9042) + .withLoadBalancingPolicy(loadBalancingPolicy) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(3)); + + session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT"); + session.execute( + "CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};"); + session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)"); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")"); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isNotEqualTo(ccmBridge.addressOfNode(3)); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2)); + } + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } +}