Skip to content

Commit

Permalink
Introduce support for tablets
Browse files Browse the repository at this point in the history
This PR introduces changes to the driver that are necessary for
shard-awareness and token-awareness to work effectively with the
tablets feature recently introduced to ScyllaDB. It overrides
the ring-based replica calculations for tablet-enabled keyspaces.

Now, if the driver sends a request to the wrong node/shard, it will get the
correct tablet information from Scylla in the custom payload. It uses this
information to obtain target replicas and shard numbers for tables
managed by tablet replication.

This tablet information is then stored in the driver and is used
to correctly route all subsequent requests.
  • Loading branch information
Bouncheck authored and avelanarius committed Feb 23, 2024
1 parent 5d2fb2c commit dbfe82a
Show file tree
Hide file tree
Showing 13 changed files with 718 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ public ListenableFuture<Void> apply(Message.Response response) throws Exception
if (lwt != null) {
getHost().setLwtInfo(lwt);
}
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported);
getHost().setTabletInfo(tabletInfo);
return MoreFutures.VOID_SUCCESS;
case ERROR:
Responses.Error error = (Responses.Error) response;
Expand Down Expand Up @@ -507,6 +509,13 @@ public ListenableFuture<Void> apply(Void input) throws Exception {
if (lwtInfo != null) {
lwtInfo.addOption(extraOptions);
}
TabletInfo tabletInfo = getHost().getTabletInfo();
if (tabletInfo != null
&& tabletInfo.isEnabled()
&& ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) {
logger.debug("Enabling tablet support in OPTIONS message");
TabletInfo.addOption(extraOptions);
}
Future startupResponseFuture =
write(
new Requests.Startup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ public void onSet(
switch (response.type) {
case RESULT:
Responses.Result rm = (Responses.Result) response;

if (rm.getCustomPayload() != null
&& rm.getCustomPayload().containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)
&& (statement instanceof BoundStatement)) {
BoundStatement st = (BoundStatement) statement;
String keyspace = statement.getKeyspace();
String table =
st.preparedStatement().getPreparedId().boundValuesMetadata.variables.getTable(0);
session
.getCluster()
.getMetadata()
.getTabletMap()
.processTabletsRoutingV1Payload(
keyspace,
table,
rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY));
}
switch (rm.kind) {
case SET_KEYSPACE:
// propagate the keyspace change to other connections
Expand Down
11 changes: 11 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class Host {
// Can be set concurrently but the value should always be the same.
private volatile LwtInfo lwtInfo = null;

// Whether host supports TABLETS_ROUTING_V1
private volatile TabletInfo tabletInfo = null;

enum State {
ADDED,
DOWN,
Expand Down Expand Up @@ -450,6 +453,14 @@ public void setLwtInfo(LwtInfo lwtInfo) {
this.lwtInfo = lwtInfo;
}

/**
 * Returns the tablet information currently recorded for this host.
 *
 * @return the value last stored through {@link #setTabletInfo(TabletInfo)}, or {@code null} if
 *     nothing has been stored yet.
 */
public TabletInfo getTabletInfo() {
  return this.tabletInfo;
}

/**
 * Records the tablet information for this host, replacing any previously stored value.
 *
 * @param tabletInfo the tablet information to store for this host.
 */
public void setTabletInfo(TabletInfo tabletInfo) {
this.tabletInfo = tabletInfo;
}

/**
* Returns whether the host is considered up by the driver.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ ListenableFuture<Connection> borrowConnection(
TimeUnit unit,
int maxQueueSize,
Token.Factory partitioner,
ByteBuffer routingKey) {
ByteBuffer routingKey,
String keyspace,
String table) {
Phase phase = this.phase.get();
if (phase != Phase.READY)
return Futures.immediateFailedFuture(
Expand All @@ -515,7 +517,17 @@ ListenableFuture<Connection> borrowConnection(
if (routingKey != null) {
Metadata metadata = manager.cluster.getMetadata();
Token t = metadata.newToken(partitioner, routingKey);
shardId = host.getShardingInfo().shardId(t);
shardId = -1;
if (keyspace != null && table != null) {
assert t instanceof Token.TokenLong64;
shardId =
Integer.min(
metadata.getShardForTabletToken(keyspace, table, (Token.TokenLong64) t, host),
host.getShardingInfo().getShardsCount());
}
if (shardId == -1) { // means that tablet lookup failed
shardId = host.getShardingInfo().shardId(t);
}
} else {
shardId = RAND.nextInt(host.getShardingInfo().getShardsCount());
}
Expand Down
102 changes: 98 additions & 4 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package com.datastax.driver.core;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand All @@ -35,13 +36,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,8 +63,8 @@ public class Metadata {
final ConcurrentMap<String, KeyspaceMetadata> keyspaces =
new ConcurrentHashMap<String, KeyspaceMetadata>();
private volatile TokenMap tokenMap;

final ReentrantLock lock = new ReentrantLock();
private final TabletMap tabletMap;

// See https://github.com/apache/cassandra/blob/trunk/doc/cql3/CQL.textile#appendixA
private static final IntObjectHashMap<List<char[]>> RESERVED_KEYWORDS =
Expand Down Expand Up @@ -146,6 +149,7 @@ public class Metadata {

Metadata(Cluster.Manager cluster) {
this.cluster = cluster;
this.tabletMap = TabletMap.emptyMap(cluster);
}

// rebuilds the token map with the current hosts, typically when refreshing schema metadata
Expand Down Expand Up @@ -514,21 +518,30 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
}

/**
* Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
* null} and then a cluster-wide partitioner will be invoked.
* Extension of legacy method {@link Metadata#getReplicas(String, Token.Factory, ByteBuffer)}.
* Tablets model requires knowledge of the table name to determine the replicas. This method will
* first try to lookup replicas through known tablets metadata. It will default to TokenMap lookup
* if either {@code null} was passed as table name or the tablet lookup is unsuccessful for any
* other reason.
*
* <p>Returns the set of hosts that are replica for a given partition key. Partitioner can be
* {@code null} and then a cluster-wide partitioner will be invoked.
*
* <p>Note that this information is refreshed asynchronously by the control connection, when
* schema or ring topology changes. It might occasionally be stale (or even empty).
*
* @param keyspace the name of the keyspace to get replicas for.
* @param table the name of the table to get replicas for. Necessary for distinction for tablets.
* Unnecessary for regular TokenMap
* @param partitioner the partitioner to use or {@code null} for cluster-wide partitioner.
* @param partitionKey the partition key for which to find the set of replica.
* @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
* that the result might be stale or empty if metadata was explicitly disabled with {@link
* QueryOptions#setMetadataEnabled(boolean)}.
*/
@Beta
public Set<Host> getReplicas(
String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) {
keyspace = handleId(keyspace);
TokenMap current = tokenMap;
if (current == null) {
Expand All @@ -537,11 +550,40 @@ public Set<Host> getReplicas(
if (partitioner == null) {
partitioner = current.factory;
}
// If possible, try tablet lookup first
if (keyspace != null && table != null) {
Token token = partitioner.hash(partitionKey);
assert (token instanceof Token.TokenLong64);
Set<UUID> hostUuids = tabletMap.getReplicas(keyspace, table, (long) token.getValue());
if (!hostUuids.isEmpty()) {
return hostUuids.stream().map(this::getHost).collect(Collectors.toSet());
}
}
// Fall back to tokenMap
Set<Host> hosts = current.getReplicas(keyspace, partitioner.hash(partitionKey));
return hosts == null ? Collections.<Host>emptySet() : hosts;
}
}

/**
 * Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
 * null} and then a cluster-wide partitioner will be invoked.
 *
 * <p>This overload carries no table name: it delegates to the table-aware overload with a {@code
 * null} table.
 *
 * <p>Note that this information is refreshed asynchronously by the control connection, when
 * schema or ring topology changes. It might occasionally be stale (or even empty).
 *
 * @param keyspace the name of the keyspace to get replicas for.
 * @param partitioner the partitioner to use or {@code null} for cluster-wide partitioner.
 * @param partitionKey the partition key for which to find the set of replica.
 * @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
 *     that the result might be stale or empty if metadata was explicitly disabled with {@link
 *     QueryOptions#setMetadataEnabled(boolean)}.
 */
public Set<Host> getReplicas(
    String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
  // No table is known at this entry point, so hand a null table name to the 4-argument overload.
  final String noTable = null;
  return getReplicas(keyspace, noTable, partitioner, partitionKey);
}

/**
* Returns the set of hosts that are replica for a given token range.
*
Expand Down Expand Up @@ -860,6 +902,58 @@ void triggerOnMaterializedViewRemoved(MaterializedViewMetadata view) {
}
}

/**
 * Looks up, in the tablets metadata, the shard recorded for {@code host} in the tablet selected
 * for {@code token} in table {@code keyspace.table}.
 *
 * <p>The tablet set registered for the table is probed with {@code ceiling} using a placeholder
 * tablet built from the token value. The candidate is only accepted when its first token is
 * strictly lower than the token value. Among the candidate's replicas, the one whose host id
 * equals {@code host.getHostId()} supplies the shard.
 *
 * @param keyspace the keyspace name used to build the lookup key.
 * @param table the table name used to build the lookup key.
 * @param token the token to locate.
 * @param host the host whose shard is requested.
 * @return the shard recorded for {@code host}, or {@code -1} when the tablet map is {@code null},
 *     the table has no entry in it, no accepted tablet is found, or the tablet has no replica on
 *     {@code host}.
 */
@Beta
public int getShardForTabletToken(
    String keyspace, String table, Token.TokenLong64 token, Host host) {
  if (tabletMap == null) {
    logger.trace(
        "Could not determine shard for token {} on host {} because tablets metadata is currently null. "
            + "Returning -1.",
        token,
        host);
    return -1;
  }

  final UUID wantedHostId = host.getHostId();
  final long rawToken = (long) token.getValue();
  final TabletMap.KeyspaceTableNamePair lookupKey =
      new TabletMap.KeyspaceTableNamePair(keyspace, table);
  final NavigableSet<TabletMap.Tablet> tabletsOfTable = tabletMap.getMapping().get(lookupKey);
  if (tabletsOfTable == null) {
    logger.trace(
        "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
            + "metadata. Returning -1.",
        token,
        host,
        keyspace,
        table);
    return -1;
  }

  // First element that is >= the probe in the set's ordering; it is only a match when the token
  // lies strictly after the candidate's first token.
  final TabletMap.Tablet candidate =
      tabletsOfTable.ceiling(TabletMap.Tablet.malformedTablet(rawToken));
  final boolean tokenInsideCandidate = candidate != null && candidate.getFirstToken() < rawToken;
  if (tokenInsideCandidate) {
    for (TabletMap.HostShardPair replica : candidate.getReplicas()) {
      if (replica.getHost().equals(wantedHostId)) {
        return replica.getShard();
      }
    }
  }

  logger.trace(
      "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.",
      token,
      host,
      table,
      keyspace);
  return -1;
}

/**
 * Gives access to the {@link TabletMap} held by this metadata instance.
 *
 * @return the {@link TabletMap} assigned to this instance at construction time.
 */
@Beta
public TabletMap getTabletMap() {
  return this.tabletMap;
}

private static class TokenMap {

private final Token.Factory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,22 @@ private Iterator<Host> getReplicas(
}

Token.Factory partitioner = statement.getPartitioner();
String tableName = null;
ColumnDefinitions defs = null;
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).preparedStatement().getVariables();
} else if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (defs != null && defs.size() > 0) {
tableName = defs.getTable(0);
}

final Set<Host> replicas =
manager
.cluster
.getMetadata()
.getReplicas(Metadata.quote(keyspace), partitioner, partitionKey);
.getReplicas(Metadata.quote(keyspace), tableName, partitioner, partitionKey);

// replicas are stored in the right order starting with the primary replica
return replicas.iterator();
Expand Down Expand Up @@ -437,13 +448,28 @@ private boolean query(final Host host) {
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);

PoolingOptions poolingOptions = manager.configuration().getPoolingOptions();
String statementKeyspace = statement.getKeyspace();
String statementTable = null;
ColumnDefinitions defs = null;
if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).statement.getVariables();
}
if (defs != null && defs.size() > 0) {
statementTable = defs.getTable(0);
}

ListenableFuture<Connection> connectionFuture =
pool.borrowConnection(
poolingOptions.getPoolTimeoutMillis(),
TimeUnit.MILLISECONDS,
poolingOptions.getMaxQueueSize(),
statement.getPartitioner(),
routingKey);
routingKey,
statementKeyspace,
statementTable);
GuavaCompatibility.INSTANCE.addCallback(
connectionFuture,
new FutureCallback<Connection>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,22 @@ private ListenableFuture<PreparedStatement> prepare(
if (entry.getKey().getEndPoint().equals(toExclude)) continue;

try {
ColumnDefinitions defs = statement.getVariables();
String statementTable = (defs != null && defs.size() > 0 ? defs.getTable(0) : null);
// Preparing is not critical: if it fails, it will fix itself later when the user tries to
// execute
// the prepared query. So don't wait if no connection is available, simply abort.
ListenableFuture<Connection> connectionFuture =
entry
.getValue()
.borrowConnection(0, TimeUnit.MILLISECONDS, 0, null, statement.getRoutingKey());
.borrowConnection(
0,
TimeUnit.MILLISECONDS,
0,
null,
statement.getRoutingKey(),
statement.getQueryKeyspace(),
statementTable);
ListenableFuture<Response> prepareFuture =
GuavaCompatibility.INSTANCE.transformAsync(
connectionFuture,
Expand Down
33 changes: 33 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.datastax.driver.core;

import java.util.List;
import java.util.Map;

/**
 * Immutable flag object describing whether the {@code TABLETS_ROUTING_V1} option was advertised
 * in a map of supported options, plus helpers for requesting that option.
 *
 * <p>Instances are created only through {@link #parseTabletInfo(Map)}.
 */
public class TabletInfo {
  private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1";
  private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = "";
  /** Custom payload key under which tablet routing information is looked up in responses. */
  public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1";

  // Never reassigned after construction, so the object is safe to share once published.
  private final boolean enabled;

  private TabletInfo(boolean enabled) {
    this.enabled = enabled;
  }

  /**
   * Tells whether tablet routing was advertised. Currently pertains only to {@code
   * TABLETS_ROUTING_V1}.
   *
   * @return {@code true} if the option was present with the expected value.
   */
  public boolean isEnabled() {
    return enabled;
  }

  /**
   * Builds a {@link TabletInfo} from a map of supported options.
   *
   * <p>The result is enabled only when the {@code TABLETS_ROUTING_V1} entry exists, holds exactly
   * one value, and that value is the empty string. A {@code null} map, a missing entry, or a
   * {@code null} list element all yield a disabled instance instead of throwing.
   *
   * @param supported the supported options, keyed by option name; may be {@code null}.
   * @return a new {@link TabletInfo}, never {@code null}.
   */
  public static TabletInfo parseTabletInfo(Map<String, List<String>> supported) {
    if (supported == null) {
      return new TabletInfo(false);
    }
    List<String> values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY);
    // Constant-first equals keeps a null list element from raising a NullPointerException.
    boolean advertised =
        values != null
            && values.size() == 1
            && SCYLLA_TABLETS_STARTUP_OPTION_VALUE.equals(values.get(0));
    return new TabletInfo(advertised);
  }

  /**
   * Adds the {@code TABLETS_ROUTING_V1} option, with an empty value, to the given options map.
   *
   * @param options the mutable options map to add the entry to.
   */
  public static void addOption(Map<String, String> options) {
    options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE);
  }
}
Loading

0 comments on commit dbfe82a

Please sign in to comment.