Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Invalidate tablets on schema changes #382

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ public class Metadata {
Metadata(Cluster.Manager cluster) {
this.cluster = cluster;
this.tabletMap = TabletMap.emptyMap(cluster);
if (cluster.configuration.getQueryOptions().isMetadataEnabled()) {
cluster
.getCluster()
.register(
new TabletMapListener(tabletMap) {
@Override
public void onRegister(Cluster cluster) {
logger.info("Registered event listener for tablet map.");
}

@Override
public void onUnregister(Cluster cluster) {
logger.info("Unregistered tablet map's event listener.");
}
});
}
}

// rebuilds the token map with the current hosts, typically when refreshing schema metadata
Expand Down
18 changes: 18 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/TabletMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,24 @@ public TypeCodec<TupleValue> getTabletPayloadCodec() {
return tabletPayloadCodec;
}

public void removeTableMappings(KeyspaceTableNamePair key) {
this.mapping.remove(key);
}

public void removeTableMappings(String keyspace, String table) {
removeTableMappings(new KeyspaceTableNamePair(keyspace, table));
}

public void removeTableMappings(String keyspace) {
Iterator<TabletMap.KeyspaceTableNamePair> it = getMapping().keySet().iterator();
while (it.hasNext()) {
KeyspaceTableNamePair key = it.next();
if (key.getKeyspace().equals(keyspace)) {
it.remove();
}
}
}

/**
* Simple class to hold UUID of a host and a shard number. Class itself makes no checks or
* guarantees about existence of a shard on corresponding host.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.datastax.driver.core;

public class TabletMapListener extends SchemaChangeListenerBase {
private final TabletMap tabletMap;

public TabletMapListener(TabletMap tabletMap) {
this.tabletMap = tabletMap;
}

@Override
public void onTableChanged(TableMetadata current, TableMetadata previous) {
tabletMap.removeTableMappings(previous.getKeyspace().getName(), previous.getName());
}

@Override
public void onTableRemoved(TableMetadata table) {
tabletMap.removeTableMappings(table.getKeyspace().getName(), table.getName());
}

@Override
public void onKeyspaceRemoved(KeyspaceMetadata keyspace) {
tabletMap.removeTableMappings(keyspace.getName());
}

@Override
public void onKeyspaceChanged(KeyspaceMetadata current, KeyspaceMetadata previous) {
tabletMap.removeTableMappings(previous.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package com.datastax.driver.core;

import static com.datastax.driver.core.Assertions.assertThat;
import static com.datastax.driver.core.Metadata.handleId;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.utils.ScyllaVersion;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@CCMConfig(
createCluster = false,
numberOfNodes = 2,
jvmArgs = {
"--experimental-features=consistent-topology-changes",
"--experimental-features=tablets"
})
@ScyllaVersion(minOSS = "6.0.0", minEnterprise = "2024.2", description = "Needs to support tablets")
public class TabletMapListenerTest extends CCMTestsSupport {
private static final int INITIAL_TABLETS = 32;
private static final String KEYSPACE_NAME = "listenerTest";
private static final String TABLE_NAME = "testTable";
private static final String CREATE_TABLETS_KEYSPACE_QUERY =
"CREATE KEYSPACE "
+ KEYSPACE_NAME
+ " WITH replication = {'class': "
+ "'NetworkTopologyStrategy', "
+ "'replication_factor': '1'} AND durable_writes = true AND tablets = "
+ "{'initial': "
+ INITIAL_TABLETS
+ "};";

private static final String CREATE_KEYSPACE = CREATE_TABLETS_KEYSPACE_QUERY;
private static final String ALTER_KEYSPACE =
"ALTER KEYSPACE " + KEYSPACE_NAME + " WITH durable_writes = false";
private static final String DROP_KEYSPACE = "DROP KEYSPACE " + KEYSPACE_NAME;

private static final String CREATE_TABLE =
"CREATE TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + "(i int primary key)";
private static final String INSERT_QUERY_TEMPLATE =
"INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i) VALUES (%s)";
private static final String INSERT_ALTERED_TEMPLATE =
"INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i,j) VALUES (%s,%s)";
private static final String SELECT_PK_WHERE =
"SELECT i FROM " + KEYSPACE_NAME + "." + TABLE_NAME + " WHERE i = ?";
private static final String ALTER_TABLE =
"ALTER TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + " ADD j int";
private static final String DROP_TABLE = "DROP TABLE " + KEYSPACE_NAME + "." + TABLE_NAME;

/** The maximum time that the test will wait to check that listeners have been notified. */
private static final long NOTIF_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
/** Shorter timeout for less important checks that listeners did not react to specific actions */
private static final long SHORT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(4);

Cluster cluster;
Session session;
SchemaChangeListener listener;
List<SchemaChangeListener> listeners;

@BeforeMethod(groups = "short")
public void setup() throws InterruptedException {
cluster =
createClusterBuilderNoDebouncing().withLoadBalancingPolicy(new RoundRobinPolicy()).build();

session = cluster.connect();

cluster.register(listener = mock(TabletMapListener.class));
listeners = Lists.newArrayList(listener);
}

// Checks for tablet removal both on table update and removal
@Test(groups = "short")
public void should_remove_tablets_on_table_alterations() throws InterruptedException {
session.execute(CREATE_KEYSPACE);
ArgumentCaptor<KeyspaceMetadata> added = null;
for (SchemaChangeListener listener : listeners) {
added = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture());
assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME));
}
assert added != null;

TabletMap tabletMap;
tabletMap = cluster.getMetadata().getTabletMap();

session.execute(CREATE_TABLE);
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(String.format(INSERT_QUERY_TEMPLATE, "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(ALTER_TABLE);
for (SchemaChangeListener listener : listeners) {
ArgumentCaptor<TableMetadata> current = ArgumentCaptor.forClass(TableMetadata.class);
ArgumentCaptor<TableMetadata> previous = ArgumentCaptor.forClass(TableMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1))
.onTableChanged(current.capture(), previous.capture());
assertThat(previous.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME));
assertThat(previous.getValue()).hasName(handleId(TABLE_NAME));
}
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(String.format(INSERT_ALTERED_TEMPLATE, "42", "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(DROP_TABLE);
ArgumentCaptor<TableMetadata> removed = null;
for (SchemaChangeListener listener : listeners) {
removed = ArgumentCaptor.forClass(TableMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onTableRemoved(removed.capture());
assertThat(removed.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME));
assertThat(removed.getValue()).hasName(handleId(TABLE_NAME));
}
assert removed != null;
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(DROP_KEYSPACE);
}

@Test(groups = "short")
public void should_remove_tablets_on_keyspace_alterations() {
session.execute(CREATE_KEYSPACE);
ArgumentCaptor<KeyspaceMetadata> added = null;
for (SchemaChangeListener listener : listeners) {
added = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture());
assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME));
}
assert added != null;

TabletMap tabletMap;
tabletMap = cluster.getMetadata().getTabletMap();

session.execute(CREATE_TABLE);
session.execute(String.format(INSERT_QUERY_TEMPLATE, "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME).isDurableWrites()).isTrue();

session.execute(ALTER_KEYSPACE);
assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME)).isNotDurableWrites();
for (SchemaChangeListener listener : listeners) {
ArgumentCaptor<KeyspaceMetadata> current = ArgumentCaptor.forClass(KeyspaceMetadata.class);
ArgumentCaptor<KeyspaceMetadata> previous = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1))
.onKeyspaceChanged(current.capture(), previous.capture());
assertThat(previous.getValue()).hasName(handleId(KEYSPACE_NAME));
}
for (SchemaChangeListener listener : listeners) {
verify(listener, after((int) SHORT_TIMEOUT_MS).never())
.onTableChanged(anyObject(), anyObject());
}
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));

assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(DROP_KEYSPACE);
for (SchemaChangeListener listener : listeners) {
ArgumentCaptor<KeyspaceMetadata> removed = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceRemoved(removed.capture());
assertThat(removed.getValue()).hasName(handleId(KEYSPACE_NAME));
}
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));
}

@AfterMethod(groups = "short", alwaysRun = true)
public void teardown() {
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}
Loading