diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index f67f19700d6e..286fad18b3c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -25,8 +25,6 @@
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;

 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -48,7 +46,6 @@
 import java.util.Map;
 import java.util.Optional;

-import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
@@ -56,7 +53,6 @@
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
 import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
-import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

 /** A {@link Catalog} to cache databases and tables and manifests. */
 public class CachingCatalog extends DelegateCatalog {
@@ -227,27 +223,14 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
             return table;
         }

-        if (isSpecifiedSystemTable(identifier)) {
-            Identifier originIdentifier =
-                    new Identifier(
-                            identifier.getDatabaseName(),
-                            identifier.getTableName(),
-                            identifier.getBranchName(),
-                            null);
+        if (!identifier.isOriginTable()) {
+            // cache origin table first
+            Identifier originIdentifier = identifier.toOriginTable();
             Table originTable = tableCache.getIfPresent(originIdentifier);
             if (originTable == null) {
                 originTable = wrapped.getTable(originIdentifier);
                 putTableCache(originIdentifier, originTable);
             }
-            table =
-                    SystemTableLoader.load(
-                            Preconditions.checkNotNull(identifier.getSystemTableName()),
-                            (FileStoreTable) originTable);
-            if (table == null) {
-                throw new TableNotExistException(identifier);
-            }
-            putTableCache(identifier, table);
-            return table;
         }

         table = wrapped.getTable(identifier);
@@ -309,7 +292,7 @@ private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table>
-    private List<Identifier> allSystemTables(Identifier ident) {
-        List<Identifier> tables = new ArrayList<>();
-        for (String type : SYSTEM_TABLES) {
-            tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
+    private void tryInvalidateAttachedTables(Identifier identifier) {
+        if (identifier.isOriginTable()) {
+            for (@NonNull Identifier i : tableCache.asMap().keySet()) {
+                if (identifier.getTableName().equals(i.getTableName())) {
+                    tableCache.invalidate(i);
+                }
+            }
         }
-        return tables;
     }

     // ================================== refresh ================================================
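Reviewer note, not part of the patch: the self-contained sketch below replays the new sweep in tryInvalidateAttachedTables against a plain Caffeine cache, with String values standing in for Table. The class name and sample identifiers are hypothetical; it assumes Identifier provides value-based equals/hashCode, which the cache keys already rely on.

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

// Hypothetical illustration class; mirrors the loop added above.
public class SweepSketch {
    public static void main(String[] args) {
        Cache<Identifier, String> cache = Caffeine.newBuilder().build();
        cache.put(Identifier.fromString("db.tbl"), "origin");
        cache.put(Identifier.fromString("db.tbl$branch_b1"), "branch");
        cache.put(Identifier.fromString("db.tbl$branch_b1$files"), "system");

        // Same check-and-sweep as tryInvalidateAttachedTables: only an origin
        // table triggers eviction of everything sharing its table name.
        Identifier dropped = Identifier.fromString("db.tbl");
        if (dropped.isOriginTable()) {
            for (Identifier i : cache.asMap().keySet()) {
                if (dropped.getTableName().equals(i.getTableName())) {
                    cache.invalidate(i);
                }
            }
        }
        System.out.println(cache.estimatedSize()); // prints 0: all three evicted
    }
}

Since the match is on getTableName() alone, dropping db.tbl would also evict a cached db2.tbl. For a pure cache that over-invalidation is presumably benign, but it seems worth confirming it is intended.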
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 1259889d2d3d..1faa45fa45f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -66,6 +66,10 @@ public Identifier(String database, String object) {
         this.object = object;
     }

+    public Identifier(String database, String table, @Nullable String branch) {
+        this(database, table, branch, null);
+    }
+
     public Identifier(
             String database, String table, @Nullable String branch, @Nullable String systemTable) {
         this.database = database;
@@ -120,6 +124,14 @@ public String getBranchNameOrDefault() {
         return systemTable == null ? null : systemTable.toLowerCase();
     }

+    public boolean isOriginTable() {
+        return getSystemTableName() == null && getBranchName() == null;
+    }
+
+    public Identifier toOriginTable() {
+        return new Identifier(database, getTableName());
+    }
+
     private void splitObjectName() {
         if (table != null) {
             return;
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index c71dfae7f8d8..49cb209e33d8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -101,13 +101,18 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception {

     @Test
     public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
-        Catalog catalog = new CachingCatalog(this.catalog);
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
         Identifier sysIdent = new Identifier("db", "tbl$files");
         catalog.getTable(sysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(sysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(new Identifier("db", "tbl$FILES"));
         Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
         catalog.getTable(sysIdent1);
+        assertThat(catalog.tableCache.asMap()).containsKey(sysIdent1);
+        assertThat(catalog.tableCache.asMap()).containsKey(new Identifier("db", "tbl$snapshots"));

         catalog.dropTable(tableIdent, false);
         assertThatThrownBy(() -> catalog.getTable(sysIdent))
@@ -116,6 +121,26 @@ public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
                 .hasMessage("Table db.tbl does not exist.");
     }

+    @Test
+    public void testCacheBranch() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
+        catalog.getTable(tableIdent).createBranch("b1");
+
+        Identifier s1 = new Identifier("db", "tbl$branch_b1");
+        catalog.getTable(s1);
+        assertThat(catalog.tableCache.asMap()).containsKey(s1);
+        Identifier s2 = new Identifier("db", "tbl$branch_b1$FILES");
+        catalog.getTable(s2);
+        assertThat(catalog.tableCache.asMap()).containsKey(s2);
+
+        catalog.dropTable(tableIdent, false);
+        assertThatThrownBy(() -> catalog.getTable(s1)).hasMessage("Table db.tbl does not exist.");
+        assertThatThrownBy(() -> catalog.getTable(s2)).hasMessage("Table db.tbl does not exist.");
+    }
+
     @Test
     public void testTableExpiresAfterInterval() throws Exception {
         TestableCachingCatalog catalog =
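For reviewers, a quick walkthrough of the two new Identifier helpers with illustrative values; the class name is hypothetical, and the printed results follow from the constructors and accessors shown above.

import org.apache.paimon.catalog.Identifier;

// Hypothetical illustration class for isOriginTable() / toOriginTable().
public class OriginSketch {
    public static void main(String[] args) {
        Identifier plain = Identifier.fromString("db.tbl");
        Identifier branch = Identifier.fromString("db.tbl$branch_b1");
        Identifier sys = Identifier.fromString("db.tbl$branch_b1$files");

        // Only the identifier with neither branch nor system suffix is "origin".
        System.out.println(plain.isOriginTable());  // true
        System.out.println(branch.isOriginTable()); // false
        System.out.println(sys.isOriginTable());    // false

        // Both derived identifiers collapse to the same origin table.
        System.out.println(branch.toOriginTable().getFullName()); // db.tbl
        System.out.println(sys.toOriginTable().getFullName());    // db.tbl
    }
}

This pair is what lets the getTable() change in CachingCatalog cache db.tbl once before serving either db.tbl$branch_b1 or db.tbl$branch_b1$files, which the new testCacheBranch test exercises end to end.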
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
index c95fd62bee40..56c649028650 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -49,7 +49,10 @@ public String identifier() {
             })
     public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
             throws Catalog.TableNotExistException {
-        catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
+        Identifier identifier = Identifier.fromString(tableId);
+        catalog.getTable(identifier).deleteBranches(branchStr);
+        catalog.invalidateTable(
+                new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr));
         return new String[] {"Success"};
     }
 }
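The eviction key built here is the branch identifier itself. The sketch below, with illustrative values and a hypothetical class name, shows the full name the new three-argument constructor should produce, which is the key form CachingCatalog stores for a branch table.

import org.apache.paimon.catalog.Identifier;

// Hypothetical illustration class for the eviction key shape.
public class BranchKeySketch {
    public static void main(String[] args) {
        Identifier identifier = Identifier.fromString("db.tbl");
        Identifier branchKey =
                new Identifier(identifier.getDatabaseName(), identifier.getTableName(), "b1");
        System.out.println(branchKey.getFullName()); // expected: db.tbl$branch_b1
    }
}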
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
index a3cecfc72e1d..cb449edb4ccb 100644
--- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
@@ -18,10 +18,15 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.junit.jupiter.api.Assertions

 class DataFrameWriteTest extends PaimonSparkTestBase {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   test("Paimon: DataFrameWrite.saveAsTable") {

     import testImplicits._
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 18fb9e116ba4..ab4a9bcd9dbf 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@
 import java.sql.Date

 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._

   test("Paimon Sink: forEachBatch") {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 9957f0cdf91f..63d75a53ef2e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep
     @Override
     public void invalidateTable(Identifier ident) {
         // We do not need to check whether the table exists and whether
-        // it is an Paimon table to reduce remote service requests.
+        // it is a Paimon table to reduce remote service requests.
         sparkCatalog.invalidateTable(ident);
         asTableCatalog().invalidateTable(ident);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
index e398eee0261f..4a01c33d6af1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
@@ -18,6 +18,8 @@

 package org.apache.paimon.spark.procedure;

+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,13 +63,20 @@ public StructType outputType() {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String branchStr = args.getString(1);
-        return modifyPaimonTable(
-                tableIdent,
-                table -> {
-                    table.deleteBranches(branchStr);
-                    InternalRow outputRow = newInternalRow(true);
-                    return new InternalRow[] {outputRow};
-                });
+        InternalRow[] result =
+                modifyPaimonTable(
+                        tableIdent,
+                        table -> {
+                            table.deleteBranches(branchStr);
+                            InternalRow outputRow = newInternalRow(true);
+                            return new InternalRow[] {outputRow};
+                        });
+        ((WithPaimonCatalog) tableCatalog())
+                .paimonCatalog()
+                .invalidateTable(
+                        new org.apache.paimon.catalog.Identifier(
+                                tableIdent.namespace()[0], tableIdent.name(), branchStr));
+        return result;
     }

     public static ProcedureBuilder builder() {
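On the Spark side, the procedure reaches the Paimon catalog by unwrapping the Spark TableCatalog, mirroring the Flink change. A minimal sketch of that unwrap-and-invalidate step, with a hypothetical helper name and assuming a single-level namespace as in the patch:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;

// Hypothetical helper; sparkCatalog is any Spark TableCatalog that also
// implements WithPaimonCatalog, e.g. the Paimon SparkCatalog.
public class InvalidateBranchSketch {
    static void invalidateBranch(
            org.apache.spark.sql.connector.catalog.TableCatalog sparkCatalog,
            org.apache.spark.sql.connector.catalog.Identifier tableIdent,
            String branch) {
        Catalog paimonCatalog = ((WithPaimonCatalog) sparkCatalog).paimonCatalog();
        // Evicts the db.tbl$branch_<branch> entry from a CachingCatalog.
        paimonCatalog.invalidateTable(
                new Identifier(tableIdent.namespace()[0], tableIdent.name(), branch));
    }
}

With this, both engines build the same branch key after deleteBranches, so a CachingCatalog wrapped around either catalog drops the stale branch table instead of serving it after the branch is gone.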
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 63203122ac40..61bf5524942d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@
 import java.sql.Date

 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._

   test("Paimon Sink: forEachBatch") {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index a0a94afacfb9..edd092c85ce8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql

 import org.apache.paimon.spark.PaimonSparkTestBase

+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.DecimalType
 import org.junit.jupiter.api.Assertions
@@ -27,6 +28,11 @@ import org.junit.jupiter.api.Assertions

 import java.sql.{Date, Timestamp}

 class DataFrameWriteTest extends PaimonSparkTestBase {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._

   test("Paimon: DataFrameWrite.saveAsTable") {
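These test suites pin spark.sql.catalog.paimon.cache-enabled to false, presumably so that DataFrame writes and streaming sinks are always re-read from the underlying catalog rather than from a cached Table instance. Assuming the property suffix maps onto the catalog option imported in CachingCatalog above (an assumption, not shown in this patch), the equivalent programmatic form is:

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

// Hypothetical illustration class: "cache-enabled" corresponds to
// CatalogOptions.CACHE_ENABLED, which gates the CachingCatalog wrapper.
public class DisableCacheSketch {
    public static void main(String[] args) {
        Options options = new Options();
        options.set(CatalogOptions.CACHE_ENABLED, false);
        System.out.println(options.get(CatalogOptions.CACHE_ENABLED)); // false
    }
}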