Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Adjust for zero token nodes #399

Draft
wants to merge 1 commit into
base: scylla-3.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ CloseFuture closeAsync() {

Host connectedHost() {
Connection current = connectionRef.get();
return (current == null) ? null : cluster.metadata.getHost(current.endPoint);
if (current == null) return null;
Host host = cluster.metadata.getHost(current.endPoint);
// If the host is not in metadata, then it may be zero-token contact point
if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes())
host = cluster.metadata.getContactPoint(current.endPoint);
return host;
}

void triggerReconnect() {
Expand Down Expand Up @@ -392,6 +397,11 @@ static void refreshSchema(
throws ConnectionException, BusyConnectionException, ExecutionException,
InterruptedException {
Host host = cluster.metadata.getHost(connection.endPoint);
// Host may have been deliberately not added to metadata, because it's a zero-token node
// Try checking contact points if its null:
if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) {
host = cluster.metadata.getContactPoint(connection.endPoint);
}
// Neither host, nor it's version should be null. But instead of dying if there is a race or
// something, we can kind of try to infer
// a Cassandra version from the protocol version (this is not full proof, we can have the
Expand Down Expand Up @@ -826,7 +836,14 @@ private void refreshNodeListAndTokenMap(
}
}
if (isInitialConnection) {
cluster.metadata.addIfAbsent(controlHost);
if (localRow.isNull("tokens")
&& cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) {
logger.warn(
"Control host ({}) is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.",
connection.endPoint);
} else {
cluster.metadata.addIfAbsent(controlHost);
}
}
}

Expand Down Expand Up @@ -984,7 +1001,10 @@ private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr)

private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
boolean isValid =
peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id");
peerRow.getColumnDefinitions().contains("host_id")
&& !peerRow.isNull("host_id")
&& peerRow.getColumnDefinitions().contains("tokens")
&& !peerRow.isNull("tokens");

if (isPeersV2) {
isValid &=
Expand All @@ -1006,14 +1026,12 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
peerRow.getColumnDefinitions().contains("data_center")
&& !peerRow.isNull("data_center")
&& peerRow.getColumnDefinitions().contains("rack")
&& !peerRow.isNull("rack")
&& peerRow.getColumnDefinitions().contains("tokens")
&& !peerRow.isNull("tokens");
&& !peerRow.isNull("rack");
}
if (!isValid && logIfInvalid)
logger.warn(
"Found invalid row in system.peers: {}. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
formatInvalidPeer(peerRow));
return isValid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class QueryOptions {

private volatile boolean addOriginalContactsToReconnectionPlan = false;

private volatile boolean skipZeroTokenNodes = false;

/**
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
Expand Down Expand Up @@ -521,6 +523,20 @@ public boolean shouldAddOriginalContactsToReconnectionPlan() {
return this.addOriginalContactsToReconnectionPlan;
}

/**
* Whether the driver should skip adding zero-token nodes to the metadata's hosts set. This mostly
* makes a difference for zero-token contact points, because driver won't reach other zero-token
* nodes since their records in system.peers are incomplete, thus considered invalid and omitted.
*/
public QueryOptions setSkipZeroTokenNodes(boolean enabled) {
this.skipZeroTokenNodes = enabled;
return this;
}

public boolean shouldSkipZeroTokenNodes() {
return this.skipZeroTokenNodes;
}

@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof QueryOptions)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,13 @@ static void run_with_null_peer_info(String columns, boolean expectPeer2, boolean
expectedError =
String.format(
"Found invalid row in system.peers: [peer=%s, %s]. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
node2Address, columnData);
} else {
expectedError =
String.format(
"Found invalid row in system.peers: [peer=%s, %s%s%s%s]. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
node2Address,
!splitColumnsSet.contains("native_transport_address")
? "missing native_transport_address, "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package com.datastax.driver.core;

import static org.apache.log4j.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.utils.ScyllaVersion;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ZeroTokenNodesIT {
private Logger logger = Logger.getLogger(ControlConnection.class);
private MemoryAppender appender;
private Level originalLevel;

@DataProvider(name = "loadBalancingPolicies")
public static Object[][] loadBalancingPolicies() {
return new Object[][] {
{DCAwareRoundRobinPolicy.builder().build()},
{new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())},
{new TokenAwarePolicy(new RoundRobinPolicy())}
};
}

@BeforeMethod(groups = "short")
public void startCapturingLogs() {
originalLevel = logger.getLevel();
logger.setLevel(WARN);
logger.addAppender(appender = new MemoryAppender());
}

@AfterMethod(groups = "short")
public void stopCapturingLogs() {
logger.setLevel(originalLevel);
logger.removeAppender(appender);
}

@Test(groups = "short")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_ignore_zero_token_peer() {
// Given 4 node cluster with 1 zero-token node and normal contact point,
// make sure that it's not included in the metadata.
// By extension, it won't be included in the query planning.
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;
try {
ccmBridge =
CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(1));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line).contains("Found invalid row in system.peers");
assertThat(line).contains("tokens=null");
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_ignore_zero_token_DC() {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;
try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(2, 3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.add(2, 4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(3);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(1));
for (int i = 0; i < 2; i++) {
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line).contains("Found invalid row in system.peers");
assertThat(line).contains("tokens=null");
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short", dataProvider = "loadBalancingPolicies")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;

try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.waitForUp(3);

cluster =
Cluster.builder()
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
.withPort(9042)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(3));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line)
.contains(
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
assertThat(line).contains(ccmBridge.ipOfNode(3));

session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
session.execute(
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
for (int i = 0; i < 30; i++) {
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isNotEqualTo(ccmBridge.addressOfNode(3));
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short", dataProvider = "loadBalancingPolicies")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;

try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(2, 3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.add(2, 4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(3);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
.withPort(9042)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(3));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line)
.contains(
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
assertThat(line).contains(ccmBridge.ipOfNode(3));

session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
session.execute(
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
for (int i = 0; i < 30; i++) {
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isNotEqualTo(ccmBridge.addressOfNode(3));
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}
}