Skip to content

Commit

Permalink
Add option to consider initial contact points during reconnection
Browse files Browse the repository at this point in the history
When control connection tries to reconnect usually it considers only nodes
provided by load balancing policy. Usually those do not include what was
initially passed to the driver but the recently seen alive nodes. In some
setups the IPs can keep changing so it may be useful to have an option to
try initial contact points as one of the options during reconnection.
Mainly if the contact point is a hostname.

This change adds the option to the `QueryOptions` to control that behaviour
and adds necessary logic to `ControlConnection` class. It is disabled
by default, meaning that default behaviour remains unchanged.

Additionally adds org.burningwave tools dependency.
This dependency has features that allow for easier host resolution mocking.

Adds MappedHostResolverProvider class for testing as a single entry point
for controlling hostname resolution.

Adds an option to CcmBridge Builder to specify cluster name. Driver checks the
cluster name when reconnecting so it will refuse to reconnect to a different
CcmBridge auto-generated name.
  • Loading branch information
Bouncheck committed Sep 27, 2024
1 parent ec66feb commit 8970735
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 0 deletions.
7 changes: 7 additions & 0 deletions driver-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@
<version>1.78.1</version>
</dependency>

<!-- added for easier DNS hostname resolution mocking -->
<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException {
if (isShutdown) throw new ConnectionException(null, "Control connection was shut down");

try {
if (cluster
.configuration
.getQueryOptions()
.shouldAddOriginalContactsToReconnectionPlan()) {
List<Host> initialContacts = cluster.metadata.getContactPoints();
Collections.shuffle(initialContacts);
return reconnectInternal(
Iterators.concat(queryPlan(), initialContacts.iterator()), false);
}
return reconnectInternal(queryPlan(), false);
} catch (NoHostAvailableException e) {
throw new ConnectionException(null, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class QueryOptions {

private volatile boolean schemaQueriesPaged = true;

private volatile boolean addOriginalContactsToReconnectionPlan = false;

/**
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
Expand Down Expand Up @@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() {
return maxPendingRefreshNodeRequests;
}

/**
* Whether the driver should use original contact points when reconnecting to a control node. In
* practice this forces driver to manually add original contact points to the end of the query
* plan. It is possible that it may introduce duplicates (but under differnet Host class
* instances) in the query plan. If this is set to false it does not mean that original contact
* points will be excluded.
*
* <p>One use case of this feature is that if the original contact point is defined by hostname
* and its IP address changes then setting this to {@code true} allows trying reconnecting to the
* new IP if all connection was lost.
*/
public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) {
this.addOriginalContactsToReconnectionPlan = enabled;
return this;
}

public boolean shouldAddOriginalContactsToReconnectionPlan() {
return this.addOriginalContactsToReconnectionPlan;
}

@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof QueryOptions)) {
Expand Down
12 changes: 12 additions & 0 deletions driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ public static class Builder {
private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT);

private String ipPrefix = TestUtils.IP_PREFIX;
private String providedClusterName = null;
int[] nodes = {1};
private int[] jmxPorts = {};
private boolean start = true;
Expand Down Expand Up @@ -991,6 +992,15 @@ public Builder withSniProxy() {
return this;
}

/**
* Builder takes care of naming and numbering clusters on its own. Use if you really need a
* specific name
*/
public Builder withClusterName(String clusterName) {
this.providedClusterName = clusterName;
return this;
}

/** Enables SSL encryption. */
public Builder withSSL() {
cassandraConfiguration.put("client_encryption_options.enabled", "true");
Expand Down Expand Up @@ -1115,6 +1125,8 @@ public CCMBridge build() {
// be careful NOT to alter internal state (hashCode/equals) during build!
String clusterName = TestUtils.generateIdentifier("ccm_");

if (providedClusterName != null) clusterName = providedClusterName;

VersionNumber dseVersion;
VersionNumber cassandraVersion;
boolean versionConfigured = this.version != null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.datastax.driver.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertTrue;

import java.net.InetSocketAddress;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class DnsEndpointTests {

private static final Logger logger = LoggerFactory.getLogger(DnsEndpointTests.class);

@Test(groups = "long")
public void replace_cluster_test() {
// Configure host resolution
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.1.1.1");

Cluster cluster = null;
Session session = null;
CCMBridge bridgeA = null;
try {
bridgeA =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.1.1.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeA.start();

cluster =
Cluster.builder()
.addContactPointsWithPorts(
InetSocketAddress.createUnresolved("control.reconnect.test", 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true))
.build();
session = cluster.connect();

ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
System.out.flush();
} finally {
assert bridgeA != null;
bridgeA.close();
}

CCMBridge bridgeB = null;
// Overwrite host resolution
MappedHostResolverProvider.removeResolverEntry("control.reconnect.test");
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.2.2.1");
try {
bridgeB =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.2.2.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeB.start();
Thread.sleep(1000 * 92);
ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
assert bridgeB != null;
bridgeB.close();
}
}

@Test(groups = "long")
public void should_connect_with_mocked_hostname() {
MappedHostResolverProvider.addResolverEntry("mocked.hostname.test", "127.0.1.1");
try (CCMBridge ccmBridge =
CCMBridge.builder().withNodes(1).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
Cluster cluster =
Cluster.builder()
.addContactPointsWithPorts(
InetSocketAddress.createUnresolved("mocked.hostname.test", 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build()) {
ccmBridge.start();
Session session = cluster.connect();
ResultSet rs = session.execute("SELECT * FROM system.local");
List<Row> rows = rs.all();
assertThat(rows).hasSize(1);
Row row = rows.get(0);
assertThat(row.getInet("broadcast_address").toString()).contains("127.0.1.1");
assertTrue(
session.getCluster().getMetadata().getAllHosts().stream()
.map(Host::toString)
.anyMatch(hostString -> hostString.contains("mocked.hostname.test")));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.datastax.driver.core;

import org.burningwave.tools.net.DefaultHostResolver;
import org.burningwave.tools.net.HostResolutionRequestInterceptor;
import org.burningwave.tools.net.MappedHostResolver;

public class MappedHostResolverProvider {
private static volatile MappedHostResolver resolver = null;

private MappedHostResolverProvider() {}

public static synchronized boolean setResolver(MappedHostResolver newResolver) {
if (resolver != null) {
return false;
}
resolver = newResolver;
HostResolutionRequestInterceptor.INSTANCE.install(resolver, DefaultHostResolver.INSTANCE);
return true;
}

public static synchronized void addResolverEntry(String hostname, String address) {
if (resolver == null) {
setResolver(new MappedHostResolver());
}
resolver.putHost(hostname, address);
}

public static synchronized void removeResolverEntry(String hostname) {
if (resolver == null) {
return;
}
resolver.removeHost(hostname);
}
}
4 changes: 4 additions & 0 deletions driver-core/src/test/resources/burningwave.static.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
managed-logger.repository=autodetect
managed-logger.repository.enabled=false
banner.hide=true
priority-of-this-configuration=1000
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<scassandra.version>1.1.2</scassandra.version>
<logback.version>1.2.13</logback.version>
<byteman.version>3.0.8</byteman.version>
<burningwave.tools.version>0.26.2</burningwave.tools.version>
<ipprefix>127.0.1.</ipprefix>
<!-- defaults below are overridden by profiles and/or submodules -->
<test.groups>unit</test.groups>
Expand Down Expand Up @@ -398,6 +399,12 @@
<version>${groovy.version}</version>
</dependency>

<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<version>${burningwave.tools.version}</version>
</dependency>

</dependencies>

</dependencyManagement>
Expand Down

0 comments on commit 8970735

Please sign in to comment.