diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index 7f3c3bde63..3f70833b87 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -131,7 +131,12 @@ CloseFuture closeAsync() { Host connectedHost() { Connection current = connectionRef.get(); - return (current == null) ? null : cluster.metadata.getHost(current.endPoint); + if (current == null) return null; + Host host = cluster.metadata.getHost(current.endPoint); + // If the host is not in metadata, then it may be zero-token contact point + if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) + host = cluster.metadata.getContactPoint(current.endPoint); + return host; } void triggerReconnect() { @@ -392,6 +397,11 @@ static void refreshSchema( throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException { Host host = cluster.metadata.getHost(connection.endPoint); + // Host may have been deliberately not added to metadata, because it's a zero-token node + // Try checking contact points if its null: + if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) { + host = cluster.metadata.getContactPoint(connection.endPoint); + } // Neither host, nor it's version should be null. But instead of dying if there is a race or // something, we can kind of try to infer // a Cassandra version from the protocol version (this is not full proof, we can have the @@ -826,7 +836,14 @@ private void refreshNodeListAndTokenMap( } } if (isInitialConnection) { - cluster.metadata.addIfAbsent(controlHost); + if (localRow.isNull("tokens") + && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) { + logger.warn( + "Control host ({}) is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.", + connection.endPoint); + } else { + cluster.metadata.addIfAbsent(controlHost); + } } } @@ -984,7 +1001,10 @@ private static Set toTokens(Token.Factory factory, Set tokensStr) private boolean isValidPeer(Row peerRow, boolean logIfInvalid) { boolean isValid = - peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id"); + peerRow.getColumnDefinitions().contains("host_id") + && !peerRow.isNull("host_id") + && peerRow.getColumnDefinitions().contains("tokens") + && !peerRow.isNull("tokens"); if (isPeersV2) { isValid &= @@ -1006,14 +1026,12 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) { peerRow.getColumnDefinitions().contains("data_center") && !peerRow.isNull("data_center") && peerRow.getColumnDefinitions().contains("rack") - && !peerRow.isNull("rack") - && peerRow.getColumnDefinitions().contains("tokens") - && !peerRow.isNull("tokens"); + && !peerRow.isNull("rack"); } if (!isValid && logIfInvalid) logger.warn( "Found invalid row in system.peers: {}. " - + "This is likely a gossip or snitch issue, this host will be ignored.", + + "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.", formatInvalidPeer(peerRow)); return isValid; } diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index 137c56aa74..0a94d00bc6 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -74,6 +74,8 @@ public class QueryOptions { private volatile boolean addOriginalContactsToReconnectionPlan = false; + private volatile boolean skipZeroTokenNodes = false; + /** * Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL}, * {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}. @@ -521,6 +523,20 @@ public boolean shouldAddOriginalContactsToReconnectionPlan() { return this.addOriginalContactsToReconnectionPlan; } + /** + * Whether the driver should skip adding zero-token nodes to the metadata's hosts set. This mostly + * makes a difference for zero-token contact points, because driver won't reach other zero-token + * nodes since their records in system.peers are incomplete, thus considered invalid and omitted. + */ + public QueryOptions setSkipZeroTokenNodes(boolean enabled) { + this.skipZeroTokenNodes = enabled; + return this; + } + + public boolean shouldSkipZeroTokenNodes() { + return this.skipZeroTokenNodes; + } + @Override public boolean equals(Object that) { if (that == null || !(that instanceof QueryOptions)) { diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index 7fa9c8550a..88627a9bc5 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -343,13 +343,13 @@ static void run_with_null_peer_info(String columns, boolean expectPeer2, boolean expectedError = String.format( "Found invalid row in system.peers: [peer=%s, %s]. " - + "This is likely a gossip or snitch issue, this host will be ignored.", + + "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.", node2Address, columnData); } else { expectedError = String.format( "Found invalid row in system.peers: [peer=%s, %s%s%s%s]. " - + "This is likely a gossip or snitch issue, this host will be ignored.", + + "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.", node2Address, !splitColumnsSet.contains("native_transport_address") ? "missing native_transport_address, " diff --git a/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java b/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java new file mode 100644 index 0000000000..f7a737dcd7 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ZeroTokenNodesIT.java @@ -0,0 +1,280 @@ +package com.datastax.driver.core; + +import static org.apache.log4j.Level.WARN; +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.core.utils.ScyllaVersion; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ZeroTokenNodesIT { + private Logger logger = Logger.getLogger(ControlConnection.class); + private MemoryAppender appender; + private Level originalLevel; + + @DataProvider(name = "loadBalancingPolicies") + public static Object[][] loadBalancingPolicies() { + return new Object[][] { + {DCAwareRoundRobinPolicy.builder().build()}, + {new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())}, + {new TokenAwarePolicy(new RoundRobinPolicy())} + }; + } + + @BeforeMethod(groups = "short") + public void startCapturingLogs() { + originalLevel = logger.getLevel(); + logger.setLevel(WARN); + logger.addAppender(appender = new MemoryAppender()); + } + + @AfterMethod(groups = "short") + public void stopCapturingLogs() { + logger.setLevel(originalLevel); + logger.removeAppender(appender); + } + + @Test(groups = "short") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_ignore_zero_token_peer() { + // Given 4 node cluster with 1 zero-token node and normal contact point, + // make sure that it's not included in the metadata. + // By extension, it won't be included in the query planning. + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + try { + ccmBridge = + CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(1)); + String line = null; + try { + line = appender.waitAndGet(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(line).contains("Found invalid row in system.peers"); + assertThat(line).contains("tokens=null"); + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_ignore_zero_token_DC() { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(2, 3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.add(2, 4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(3); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(1)); + for (int i = 0; i < 2; i++) { + String line = null; + try { + line = appender.waitAndGet(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(line).contains("Found invalid row in system.peers"); + assertThat(line).contains("tokens=null"); + } + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short", dataProvider = "loadBalancingPolicies") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.waitForUp(3); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042)) + .withPort(9042) + .withLoadBalancingPolicy(loadBalancingPolicy) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(3)); + String line = null; + try { + line = appender.waitAndGet(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(line) + .contains( + "is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning."); + assertThat(line).contains(ccmBridge.ipOfNode(3)); + + session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT"); + session.execute( + "CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};"); + session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)"); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")"); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isNotEqualTo(ccmBridge.addressOfNode(3)); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2)); + } + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } + + @Test(groups = "short", dataProvider = "loadBalancingPolicies") + @ScyllaVersion( + minOSS = "6.2.0", + minEnterprise = "2024.2.2", + description = "Zero-token nodes introduced in scylladb/scylladb#19684") + public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) { + Cluster cluster = null; + Session session = null; + CCMBridge ccmBridge = null; + + try { + ccmBridge = + CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + ccmBridge.start(); + ccmBridge.add(2, 3); + ccmBridge.updateNodeConfig(3, "join_ring", false); + ccmBridge.start(3); + ccmBridge.add(2, 4); + ccmBridge.updateNodeConfig(4, "join_ring", false); + ccmBridge.start(4); + ccmBridge.waitForUp(3); + ccmBridge.waitForUp(4); + + cluster = + Cluster.builder() + .withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true)) + .addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042)) + .withPort(9042) + .withLoadBalancingPolicy(loadBalancingPolicy) + .withoutAdvancedShardAwareness() + .build(); + session = cluster.connect(); + + assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve()) + .isEqualTo(ccmBridge.addressOfNode(3)); + String line = null; + try { + line = appender.waitAndGet(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(line) + .contains( + "is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning."); + assertThat(line).contains(ccmBridge.ipOfNode(3)); + + session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT"); + session.execute( + "CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};"); + session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)"); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")"); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isNotEqualTo(ccmBridge.addressOfNode(3)); + assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve()) + .isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2)); + } + Set hosts = cluster.getMetadata().getAllHosts(); + Set toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet()); + assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042"); + } finally { + if (ccmBridge != null) ccmBridge.close(); + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } + } +}