diff --git a/driver-core/src/main/java/com/datastax/driver/core/KeyspaceMetadata.java b/driver-core/src/main/java/com/datastax/driver/core/KeyspaceMetadata.java index 2f893cf42c..4e6af116ad 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/KeyspaceMetadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/KeyspaceMetadata.java @@ -53,6 +53,9 @@ public class KeyspaceMetadata { final Map aggregates = new ConcurrentHashMap(); + // Scylla feature + private boolean usesTablets = false; + @VisibleForTesting @Deprecated KeyspaceMetadata(String name, boolean durableWrites, Map replication) { @@ -458,4 +461,12 @@ void add(UserType type) { ReplicationStrategy replicationStrategy() { return strategy; } + + void setUsesTablets(boolean predicate) { + this.usesTablets = predicate; + } + + public boolean usesTablets() { + return this.usesTablets; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java index 4c0ab210e1..3e5b8929ab 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java @@ -558,26 +558,31 @@ public Set getTokenRanges(String keyspace, Host host) { public Set getReplicas( String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) { keyspace = handleId(keyspace); + table = handleId(table); TokenMap current = tokenMap; - if (current == null) { + if (partitioner == null && current != null) { + partitioner = current.factory; + } + if (partitioner == null) { return Collections.emptySet(); - } else { - if (partitioner == null) { - partitioner = current.factory; - } - // If possible, try tablet lookup first + } + Token token = partitioner.hash(partitionKey); + + // Tablets: + KeyspaceMetadata ksMetadata = getKeyspace(keyspace); + if (ksMetadata != null && ksMetadata.usesTablets()) { if (keyspace != null && table != null) { - Token token = partitioner.hash(partitionKey); assert (token instanceof Token.TokenLong64); - Set replicas = tabletMap.getReplicas(keyspace, table, (long) token.getValue()); - if (!replicas.isEmpty()) { - return replicas; - } + return tabletMap.getReplicas(keyspace, table, (long) token.getValue()); + } else { + return Collections.emptySet(); } - // Fall back to tokenMap - Set hosts = current.getReplicas(keyspace, partitioner.hash(partitionKey)); - return hosts == null ? Collections.emptySet() : hosts; } + + // TokenMap: + if (current == null) return Collections.emptySet(); + Set hosts = current.getReplicas(keyspace, token); + return hosts == null ? Collections.emptySet() : hosts; } /** diff --git a/driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java b/driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java index 6354595b9e..ffd3ac42ce 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java @@ -30,17 +30,20 @@ import com.datastax.driver.core.exceptions.BusyConnectionException; import com.datastax.driver.core.exceptions.ConnectionException; +import com.datastax.driver.core.exceptions.InvalidQueryException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +59,9 @@ abstract class SchemaParser { private static final SchemaParser V3_PARSER = new V3SchemaParser(); private static final SchemaParser V4_PARSER = new V4SchemaParser(); + private static final String SELECT_SCYLLA_KEYSPACES = + "SELECT * FROM system_schema.scylla_keyspaces"; + static SchemaParser forVersion(VersionNumber cassandraVersion) { if (cassandraVersion.getMajor() >= 4) return V4_PARSER; if (cassandraVersion.getMajor() >= 3) return V3_PARSER; @@ -197,7 +203,6 @@ void refresh( private Map buildKeyspaces( SystemRows rows, VersionNumber cassandraVersion, Cluster cluster) { - Map keyspaces = new LinkedHashMap(); for (Row keyspaceRow : rows.keyspaces) { KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion); @@ -239,6 +244,13 @@ private Map buildKeyspaces( for (MaterializedViewMetadata view : views.values()) { keyspace.add(view); } + Row scyllaKeyspacesRow = rows.scyllaKeyspaces.getOrDefault(keyspace.getName(), null); + if (scyllaKeyspacesRow != null) { + if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets") + && !scyllaKeyspacesRow.isNull("initial_tablets")) { + keyspace.setUsesTablets(true); + } + } keyspaces.put(keyspace.getName(), keyspace); } if (rows.virtualKeyspaces != null) { @@ -619,6 +631,29 @@ private void updateViews( } } + static Set toKeyspaceSet(ResultSet rs) { + if (rs == null) return Collections.emptySet(); + + Set result = new HashSet<>(); + for (Row row : rs) { + result.add(row.getString(KeyspaceMetadata.KS_NAME)); + } + return result; + } + + static Map groupByKeyspacePk(ResultSet rs) { + // Assumes keyspace name is full primary key, therefore + // each keyspace name identifies at most one row + if (rs == null) return Collections.emptyMap(); + + Map result = new HashMap(); + for (Row row : rs) { + String ksName = row.getString(KeyspaceMetadata.KS_NAME); + result.put(ksName, row); + } + return result; + } + static Map> groupByKeyspace(ResultSet rs) { if (rs == null) return Collections.emptyMap(); @@ -696,6 +731,25 @@ private static ResultSet get(ResultSetFuture future) return (future == null) ? null : future.get(); } + private static ResultSet getIfExists(ResultSetFuture future) + throws InterruptedException, ExecutionException { + // Some of Scylla specific tables/columns may not exist depending on version. + // This method is meant to try to get results without failing whole schema parse + // if something additional does not exist. + if (future == null) return null; + try { + ResultSet resultSet = future.get(); + return resultSet; + } catch (ExecutionException ex) { + if (ex.getCause() instanceof InvalidQueryException) { + // meant to handle keyspace/table does not exist exceptions + return null; + } + // rethrow if it's something else + throw ex; + } + } + /** * The rows from the system tables that we want to parse to metadata classes. The format of these * rows depends on the Cassandra version, but our parsing code knows how to handle the @@ -713,6 +767,7 @@ private static class SystemRows { final ResultSet virtualKeyspaces; final Map> virtualTables; final Map>> virtualColumns; + final Map scyllaKeyspaces; public SystemRows( ResultSet keyspaces, @@ -725,7 +780,8 @@ public SystemRows( Map>> indexes, ResultSet virtualKeyspaces, Map> virtualTables, - Map>> virtualColumns) { + Map>> virtualColumns, + Map scyllaKeyspaces) { this.keyspaces = keyspaces; this.tables = tables; this.columns = columns; @@ -737,6 +793,7 @@ public SystemRows( this.virtualKeyspaces = virtualKeyspaces; this.virtualTables = virtualTables; this.virtualColumns = virtualColumns; + this.scyllaKeyspaces = scyllaKeyspaces; } } @@ -790,7 +847,8 @@ else if (targetType == AGGREGATE) cfFuture = null, colsFuture = null, functionsFuture = null, - aggregatesFuture = null; + aggregatesFuture = null, + scyllaKsFuture = null; ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); @@ -812,6 +870,21 @@ else if (targetType == AGGREGATE) if (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == AGGREGATE) aggregatesFuture = queryAsync(SELECT_AGGREGATES + whereClause, connection, protocolVersion); + if (isSchemaOrKeyspace) { + if (targetType == KEYSPACE) { + scyllaKsFuture = + queryAsync( + SELECT_SCYLLA_KEYSPACES + + " WHERE keyspace_name = '" + + targetKeyspace + + "' LIMIT 1;", + connection, + protocolVersion); + } else { + scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion); + } + } + return new SystemRows( get(ksFuture), groupByKeyspace(get(cfFuture)), @@ -824,7 +897,8 @@ else if (targetType == AGGREGATE) Collections.>>emptyMap(), null, Collections.>emptyMap(), - Collections.>>emptyMap()); + Collections.>>emptyMap(), + groupByKeyspacePk(getIfExists(scyllaKsFuture))); } @Override @@ -1197,9 +1271,19 @@ private Map buildSchema( cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); Map keyspaces = new LinkedHashMap(); + ResultSetFuture scyllaKeyspacesFuture = + queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion); ResultSet keyspacesData = queryAsync(SELECT_KEYSPACES, connection, protocolVersion).get(); + Map scyllaKeyspacesData = groupByKeyspacePk(getIfExists(scyllaKeyspacesFuture)); for (Row keyspaceRow : keyspacesData) { KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion); + Row scyllaKeyspacesRow = scyllaKeyspacesData.getOrDefault(keyspace.getName(), null); + if (scyllaKeyspacesRow != null) { + if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets") + && !scyllaKeyspacesRow.isNull("initial_tablets")) { + keyspace.setUsesTablets(true); + } + } keyspaces.put(keyspace.getName(), keyspace); } @@ -1288,7 +1372,8 @@ SystemRows fetchSystemRows( functionsFuture = null, aggregatesFuture = null, indexesFuture = null, - viewsFuture = null; + viewsFuture = null, + scyllaKsFuture = null; ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); @@ -1356,6 +1441,21 @@ SystemRows fetchSystemRows( connection, protocolVersion); + if (isSchemaOrKeyspace) { + if (targetType == KEYSPACE) { + scyllaKsFuture = + queryAsync( + SELECT_SCYLLA_KEYSPACES + + " WHERE keyspace_name = '" + + targetKeyspace + + "' LIMIT 1;", + connection, + protocolVersion); + } else { + scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion); + } + } + return new SystemRows( get(ksFuture), groupByKeyspace(get(cfFuture)), @@ -1367,7 +1467,8 @@ SystemRows fetchSystemRows( groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME), null, Collections.>emptyMap(), - Collections.>>emptyMap()); + Collections.>>emptyMap(), + groupByKeyspacePk(getIfExists(scyllaKsFuture))); } @Override @@ -1499,7 +1600,8 @@ SystemRows fetchSystemRows( viewsFuture = null, virtualKeyspacesFuture = null, virtualTableFuture = null, - virtualColumnsFuture = null; + virtualColumnsFuture = null, + scyllaKsFuture = null; ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); @@ -1589,6 +1691,21 @@ SystemRows fetchSystemRows( protocolVersion); } + if (isSchemaOrKeyspace) { + if (targetType == KEYSPACE) { + scyllaKsFuture = + queryAsync( + SELECT_SCYLLA_KEYSPACES + + " WHERE keyspace_name = '" + + targetKeyspace + + "' LIMIT 1;", + connection, + protocolVersion); + } else { + scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion); + } + } + return new SystemRows( get(ksFuture), groupByKeyspace(get(cfFuture)), @@ -1600,7 +1717,8 @@ SystemRows fetchSystemRows( groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME), get(virtualKeyspacesFuture), groupByKeyspace(get(virtualTableFuture)), - groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME)); + groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME), + groupByKeyspacePk(getIfExists(scyllaKsFuture))); } } }