Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Clean up invalid branch cache and not cache system table in caching catalog #4681

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
throw new TableNotExistException(identifier);
}
return table;
} else if (isSpecifiedSystemTable(identifier)) {
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
new Identifier(
Expand Down Expand Up @@ -519,12 +519,8 @@ protected void assertMainBranch(Identifier identifier) {
}
}

public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getSystemTableName() != null;
}

protected static boolean isTableInSystemDatabase(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable();
}

protected static void checkNotSystemTable(Identifier identifier, String method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
Expand All @@ -48,15 +47,14 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {
Expand Down Expand Up @@ -203,6 +201,9 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
super.dropTable(identifier, ignoreIfNotExists);
invalidateTable(identifier);
if (identifier.isMainTable()) {
invalidateAttachedTables(identifier);
}
}

@Override
Expand All @@ -227,26 +228,23 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return table;
}

if (isSpecifiedSystemTable(identifier)) {
// For system table, do not cache it directly. Instead, cache the origin table and then wrap
// it to generate the system table.
if (identifier.isSystemTable()) {
Identifier originIdentifier =
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null);
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
putTableCache(originIdentifier, originTable);
}
Table originTable = getTable(originIdentifier);
table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
putTableCache(identifier, table);
return table;
Zouxxyy marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -309,32 +307,26 @@ private class TableInvalidatingRemovalListener implements RemovalListener<Identi
public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
LOG.debug("Evicted {} from the table cache ({})", identifier, cause);
if (RemovalCause.EXPIRED.equals(cause)) {
tryInvalidateSysTables(identifier);
// ignore now
}
}
}

@Override
public void invalidateTable(Identifier identifier) {
tableCache.invalidate(identifier);
tryInvalidateSysTables(identifier);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
}

private void tryInvalidateSysTables(Identifier identifier) {
if (!isSpecifiedSystemTable(identifier)) {
tableCache.invalidateAll(allSystemTables(identifier));
}
}

private static Iterable<Identifier> allSystemTables(Identifier ident) {
List<Identifier> tables = new ArrayList<>();
for (String type : SYSTEM_TABLES) {
tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
/** invalidate attached tables, such as cached branches. */
private void invalidateAttachedTables(Identifier identifier) {
for (@NonNull Identifier i : tableCache.asMap().keySet()) {
if (identifier.getTableName().equals(i.getTableName())) {
tableCache.invalidate(i);
}
}
return tables;
}

// ================================== refresh ================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public Identifier(String database, String object) {
this.object = object;
}

    /**
     * Creates an identifier for a table, optionally scoped to a branch.
     *
     * <p>Convenience overload of the 4-arg constructor with no system table part
     * ({@code systemTable} is {@code null}).
     *
     * @param database the database name
     * @param table the table name
     * @param branch the branch name, or {@code null} for the default/main branch
     */
    public Identifier(String database, String table, @Nullable String branch) {
        this(database, table, branch, null);
    }

public Identifier(
String database, String table, @Nullable String branch, @Nullable String systemTable) {
this.database = database;
Expand Down Expand Up @@ -119,6 +123,18 @@ public String getBranchNameOrDefault() {
return systemTable;
}

    /**
     * Returns {@code true} if this identifier refers to the main table itself,
     * i.e. it carries neither a branch part nor a system table part.
     */
    public boolean isMainTable() {
        return getBranchName() == null && getSystemTableName() == null;
    }

    /**
     * Returns {@code true} if this identifier refers to a branch of a table
     * (has a branch part) but not to a system table of that branch.
     */
    public boolean isBranch() {
        return getBranchName() != null && getSystemTableName() == null;
    }

    /**
     * Returns {@code true} if this identifier refers to a system table
     * (e.g. {@code tbl$files}), regardless of whether a branch part is present.
     */
    public boolean isSystemTable() {
        return getSystemTableName() != null;
    }

private void splitObjectName() {
if (table != null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@
import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -101,14 +99,49 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception {

@Test
public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
Catalog catalog = new CachingCatalog(this.catalog);
TestableCachingCatalog catalog =
new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
Identifier sysIdent = new Identifier("db", "tbl$files");
// get system table will only cache the origin table
catalog.getTable(sysIdent);
assertThat(catalog.tableCache.asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent);
// test case sensitivity
Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
catalog.getTable(sysIdent1);
assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1);

catalog.dropTable(tableIdent, false);
assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent);
assertThatThrownBy(() -> catalog.getTable(sysIdent))
.hasMessage("Table db.tbl does not exist.");
assertThatThrownBy(() -> catalog.getTable(sysIdent1))
.hasMessage("Table db.tbl does not exist.");
}

    @Test
    public void testInvalidateBranchIfBaseTableIsDropped() throws Exception {
        // Wrap the real catalog in a testable caching catalog so the internal
        // table cache can be inspected directly.
        TestableCachingCatalog catalog =
                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
        Identifier tableIdent = new Identifier("db", "tbl");
        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
        catalog.getTable(tableIdent).createBranch("b1");

        Identifier branchIdent = new Identifier("db", "tbl$branch_b1");
        // Uppercase system-table suffix also exercises case handling.
        Identifier branchSysIdent = new Identifier("db", "tbl$branch_b1$FILES");
        // get system table will only cache the origin table
        catalog.getTable(branchSysIdent);
        assertThat(catalog.tableCache.asMap()).containsKey(branchIdent);
        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent);

        // Dropping the main table must also evict cached branch entries,
        // so later lookups fail instead of serving stale tables.
        catalog.dropTable(tableIdent, false);
        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent);
        assertThatThrownBy(() -> catalog.getTable(branchIdent))
                .hasMessage("Table db.tbl$branch_b1 does not exist.");
        assertThatThrownBy(() -> catalog.getTable(branchSysIdent))
                .hasMessage("Table db.tbl$branch_b1 does not exist.");
    }

@Test
Expand Down Expand Up @@ -175,59 +208,6 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exce
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
}

@Test
public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
TestableCachingCatalog catalog =
new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);

Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
Table table = catalog.getTable(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);

ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);

for (Identifier sysTable : sysTables(tableIdent)) {
catalog.getTable(sysTable);
}
assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
.isNotEmpty()
.allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));

assertThat(catalog.remainingAgeFor(tableIdent))
.as("Loading a non-cached sys table should refresh the main table's age")
.isEqualTo(Optional.of(EXPIRATION_TTL));

// Move time forward and access already cached sys tables.
ticker.advance(HALF_OF_EXPIRATION);
for (Identifier sysTable : sysTables(tableIdent)) {
catalog.getTable(sysTable);
}
assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
.isNotEmpty()
.allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));

assertThat(catalog.remainingAgeFor(tableIdent))
.as("Accessing a cached sys table should not affect the main table's age")
.isEqualTo(Optional.of(HALF_OF_EXPIRATION));

// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);

Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires, its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}

@Test
public void testPartitionCache() throws Exception {
TestableCachingCatalog catalog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public String identifier() {
})
public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
throws Catalog.TableNotExistException {
catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
Identifier identifier = Identifier.fromString(tableId);
catalog.getTable(identifier).deleteBranches(branchStr);
catalog.invalidateTable(
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr));
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.junit.jupiter.api.Assertions

class DataFrameWriteTest extends PaimonSparkTestBase {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

test("Paimon: DataFrameWrite.saveAsTable") {

import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
Expand All @@ -27,6 +28,10 @@ import java.sql.Date

class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

import testImplicits._

test("Paimon Sink: forEachBatch") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep
@Override
public void invalidateTable(Identifier ident) {
// We do not need to check whether the table exists and whether
// it is an Paimon table to reduce remote service requests.
// it is a Paimon table to reduce remote service requests.
sparkCatalog.invalidateTable(ident);
asTableCatalog().invalidateTable(ident);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.spark.catalog.WithPaimonCatalog;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand Down Expand Up @@ -61,13 +63,20 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String branchStr = args.getString(1);
return modifyPaimonTable(
tableIdent,
table -> {
table.deleteBranches(branchStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
InternalRow[] result =
modifyPaimonTable(
tableIdent,
table -> {
table.deleteBranches(branchStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
((WithPaimonCatalog) tableCatalog())
.paimonCatalog()
.invalidateTable(
new org.apache.paimon.catalog.Identifier(
tableIdent.namespace()[0], tableIdent.name(), branchStr));
return result;
}

public static ProcedureBuilder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
Expand All @@ -27,6 +28,10 @@ import java.sql.Date

class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

import testImplicits._

test("Paimon Sink: forEachBatch") {
Expand Down
Loading
Loading