From abe28110600f5dad0cf14d877414ee0bd46c908a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?=
<36934780+Bouncheck@users.noreply.github.com>
Date: Tue, 8 Oct 2024 16:56:39 +0200
Subject: [PATCH] 4.x: Add optional fallback for
 `ControlConnection#reconnect()` (#341)
* Add `MockResolverIT#cannot_reconnect_with_resolved_socket()`
Adds a method for testing the issues that surface after cluster
replacements. Because its runtime is variable and sometimes long, it is
not added to any of the test groups.
* Add optional fallback for `ControlConnection#reconnect()`
Adds an experimental option to allow `ControlConnection` to try
reconnecting to the original contact points held by `MetadataManager`
when the load balancing policy returns an empty query plan.
To keep this logic separate from the query plans of other queries,
`LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced
and is called by the control connection in place of `newQueryPlan()`.
---
.../api/core/config/DefaultDriverOption.java | 9 +
.../driver/api/core/config/OptionsMap.java | 1 +
.../api/core/config/TypedDriverOption.java | 4 +
.../core/control/ControlConnection.java | 5 +-
.../metadata/LoadBalancingPolicyWrapper.java | 22 ++-
core/src/main/resources/reference.conf | 11 ++
.../control/ControlConnectionTestBase.java | 16 +-
.../LoadBalancingPolicyWrapperTest.java | 33 +++-
.../driver/core/resolver/MockResolverIT.java | 176 ++++++++++++++++++
9 files changed, 270 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
index 55e8d53dc66..241185d121b 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
@@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),
+ /**
+ * Whether to forcibly add original contact points held by MetadataManager to the reconnection
+ * plan, in case there are no live nodes available according to LBP. Experimental.
+ *
+ * <p>Value-type: boolean
+ */
+ CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
+ "advanced.control-connection.reconnection.fallback-to-original-contact-points"),
+
/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
index 8906e1dd349..53e5f4caa6f 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
@@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
+ map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
index 9be69d0424f..64f4bd5a224 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
@@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
+ /** Whether to forcibly try the original contact points if no live nodes are available. */
+ public static final TypedDriverOption CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
+ new TypedDriverOption<>(
+ DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
index 7e9592c64d3..6cf16f9c4de 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
@@ -300,7 +300,8 @@ private void init(
.withOwnerLogPrefix(logPrefix + "|control")
.build();
- Queue nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
+ Queue nodes =
+ context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
connect(
nodes,
@@ -336,7 +337,7 @@ private void init(
private CompletionStage reconnect() {
assert adminExecutor.inEventLoop();
- Queue nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
+ Queue nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture result = new CompletableFuture<>();
connect(
nodes,
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java
index 20d045d4e72..7922b7b5780 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java
@@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
@@ -161,8 +162,25 @@ public Queue newQueryPlan(
}
+  /**
+   * Builds the query plan used by the control connection when it connects or reconnects.
+   *
+   * <p>The regular query plan for the default execution profile is tried first. If it is empty and
+   * {@link DefaultDriverOption#CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS} is enabled in the
+   * default profile, the plan falls back to new {@link DefaultNode} instances created from the end
+   * points of the contact points held by the metadata manager, in random order.
+   *
+   * @return the regular query plan when it is non-empty or the fallback option is disabled;
+   *     otherwise a shuffled queue of nodes built from the original contact points.
+   */
@NonNull
- public Queue newQueryPlan() {
- return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
+  public Queue<Node> newControlReconnectionQueryPlan() {
+    // First try the original way
+    Queue<Node> regularQueryPlan = newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
+    if (!regularQueryPlan.isEmpty()) {
+      return regularQueryPlan;
+    }
+
+    // The option is read on every call, so a runtime configuration change applies to the next plan.
+    boolean fallbackToContactPoints =
+        context
+            .getConfig()
+            .getDefaultProfile()
+            .getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS);
+    if (!fallbackToContactPoints) {
+      return regularQueryPlan;
+    }
+
+    Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
+    List<Node> nodes = new ArrayList<>(originalNodes.size());
+    for (DefaultNode node : originalNodes) {
+      // A new node instance is created for each contact point's end point instead of reusing the
+      // instance held by the metadata manager.
+      nodes.add(new DefaultNode(node.getEndPoint(), context));
+    }
+    Collections.shuffle(nodes);
+    return new ConcurrentLinkedQueue<>(nodes);
}
// when it comes in from the outside
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 75bed97e498..601543790a0 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}
+
+ reconnection {
+ # Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
+ # in case there are no live nodes available according to LBP.
+ # Experimental.
+ #
+ # Required: yes
+ # Modifiable at runtime: yes, the new value will be used for reconnections attempted after the change.
+ # Overridable in a profile: no
+ fallback-to-original-contact-points = false
+ }
}
advanced.prepared-statements {
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
index c52199465a8..64c9f06b42e 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
@@ -132,11 +132,25 @@ public void setup() {
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
.thenReturn(false);
+ when(context.getConfig()).thenReturn(config);
+ when(config.getDefaultProfile()).thenReturn(defaultProfile);
+ when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
+ .thenReturn(false);
+
controlConnection = new ControlConnection(context);
}
protected void mockQueryPlan(Node... nodes) {
- when(loadBalancingPolicyWrapper.newQueryPlan())
+ when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
+ .thenAnswer(
+ i -> {
+ ConcurrentLinkedQueue queryPlan = new ConcurrentLinkedQueue<>();
+ for (Node node : nodes) {
+ queryPlan.offer(node);
+ }
+ return queryPlan;
+ });
+ when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue queryPlan = new ConcurrentLinkedQueue<>();
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java
index 1a0292e3947..8ad325a1f31 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java
@@ -118,10 +118,22 @@ public void setup() {
policy3));
}
+  @Test
+  public void should_build_control_connection_query_plan_from_contact_points_before_init() {
+    // When: the wrapper has not been initialized yet
+    Queue controlPlan = wrapper.newControlReconnectionQueryPlan();
+
+    // Then: none of the policies is consulted...
+    verify(policy1, never()).newQueryPlan(null, null);
+    verify(policy2, never()).newQueryPlan(null, null);
+    verify(policy3, never()).newQueryPlan(null, null);
+    // ...and the plan consists of exactly the contact points
+    assertThat(controlPlan).hasSameElementsAs(contactPoints);
+  }
+
@Test
public void should_build_query_plan_from_contact_points_before_init() {
// When
- Queue queryPlan = wrapper.newQueryPlan();
+ Queue queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
@@ -139,7 +151,24 @@ public void should_fetch_query_plan_from_policy_after_init() {
}
// When
- Queue queryPlan = wrapper.newQueryPlan();
+ Queue queryPlan = wrapper.newControlReconnectionQueryPlan();
+
+ // Then
+ // newControlReconnectionQueryPlan() uses the default profile
+ verify(policy1).newQueryPlan(null, null);
+ assertThat(queryPlan).isEqualTo(defaultPolicyQueryPlan);
+ }
+
+ @Test
+ public void should_fetch_control_connection_query_plan_from_policy_after_init() {
+ // Given
+ wrapper.init();
+ for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
+ verify(policy).init(anyMap(), any(DistanceReporter.class));
+ }
+
+ // When
+ Queue queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
// Then
// no-arg newQueryPlan() uses the default profile
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
index 6dc2d2ca77f..93ecbf1815c 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
@@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -33,11 +34,14 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -237,4 +241,176 @@ public void run_replace_test_20_times() {
replace_cluster_test();
}
}
+
+  // This is too long to run during CI, but is useful for manual investigations.
+  //
+  // Scenario: a session is opened through the unresolved contact point "test.cluster.fake" against
+  // a 3-node cluster. Clusters are then replaced one after another until the session metadata no
+  // longer contains a node whose string form mentions that hostname. Finally one more replacement
+  // cluster is started and queried.
+  @SuppressWarnings("unused")
+  public void cannot_reconnect_with_resolved_socket() {
+    DriverConfigLoader loader =
+        new DefaultProgrammaticDriverConfigLoaderBuilder()
+            .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
+            .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
+            .withStringList(
+                TypedDriverOption.CONTACT_POINTS.getRawOption(),
+                Collections.singletonList("test.cluster.fake:9042"))
+            .build();
+
+    CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
+    CqlSession session = null;
+    try {
+      Collection<Node> nodes;
+      Set<Node> filteredNodes;
+      try (CcmBridge ccmBridge =
+          CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
+        pointFakeHostnameAtCluster(ccmBridge);
+        ccmBridge.create();
+        ccmBridge.start();
+        session = builder.build();
+        waitUntilThreeNodesAreUp(session);
+        ResultSet rs = session.execute("SELECT * FROM system.local");
+        assertThat(rs).isNotNull();
+        Row row = rs.one();
+        assertThat(row).isNotNull();
+        nodes = session.getMetadata().getNodes().values();
+        assertThat(nodes).hasSize(3);
+        filteredNodes = traceAndSelectFakeHostnameNodes(nodes);
+        assertThat(filteredNodes).hasSize(1);
+      }
+      int counter = 0;
+      while (filteredNodes.size() == 1) {
+        counter++;
+        if (counter == 255) {
+          LOG.error("Completed 254 runs. Breaking.");
+          break;
+        }
+        LOG.warn(
+            "Launching another cluster until we lose resolved socket from metadata (run {}).",
+            counter);
+        try (CcmBridge ccmBridge =
+            CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
+          pointFakeHostnameAtCluster(ccmBridge);
+          ccmBridge.create();
+          ccmBridge.start();
+          waitUntilThreeNodesAreUp(session);
+          nodes = session.getMetadata().getNodes().values();
+          assertThat(nodes).hasSize(3);
+          filteredNodes = traceAndSelectFakeHostnameNodes(nodes);
+          if (filteredNodes.size() > 1) {
+            fail(
+                "Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
+          }
+        }
+      }
+      for (Node node : nodes) {
+        InetSocketAddress address = (InetSocketAddress) node.getEndPoint().resolve();
+        assertFalse(address.isUnresolved());
+      }
+      try (CcmBridge ccmBridge =
+          CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
+        pointFakeHostnameAtCluster(ccmBridge);
+        // Now the driver should fail to reconnect since unresolved hostname is gone.
+        ccmBridge.create();
+        ccmBridge.start();
+        waitUntilThreeNodesAreUp(session);
+        session.execute("SELECT * FROM system.local");
+      }
+    } finally {
+      // Close the session even when an assertion fails part-way through the scenario.
+      if (session != null) {
+        session.close();
+      }
+    }
+  }
+
+  /** Replaces the resolver entries of "test.cluster.fake" with nodes 1 to 3 of the given cluster. */
+  private void pointFakeHostnameAtCluster(CcmBridge ccmBridge) {
+    MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
+    for (int nodeId = 1; nodeId <= 3; nodeId++) {
+      MultimapHostResolverProvider.addResolverEntry(
+          "test.cluster.fake", ccmBridge.getNodeIpAddress(nodeId));
+    }
+  }
+
+  /**
+   * Polls the session metadata until three nodes report a positive up-since timestamp, or until
+   * CLUSTER_WAIT_SECONDS have elapsed. Between polls it fires a query whose result is ignored and
+   * sleeps for 3 seconds. Returns early, with the interrupt flag restored, if interrupted.
+   */
+  private void waitUntilThreeNodesAreUp(CqlSession session) {
+    long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
+    while (System.currentTimeMillis() < endTime) {
+      int upNodes = 0;
+      for (Node node : session.getMetadata().getNodes().values()) {
+        if (node.getUpSinceMillis() > 0) {
+          upNodes++;
+        }
+      }
+      if (upNodes == 3) {
+        return;
+      }
+      SimpleStatement statement =
+          new SimpleStatementBuilder("SELECT * FROM system.local")
+              .setTimeout(Duration.ofSeconds(3))
+              .build();
+      session.executeAsync(statement);
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /** Trace-logs every node and returns those whose string form contains "test.cluster.fake". */
+  private Set<Node> traceAndSelectFakeHostnameNodes(Collection<Node> nodes) {
+    for (Node node : nodes) {
+      LOG.trace("Metadata node: {}", node);
+    }
+    return nodes.stream()
+        .filter(x -> x.toString().contains("test.cluster.fake"))
+        .collect(Collectors.toSet());
+  }
}