diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java index c4cb08ba2b..5bbcb3a4db 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java @@ -150,6 +150,22 @@ public class Metadata { Metadata(Cluster.Manager cluster) { this.cluster = cluster; this.tabletMap = TabletMap.emptyMap(cluster); + if (cluster.configuration.getQueryOptions().isMetadataEnabled()) { + cluster + .getCluster() + .register( + new TabletMapListener(tabletMap) { + @Override + public void onRegister(Cluster cluster) { + logger.info("Registered event listener for tablet map."); + } + + @Override + public void onUnregister(Cluster cluster) { + logger.info("Unregistered tablet map's event listener."); + } + }); + } } // rebuilds the token map with the current hosts, typically when refreshing schema metadata diff --git a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java index 0920802c59..baecc5959b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java +++ b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java @@ -198,6 +198,24 @@ public TypeCodec getTabletPayloadCodec() { return tabletPayloadCodec; } + public void removeTableMappings(KeyspaceTableNamePair key) { + this.mapping.remove(key); + } + + public void removeTableMappings(String keyspace, String table) { + removeTableMappings(new KeyspaceTableNamePair(keyspace, table)); + } + + public void removeTableMappings(String keyspace) { + Iterator it = getMapping().keySet().iterator(); + while (it.hasNext()) { + KeyspaceTableNamePair key = it.next(); + if (key.getKeyspace().equals(keyspace)) { + it.remove(); + } + } + } + /** * Simple class to hold UUID of a host and a shard number. Class itself makes no checks or * guarantees about existence of a shard on corresponding host. diff --git a/driver-core/src/main/java/com/datastax/driver/core/TabletMapListener.java b/driver-core/src/main/java/com/datastax/driver/core/TabletMapListener.java new file mode 100644 index 0000000000..7b3ffa5901 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/TabletMapListener.java @@ -0,0 +1,29 @@ +package com.datastax.driver.core; + +public class TabletMapListener extends SchemaChangeListenerBase { + private final TabletMap tabletMap; + + public TabletMapListener(TabletMap tabletMap) { + this.tabletMap = tabletMap; + } + + @Override + public void onTableChanged(TableMetadata current, TableMetadata previous) { + tabletMap.removeTableMappings(previous.getKeyspace().getName(), previous.getName()); + } + + @Override + public void onTableRemoved(TableMetadata table) { + tabletMap.removeTableMappings(table.getKeyspace().getName(), table.getName()); + } + + @Override + public void onKeyspaceRemoved(KeyspaceMetadata keyspace) { + tabletMap.removeTableMappings(keyspace.getName()); + } + + @Override + public void onKeyspaceChanged(KeyspaceMetadata current, KeyspaceMetadata previous) { + tabletMap.removeTableMappings(previous.getName()); + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/TabletMapListenerTest.java b/driver-core/src/test/java/com/datastax/driver/core/TabletMapListenerTest.java new file mode 100644 index 0000000000..2953c9d6f5 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/TabletMapListenerTest.java @@ -0,0 +1,208 @@ +package com.datastax.driver.core; + +import static com.datastax.driver.core.Assertions.assertThat; +import static com.datastax.driver.core.Metadata.handleId; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.utils.ScyllaVersion; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@CCMConfig( + createCluster = false, + numberOfNodes = 2, + jvmArgs = { + "--experimental-features=consistent-topology-changes", + "--experimental-features=tablets" + }) +@ScyllaVersion(minOSS = "6.0.0", minEnterprise = "2024.2", description = "Needs to support tablets") +public class TabletMapListenerTest extends CCMTestsSupport { + private static final int INITIAL_TABLETS = 32; + private static final String KEYSPACE_NAME = "listenerTest"; + private static final String TABLE_NAME = "testTable"; + private static final String CREATE_TABLETS_KEYSPACE_QUERY = + "CREATE KEYSPACE " + + KEYSPACE_NAME + + " WITH replication = {'class': " + + "'NetworkTopologyStrategy', " + + "'replication_factor': '1'} AND durable_writes = true AND tablets = " + + "{'initial': " + + INITIAL_TABLETS + + "};"; + + private static final String CREATE_KEYSPACE = CREATE_TABLETS_KEYSPACE_QUERY; + private static final String ALTER_KEYSPACE = + "ALTER KEYSPACE " + KEYSPACE_NAME + " WITH durable_writes = false"; + private static final String DROP_KEYSPACE = "DROP KEYSPACE " + KEYSPACE_NAME; + + private static final String CREATE_TABLE = + "CREATE TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + "(i int primary key)"; + private static final String INSERT_QUERY_TEMPLATE = + "INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i) VALUES (%s)"; + private static final String INSERT_ALTERED_TEMPLATE = + "INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i,j) VALUES (%s,%s)"; + private static final String SELECT_PK_WHERE = + "SELECT i FROM " + KEYSPACE_NAME + "." + TABLE_NAME + " WHERE i = ?"; + private static final String ALTER_TABLE = + "ALTER TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + " ADD j int"; + private static final String DROP_TABLE = "DROP TABLE " + KEYSPACE_NAME + "." + TABLE_NAME; + + /** The maximum time that the test will wait to check that listeners have been notified. */ + private static final long NOTIF_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + /** Shorter timeout for less important checks that listeners did not react to specific actions */ + private static final long SHORT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(4); + + Cluster cluster; + Session session; + SchemaChangeListener listener; + List listeners; + + @BeforeMethod(groups = "short") + public void setup() throws InterruptedException { + cluster = + createClusterBuilderNoDebouncing().withLoadBalancingPolicy(new RoundRobinPolicy()).build(); + + session = cluster.connect(); + + cluster.register(listener = mock(TabletMapListener.class)); + listeners = Lists.newArrayList(listener); + } + + // Checks for tablet removal both on table update and removal + @Test(groups = "short") + public void should_remove_tablets_on_table_alterations() throws InterruptedException { + session.execute(CREATE_KEYSPACE); + ArgumentCaptor added = null; + for (SchemaChangeListener listener : listeners) { + added = ArgumentCaptor.forClass(KeyspaceMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture()); + assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME)); + } + assert added != null; + + TabletMap tabletMap; + tabletMap = cluster.getMetadata().getTabletMap(); + + session.execute(CREATE_TABLE); + assertThat(tabletMap.getMapping()) + .doesNotContainKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(String.format(INSERT_QUERY_TEMPLATE, "42")); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + assertThat(tabletMap.getMapping()) + .containsKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(ALTER_TABLE); + for (SchemaChangeListener listener : listeners) { + ArgumentCaptor current = ArgumentCaptor.forClass(TableMetadata.class); + ArgumentCaptor previous = ArgumentCaptor.forClass(TableMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)) + .onTableChanged(current.capture(), previous.capture()); + assertThat(previous.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME)); + assertThat(previous.getValue()).hasName(handleId(TABLE_NAME)); + } + assertThat(tabletMap.getMapping()) + .doesNotContainKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(String.format(INSERT_ALTERED_TEMPLATE, "42", "42")); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + assertThat(tabletMap.getMapping()) + .containsKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(DROP_TABLE); + ArgumentCaptor removed = null; + for (SchemaChangeListener listener : listeners) { + removed = ArgumentCaptor.forClass(TableMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onTableRemoved(removed.capture()); + assertThat(removed.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME)); + assertThat(removed.getValue()).hasName(handleId(TABLE_NAME)); + } + assert removed != null; + assertThat(tabletMap.getMapping()) + .doesNotContainKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(DROP_KEYSPACE); + } + + @Test(groups = "short") + public void should_remove_tablets_on_keyspace_alterations() { + session.execute(CREATE_KEYSPACE); + ArgumentCaptor added = null; + for (SchemaChangeListener listener : listeners) { + added = ArgumentCaptor.forClass(KeyspaceMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture()); + assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME)); + } + assert added != null; + + TabletMap tabletMap; + tabletMap = cluster.getMetadata().getTabletMap(); + + session.execute(CREATE_TABLE); + session.execute(String.format(INSERT_QUERY_TEMPLATE, "42")); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + assertThat(tabletMap.getMapping()) + .containsKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME).isDurableWrites()).isTrue(); + + session.execute(ALTER_KEYSPACE); + assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME)).isNotDurableWrites(); + for (SchemaChangeListener listener : listeners) { + ArgumentCaptor current = ArgumentCaptor.forClass(KeyspaceMetadata.class); + ArgumentCaptor previous = ArgumentCaptor.forClass(KeyspaceMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)) + .onKeyspaceChanged(current.capture(), previous.capture()); + assertThat(previous.getValue()).hasName(handleId(KEYSPACE_NAME)); + } + for (SchemaChangeListener listener : listeners) { + verify(listener, after((int) SHORT_TIMEOUT_MS).never()) + .onTableChanged(anyObject(), anyObject()); + } + assertThat(tabletMap.getMapping()) + .doesNotContainKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + session.execute(session.prepare(SELECT_PK_WHERE).bind(42)); + + assertThat(tabletMap.getMapping()) + .containsKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + + session.execute(DROP_KEYSPACE); + for (SchemaChangeListener listener : listeners) { + ArgumentCaptor removed = ArgumentCaptor.forClass(KeyspaceMetadata.class); + verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceRemoved(removed.capture()); + assertThat(removed.getValue()).hasName(handleId(KEYSPACE_NAME)); + } + assertThat(tabletMap.getMapping()) + .doesNotContainKey( + new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME))); + } + + @AfterMethod(groups = "short", alwaysRun = true) + public void teardown() { + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } +}