Skip to content

Commit

Permalink
temporary
Browse files Browse the repository at this point in the history
  • Loading branch information
Bouncheck committed Nov 7, 2024
1 parent 07d0f66 commit 57d62ca
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 64 deletions.
23 changes: 15 additions & 8 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -916,8 +916,8 @@ public int getShardForTabletToken(
UUID targetHostUuid = host.getHostId();
long tokenValue = (long) token.getValue();
TabletMap.KeyspaceTableNamePair key = new TabletMap.KeyspaceTableNamePair(keyspace, table);
NavigableSet<TabletMap.Tablet> targetTablets = tabletMap.getMapping().get(key);
if (targetTablets == null) {
TabletMap.TabletSet targetSet = tabletMap.getMapping().get(key);
if (targetSet == null) {
logger.trace(
"Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
+ "metadata. Returning -1.",
Expand All @@ -927,13 +927,20 @@ public int getShardForTabletToken(
table);
return -1;
}
TabletMap.Tablet row = targetTablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue));
if (row != null && row.getFirstToken() < tokenValue) {
for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) {
if (hostShardPair.getHost().equals(targetHostUuid)) {
return hostShardPair.getShard();
Lock readLock = targetSet.lock.readLock();
try {
readLock.lock();
TabletMap.Tablet row =
targetSet.tablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue));
if (row != null && row.getFirstToken() < tokenValue) {
for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) {
if (hostShardPair.getHost().equals(targetHostUuid)) {
return hostShardPair.getShard();
}
}
}
} finally {
readLock.unlock();
}
logger.trace(
"Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.",
Expand Down
135 changes: 79 additions & 56 deletions driver-core/src/main/java/com/datastax/driver/core/TabletMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +27,9 @@
public class TabletMap {
private static final Logger logger = LoggerFactory.getLogger(TabletMap.class);

private final ConcurrentMap<KeyspaceTableNamePair, NavigableSet<Tablet>> mapping;
// There are no additional locking mechanisms for the mapping field itself, however each TabletSet
// inside has its own ReadWriteLock that should be used when dealing with its internals.
private final ConcurrentMap<KeyspaceTableNamePair, TabletSet> mapping;

private final Cluster.Manager cluster;

Expand All @@ -34,7 +38,7 @@ public class TabletMap {
private TypeCodec<TupleValue> tabletPayloadCodec = null;

public TabletMap(
Cluster.Manager cluster, ConcurrentMap<KeyspaceTableNamePair, NavigableSet<Tablet>> mapping) {
Cluster.Manager cluster, ConcurrentMap<KeyspaceTableNamePair, TabletSet> mapping) {
this.cluster = cluster;
this.mapping = mapping;
}
Expand All @@ -46,9 +50,9 @@ public static TabletMap emptyMap(Cluster.Manager cluster) {
/**
* Returns the mapping of tables to their tablets.
*
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
* @return the Map keyed by (keyspace,table) pairs with {@link TabletSet} as value type.
*/
public Map<KeyspaceTableNamePair, NavigableSet<Tablet>> getMapping() {
public Map<KeyspaceTableNamePair, TabletSet> getMapping() {
return mapping;
}

Expand All @@ -68,28 +72,34 @@ public Set<UUID> getReplicas(String keyspace, String table, long token) {
return Collections.emptySet();
}

NavigableSet<Tablet> set = mapping.get(key);
if (set == null) {
TabletSet tabletSet = mapping.get(key);
if (tabletSet == null) {
logger.trace(
"There is no tablets for {}.{} in this mapping. Returning empty set.", keyspace, table);
return Collections.emptySet();
}
Tablet row = mapping.get(key).ceiling(Tablet.malformedTablet(token));
if (row == null || row.firstToken >= token) {
logger.trace(
"Could not find tablet for {}.{} that owns token {}. Returning empty set.",
keyspace,
table,
token);
return Collections.emptySet();
}
Lock readLock = tabletSet.lock.readLock();
try {
readLock.lock();
Tablet row = mapping.get(key).tablets.ceiling(Tablet.malformedTablet(token));
if (row == null || row.firstToken >= token) {
logger.trace(
"Could not find tablet for {}.{} that owns token {}. Returning empty set.",
keyspace,
table,
token);
return Collections.emptySet();
}

HashSet<UUID> uuidSet = new HashSet<>();
for (HostShardPair hostShardPair : row.replicas) {
if (cluster.metadata.getHost(hostShardPair.getHost()) != null)
uuidSet.add(hostShardPair.getHost());
HashSet<UUID> uuidSet = new HashSet<>();
for (HostShardPair hostShardPair : row.replicas) {
if (cluster.metadata.getHost(hostShardPair.getHost()) != null)
uuidSet.add(hostShardPair.getHost());
}
return uuidSet;
} finally {
readLock.unlock();
}
return uuidSet;
}

/**
Expand Down Expand Up @@ -121,46 +131,47 @@ void processTabletsRoutingV1Payload(String keyspace, String table, ByteBuffer pa
HostShardPair hostShardPair = new HostShardPair(tuple.getUUID(0), tuple.getInt(1));
replicas.add(hostShardPair);
}

// Working on a copy to avoid concurrent modification of the same set
NavigableSet<Tablet> existingTablets =
new TreeSet<>(mapping.computeIfAbsent(ktPair, k -> new TreeSet<>()));

// Single tablet token range is represented by (firstToken, lastToken] interval
// We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
// tablets
// and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
// according
// to their lastTokens.

// First sweep: remove all tablets whose lastToken is inside this interval
Iterator<Tablet> it =
existingTablets.headSet(Tablet.malformedTablet(lastToken), true).descendingIterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.lastToken <= firstToken) {
break;
Tablet newTablet = new Tablet(keyspace, null, table, firstToken, lastToken, replicas);

TabletSet tabletSet = mapping.computeIfAbsent(ktPair, k -> new TabletSet());
Lock writeLock = tabletSet.lock.writeLock();
try {
writeLock.lock();
NavigableSet<Tablet> currentTablets = tabletSet.tablets;
// Single tablet token range is represented by (firstToken, lastToken] interval
// We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
// tablets
// and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
// according
// to their lastTokens.

// First sweep: remove all tablets whose lastToken is inside this interval
Iterator<Tablet> it = currentTablets.headSet(newTablet, true).descendingIterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.lastToken <= firstToken) {
break;
}
it.remove();
}
it.remove();
}

// Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
// lastToken]
// After the first sweep, this theoretically should remove at most 1 tablet
it = existingTablets.tailSet(Tablet.malformedTablet(lastToken), true).iterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.firstToken >= lastToken) {
break;
// Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
// lastToken]
// After the first sweep, this theoretically should remove at most 1 tablet
it = currentTablets.tailSet(newTablet, true).iterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.firstToken >= lastToken) {
break;
}
it.remove();
}
it.remove();
}

// Add new (now) non-overlapping tablet
existingTablets.add(new Tablet(keyspace, null, table, firstToken, lastToken, replicas));

// Set the updated result in the main map
mapping.put(ktPair, existingTablets);
// Add new (now) non-overlapping tablet
currentTablets.add(newTablet);
} finally {
writeLock.unlock();
}
}

public TupleType getPayloadOuterTuple() {
Expand Down Expand Up @@ -258,6 +269,18 @@ public int hashCode() {
}
}

/**
* Set of tablets bundled with ReadWriteLock to allow concurrent modification for different sets.
*/
public static class TabletSet {
final NavigableSet<Tablet> tablets;
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

public TabletSet() {
this.tablets = new TreeSet<>();
}
}

/**
* Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code
* compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow
Expand Down

0 comments on commit 57d62ca

Please sign in to comment.