diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java index fb85797af9e..36f2cd7e7e5 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java @@ -21,14 +21,17 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.shaded.guava.common.base.Charsets; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.List; import java.util.Objects; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -171,6 +174,12 @@ public SocketAddress resolve() { return new InetSocketAddress("127.0.0.1", 9042); } + @NonNull + @Override + public List resolveAll() throws UnknownHostException { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java index 530f2ad38ac..c629bdbcbbf 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java @@ -20,6 +20,8 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.List; /** * Encapsulates the information needed to open connections to a node. @@ -40,6 +42,14 @@ public interface EndPoint { @NonNull SocketAddress resolve(); + /** + * Resolves this instance to a socket address. + * + *

This will be called each time the driver opens a new connection to the node. The returned + * address cannot be null. + */ + @NonNull + List resolveAll() throws UnknownHostException; /** * Returns an alternate string representation for use in node-level metric names. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java index 1ed2a1cebf3..821b3ca2f79 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; import java.net.InetAddress; @@ -41,18 +42,17 @@ public static Set merge( Set result = Sets.newHashSet(programmaticContactPoints); for (String spec : configContactPoints) { - for (InetSocketAddress address : extract(spec, resolve)) { - DefaultEndPoint endPoint = new DefaultEndPoint(address); + for (EndPoint endPoint : extract(spec, resolve)) { boolean wasNew = result.add(endPoint); if (!wasNew) { - LOG.warn("Duplicate contact point {}", address); + LOG.warn("Duplicate contact point {}", endPoint); } } } return ImmutableSet.copyOf(result); } - private static Set extract(String spec, boolean resolve) { + private static Set extract(String spec, boolean resolve) { int separator = spec.lastIndexOf(':'); if (separator < 0) { LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec); @@ -69,7 +69,7 @@ private static Set extract(String spec, boolean resolve) { return Collections.emptySet(); } if (!resolve) { - return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port)); + return ImmutableSet.of(new UnresolvedEndPoint(host, port)); } else { try { InetAddress[] inetAddresses = InetAddress.getAllByName(host); @@ -79,9 +79,9 @@ private static Set extract(String spec, boolean resolve) { spec, Arrays.deepToString(inetAddresses)); } - Set result = new HashSet<>(); + Set result = new HashSet<>(); for (InetAddress inetAddress : inetAddresses) { - result.add(new InetSocketAddress(inetAddress, port)); + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); } return result; } catch (UnknownHostException e) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java index 7ffbee8e4bb..905c8c9f16b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.Serializable; import java.net.InetSocketAddress; +import java.util.List; import java.util.Objects; public class DefaultEndPoint implements EndPoint, Serializable { @@ -41,6 +43,12 @@ public InetSocketAddress resolve() { return address; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java index c21d5d8171e..2eed0a3ceda 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java @@ -72,21 +72,21 @@ public Result compute( + "keeping only the first one", logPrefix, hostId); + continue; + } + EndPoint endPoint = nodeInfo.getEndPoint(); + DefaultNode node = findIn(contactPoints, endPoint); + if (node == null) { + node = new DefaultNode(endPoint, context); + LOG.debug("[{}] Adding new node {}", logPrefix, node); } else { - EndPoint endPoint = nodeInfo.getEndPoint(); - DefaultNode node = findIn(contactPoints, endPoint); - if (node == null) { - node = new DefaultNode(endPoint, context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); - } else { - LOG.debug("[{}] Copying contact point {}", logPrefix, node); - } - if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { - tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner()); - } - copyInfos(nodeInfo, node, context); - newNodes.put(hostId, node); + LOG.debug("[{}] Copying contact point {}", logPrefix, node); + } + if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { + tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner()); } + copyInfos(nodeInfo, node, context); + newNodes.put(hostId, node); } ImmutableList.Builder eventsBuilder = ImmutableList.builder(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 7aa2fb13bcd..aa39c373509 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -48,6 +48,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.util.concurrent.EventExecutor; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -55,6 +56,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArraySet; +import jnr.ffi.annotations.Synchronized; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +83,8 @@ public class MetadataManager implements AsyncAutoCloseable { private volatile KeyspaceFilter keyspaceFilter; private volatile Boolean schemaEnabledProgrammatically; private volatile boolean tokenMapEnabled; - private volatile Set contactPoints; + private volatile Set contactPoints; + private volatile Set resolvedContactPoints; private volatile boolean wasImplicitContactPoint; private volatile TypeCodec tabletPayloadCodec = null; @@ -102,7 +106,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList()); this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces); this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED); - + this.resolvedContactPoints = new CopyOnWriteArraySet<>(); context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged); } @@ -145,18 +149,19 @@ public void addContactPoints(Set providedContactPoints) { // Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we // don't know their host_id. So store them in a volatile field instead, they will get copied // during the first node refresh. - ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); + ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); if (providedContactPoints == null || providedContactPoints.isEmpty()) { LOG.info( "[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT); this.wasImplicitContactPoint = true; - contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context)); + contactPointsBuilder.add(DEFAULT_CONTACT_POINT); } else { for (EndPoint endPoint : providedContactPoints) { - contactPointsBuilder.add(new DefaultNode(endPoint, context)); + contactPointsBuilder.add(endPoint); } } this.contactPoints = contactPointsBuilder.build(); + this.resolveContactPoints(); LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints); } @@ -167,7 +172,30 @@ public void addContactPoints(Set providedContactPoints) { * @see #wasImplicitContactPoint() */ public Set getContactPoints() { - return contactPoints; + return resolvedContactPoints; + } + + @Synchronized + public void resolveContactPoints() { + ImmutableSet.Builder resultBuilder = ImmutableSet.builder(); + for (EndPoint endPoint : contactPoints) { + try { + resultBuilder.addAll(endPoint.resolveAll()); + } catch (UnknownHostException e) { + LOG.error("failed to resolve contact endpoint {}", endPoint, e); + } + } + + Set result = resultBuilder.build(); + for (EndPoint endPoint : result) { + if (resolvedContactPoints.stream() + .anyMatch(resolved -> resolved.getEndPoint().equals(endPoint))) { + continue; + } + this.resolvedContactPoints.add(new DefaultNode(endPoint, context)); + } + + this.resolvedContactPoints.removeIf(endPoint -> !result.contains(endPoint.getEndPoint())); } /** Whether the default contact point was used (because none were provided explicitly). */ @@ -337,10 +365,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con } private Void refreshNodes(Iterable nodeInfos) { + if (!didFirstNodeListRefresh) { + resolveContactPoints(); + } MetadataRefresh refresh = didFirstNodeListRefresh ? new FullNodeListRefresh(nodeInfos) - : new InitialNodeListRefresh(nodeInfos, contactPoints); + : new InitialNodeListRefresh(nodeInfos, resolvedContactPoints); didFirstNodeListRefresh = true; return apply(refresh); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java index ace4e82617d..f5085df5ab8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetAddress; @@ -25,6 +26,7 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; @@ -72,6 +74,12 @@ public InetSocketAddress resolve() { } } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java new file mode 100644 index 00000000000..23c4dc235e3 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.metadata.EndPoint; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class UnresolvedEndPoint implements EndPoint, Serializable { + private final String metricPrefix; + String host; + int port; + + public UnresolvedEndPoint(String host, int port) { + this.host = host; + this.port = port; + this.metricPrefix = buildMetricPrefix(host, port); + } + + @NonNull + @Override + public SocketAddress resolve() { + throw new RuntimeException( + String.format( + "This endpoint %s should never been resolved, but it happened, it somehow leaked to downstream code.", + this)); + } + + @NonNull + @Override + public List resolveAll() throws UnknownHostException { + InetAddress[] inetAddresses = InetAddress.getAllByName(host); + Set result = new HashSet<>(); + for (InetAddress inetAddress : inetAddresses) { + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); + } + return new ArrayList<>(result); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof UnresolvedEndPoint) { + UnresolvedEndPoint that = (UnresolvedEndPoint) other; + return this.host.equals(that.host) && this.port == that.port; + } + return false; + } + + @Override + public int hashCode() { + return host.toLowerCase().hashCode() + port; + } + + @Override + public String toString() { + return host + ":" + port; + } + + @NonNull + @Override + public String asMetricPrefix() { + return metricPrefix; + } + + private static String buildMetricPrefix(String host, int port) { + // Append the port since Cassandra 4 supports nodes with different ports + return host.replace('.', '_') + ':' + port; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java index 9e0d8737619..0e9c49436e5 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java @@ -29,6 +29,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import java.net.InetAddress; @@ -96,7 +97,7 @@ public void should_parse_host_and_port_in_configuration_and_create_unresolved() assertThat(endPoints) .containsExactly( - new DefaultEndPoint(InetSocketAddress.createUnresolved("localhost", 9042))); + new UnresolvedEndPoint("localhost", 9042)); } @Test diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java index 5e463299a66..0d9895374d3 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java @@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use an embedded Netty channel. */ public class EmbeddedEndPoint implements EndPoint { @@ -30,6 +31,12 @@ public SocketAddress resolve() { throw new UnsupportedOperationException("This should not get called from unit tests"); } + @NonNull + @Override + public List resolveAll() { + throw new UnsupportedOperationException("This should not get called from unit tests"); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java index c90731eece9..8836ae607d7 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.channel; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.channel.local.LocalAddress; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use the local Netty transport. */ public class LocalEndPoint implements EndPoint { @@ -37,6 +39,12 @@ public SocketAddress resolve() { return localAddress; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 6dc2d2ca77f..878c3ffb464 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -33,11 +33,14 @@ import com.datastax.oss.driver.api.core.config.TypedDriverOption; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.categories.IsolatedTests; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -237,4 +240,40 @@ public void run_replace_test_20_times() { replace_cluster_test(); } } + + @Test + public void cannot_connect_if_first_node_is_unavailable() { + // Reproduce case when first two dns record contains nodes that are unresponsive + // With RESOLVE_CONTACT_POINTS set to false + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader); + CqlSession session; + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) { + MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(11)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(2)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(3)); + ccmBridge.create(); + ccmBridge.start(); + session = builder.build(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.execute(statement); + ccmBridge.stop(2); + session.execute(statement); + } + } }