[core] Clean up invalid branch cache and do not cache system tables in caching catalog #4681

Merged · 3 commits · Dec 12, 2024
Changes from 2 commits
=== AbstractCatalog.java ===

@@ -381,7 +381,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
                 throw new TableNotExistException(identifier);
             }
             return table;
-        } else if (isSpecifiedSystemTable(identifier)) {
+        } else if (identifier.isSystemTable()) {
             Table originTable =
                     getDataOrFormatTable(
                             new Identifier(
@@ -519,12 +519,8 @@ protected void assertMainBranch(Identifier identifier) {
         }
     }

-    public static boolean isSpecifiedSystemTable(Identifier identifier) {
-        return identifier.getSystemTableName() != null;
-    }
-
     protected static boolean isTableInSystemDatabase(Identifier identifier) {
-        return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
+        return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable();
     }

     protected static void checkNotSystemTable(Identifier identifier, String method) {
=== CachingCatalog.java ===

@@ -26,7 +26,6 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;

 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

@@ -48,15 +47,14 @@
 import java.util.Map;
 import java.util.Optional;

-import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
 import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
 import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
-import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;

 /** A {@link Catalog} to cache databases and tables and manifests. */
 public class CachingCatalog extends DelegateCatalog {
@@ -227,26 +225,17 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
             return table;
         }

-        if (isSpecifiedSystemTable(identifier)) {
-            Identifier originIdentifier =
-                    new Identifier(
-                            identifier.getDatabaseName(),
-                            identifier.getTableName(),
-                            identifier.getBranchName(),
-                            null);
-            Table originTable = tableCache.getIfPresent(originIdentifier);
-            if (originTable == null) {
-                originTable = wrapped.getTable(originIdentifier);
-                putTableCache(originIdentifier, originTable);
-            }
+        // For system table, do not cache it directly. Instead, cache the origin table and then wrap
+        // it to generate the system table.
+        if (identifier.isSystemTable()) {
+            Table originTable = getTable(identifier.toOriginTable());
             table =
                     SystemTableLoader.load(
-                            Preconditions.checkNotNull(identifier.getSystemTableName()),
+                            checkNotNull(identifier.getSystemTableName()),
                             (FileStoreTable) originTable);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
-            putTableCache(identifier, table);
             return table;
         }
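A minimal sketch of the new lookup behavior (the `CachingCatalog(Catalog)` construction and names here are assumed for illustration):

```java
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

class SystemTableLookupSketch {
    // `underlying` stands in for any concrete Paimon catalog.
    static Table filesTable(Catalog underlying) throws Catalog.TableNotExistException {
        Catalog catalog = new CachingCatalog(underlying);
        // Reading db.tbl$files caches only the origin table db.tbl; the
        // system table is re-wrapped from that cached entry on every call,
        // so it can never go stale independently of its base table.
        return catalog.getTable(Identifier.fromString("db.tbl$files"));
    }
}
```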

@@ -309,32 +298,29 @@ private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table> {
         public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
             LOG.debug("Evicted {} from the table cache ({})", identifier, cause);
             if (RemovalCause.EXPIRED.equals(cause)) {
-                tryInvalidateSysTables(identifier);
+                tryInvalidateAttachedTables(identifier);
             }
         }
     }

     @Override
     public void invalidateTable(Identifier identifier) {
         tableCache.invalidate(identifier);
-        tryInvalidateSysTables(identifier);
+        tryInvalidateAttachedTables(identifier);
         if (partitionCache != null) {
             partitionCache.invalidate(identifier);
         }
     }

-    private void tryInvalidateSysTables(Identifier identifier) {
-        if (!isSpecifiedSystemTable(identifier)) {
-            tableCache.invalidateAll(allSystemTables(identifier));
-        }
-    }
-
-    private static Iterable<Identifier> allSystemTables(Identifier ident) {
-        List<Identifier> tables = new ArrayList<>();
-        for (String type : SYSTEM_TABLES) {
-            tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
-        }
+    private void tryInvalidateAttachedTables(Identifier identifier) {
[review thread]
Reviewer: I'm wondering if the system table doesn't need to be stored in the cache anymore. Just wrap it when retrieving it.

Author: I checked the previous logic again. Now we reuse the table (in the cache) and then wrap it into the system table. In this case, I think we don't need to cache the system table. Another solution is to only cache the system table (not wrap from the cached table). Which one do you think is more suitable?

Reviewer: I think we can just cache the original table.
[/review thread]

+        if (identifier.isMainTable()) {
[review thread]
Reviewer: We don't need to care about branch tables.

Author: Why? When the table is deleted, the cached branch tables should also be cleaned up.

Reviewer: Maybe we can just do this in invalidateTable?

Author: I see, we can move this logic to dropTable.
[/review thread]

+            // invalidate cached branches
+            for (@NonNull Identifier i : tableCache.asMap().keySet()) {
+                if (identifier.getTableName().equals(i.getTableName())) {
+                    tableCache.invalidate(i);
+                }
+            }
+        }
-        return tables;
     }

// ================================== refresh ================================================
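The resulting invalidation behavior, sketched under the assumptions of the tests below (identifier strings follow the `$branch_` convention used in this PR):

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

class BranchInvalidationSketch {
    // Assumed prior state: "db.tbl" and "db.tbl$branch_b1" were both read
    // through `catalog` (a CachingCatalog) and sit in its table cache.
    static void dropMainTable(Catalog catalog) throws Catalog.TableNotExistException {
        catalog.dropTable(Identifier.fromString("db.tbl"), false);
        // Dropping funnels into invalidateTable("db.tbl"): a main-table
        // identifier, so tryInvalidateAttachedTables walks the cache and also
        // evicts "db.tbl$branch_b1". Invalidating the branch identifier alone
        // would evict only that single entry.
    }
}
```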
=== Identifier.java ===

@@ -65,6 +65,10 @@ public Identifier(String database, String object) {
         this.object = object;
     }

+    public Identifier(String database, String table, @Nullable String branch) {
+        this(database, table, branch, null);
+    }
+
     public Identifier(
             String database, String table, @Nullable String branch, @Nullable String systemTable) {
         this.database = database;

@@ -119,6 +123,22 @@ public String getBranchNameOrDefault() {
         return systemTable;
     }

+    public boolean isMainTable() {
+        return getBranchName() == null && getSystemTableName() == null;
+    }
+
+    public boolean isBranch() {
+        return getBranchName() != null && getSystemTableName() == null;
+    }
+
+    public boolean isSystemTable() {
+        return getSystemTableName() != null;
+    }
+
+    public Identifier toOriginTable() {
+        return new Identifier(database, getTableName(), getBranchName());
+    }
+
     private void splitObjectName() {
         if (table != null) {
             return;
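A quick illustration of how the new helpers classify `$`-separated object names (parsing via `Identifier.fromString` is existing Paimon behavior; the values in the comments follow the definitions above):

```java
import org.apache.paimon.catalog.Identifier;

class IdentifierClassificationSketch {
    static void classify() {
        Identifier id = Identifier.fromString("db.tbl$branch_b1$files");
        boolean sys = id.isSystemTable();       // true: names the "files" system table
        boolean branch = id.isBranch();         // false: it is a system table, not a plain branch
        Identifier origin = id.toOriginTable(); // db.tbl$branch_b1 -- the table the system view wraps

        boolean main = Identifier.fromString("db.tbl").isMainTable();         // true
        boolean plain = Identifier.fromString("db.tbl$branch_b1").isBranch(); // true
    }
}
```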
=== CachingCatalogTest.java ===

@@ -48,7 +48,6 @@
 import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;

@@ -101,14 +100,49 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception {

     @Test
     public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
-        Catalog catalog = new CachingCatalog(this.catalog);
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
         Identifier sysIdent = new Identifier("db", "tbl$files");
+        // get system table will only cache the origin table
         catalog.getTable(sysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent);
+        // test case sensitivity
+        Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
+        catalog.getTable(sysIdent1);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1);

         catalog.dropTable(tableIdent, false);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent);
         assertThatThrownBy(() -> catalog.getTable(sysIdent))
                 .hasMessage("Table db.tbl does not exist.");
+        assertThatThrownBy(() -> catalog.getTable(sysIdent1))
+                .hasMessage("Table db.tbl does not exist.");
     }

+    @Test
+    public void testInvalidateBranchIfBaseTableIsDropped() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
+        catalog.getTable(tableIdent).createBranch("b1");
+
+        Identifier branchIdent = new Identifier("db", "tbl$branch_b1");
+        Identifier branchSysIdent = new Identifier("db", "tbl$branch_b1$FILES");
+        // get system table will only cache the origin table
+        catalog.getTable(branchSysIdent);
+        assertThat(catalog.tableCache.asMap()).containsKey(branchIdent);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent);
+
+        catalog.dropTable(tableIdent, false);
+        assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent);
+        assertThatThrownBy(() -> catalog.getTable(branchIdent))
+                .hasMessage("Table db.tbl$branch_b1 does not exist.");
+        assertThatThrownBy(() -> catalog.getTable(branchSysIdent))
+                .hasMessage("Table db.tbl$branch_b1 does not exist.");
+    }

     @Test
@@ -176,7 +210,7 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exception {
     }

     @Test
-    public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
+    public void testCacheExpirationEagerlyRemovesBranch() throws Exception {
         TestableCachingCatalog catalog =
                 new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);

@@ -190,26 +224,21 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
         assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);

-        for (Identifier sysTable : sysTables(tableIdent)) {
-            catalog.getTable(sysTable);
-        }
-        assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
-        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
-                .isNotEmpty()
-                .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
+        catalog.getTable(tableIdent).createBranch("b1");
+        Identifier branchIdent = new Identifier("db", "tbl$branch_b1");
+        catalog.getTable(branchIdent);
+
+        assertThat(catalog.tableCache().asMap()).containsKeys(branchIdent);
+        assertThat(catalog.ageOf(branchIdent)).get().isEqualTo(Duration.ZERO);

         assertThat(catalog.remainingAgeFor(tableIdent))
                 .as("Loading a non-cached sys table should refresh the main table's age")
                 .isEqualTo(Optional.of(EXPIRATION_TTL));

-        // Move time forward and access already cached sys tables.
+        // Move time forward and access already cached branch.
         ticker.advance(HALF_OF_EXPIRATION);
-        for (Identifier sysTable : sysTables(tableIdent)) {
-            catalog.getTable(sysTable);
-        }
-        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
-                .isNotEmpty()
-                .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
+        catalog.getTable(branchIdent);
+        assertThat(catalog.ageOf(branchIdent)).get().isEqualTo(Duration.ZERO);

         assertThat(catalog.remainingAgeFor(tableIdent))
                 .as("Accessing a cached sys table should not affect the main table's age")
@@ -218,14 +247,7 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
         // Move time forward so the data table drops.
         ticker.advance(HALF_OF_EXPIRATION);
         assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
-
-        Arrays.stream(sysTables(tableIdent))
-                .forEach(
-                        sysTable ->
-                                assertThat(catalog.tableCache().asMap())
-                                        .as(
-                                                "When a data table expires, its sys tables should expire regardless of age")
-                                        .doesNotContainKeys(sysTable));
+        assertThat(catalog.tableCache().asMap()).doesNotContainKey(branchIdent);
     }

     @Test
=== DeleteBranchProcedure.java (Flink) ===

@@ -49,7 +49,10 @@ public String identifier() {
             })
     public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
             throws Catalog.TableNotExistException {
-        catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
+        Identifier identifier = Identifier.fromString(tableId);
+        catalog.getTable(identifier).deleteBranches(branchStr);
+        catalog.invalidateTable(
+                new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr));
         return new String[] {"Success"};
     }
 }
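For context, a usage sketch of the Flink path (procedure name per Paimon's documented `sys.delete_branch`; the catalog setup is assumed):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeleteBranchFlinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Paimon catalog registration omitted; assumes it is the current catalog.
        // With this change, deleting the branch also evicts the cached
        // db.tbl$branch_b1 entry, so later reads cannot hit a stale branch table.
        tEnv.executeSql("CALL sys.delete_branch('db.tbl', 'b1')");
    }
}
```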
=== DataFrameWriteTest.scala ===

@@ -18,10 +18,15 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.junit.jupiter.api.Assertions

 class DataFrameWriteTest extends PaimonSparkTestBase {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   test("Paimon: DataFrameWrite.saveAsTable") {

     import testImplicits._
=== PaimonSinkTest.scala ===

@@ -18,6 +18,7 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}

@@ -27,6 +28,10 @@ import java.sql.Date

 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._

   test("Paimon Sink: forEachBatch") {
=== Spark catalog: invalidateTable ===

@@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
     @Override
     public void invalidateTable(Identifier ident) {
         // We do not need to check whether the table exists and whether
-        // it is an Paimon table to reduce remote service requests.
+        // it is a Paimon table to reduce remote service requests.
         sparkCatalog.invalidateTable(ident);
         asTableCatalog().invalidateTable(ident);
     }
=== DeleteBranchProcedure.java (Spark) ===

@@ -18,6 +18,8 @@

 package org.apache.paimon.spark.procedure;

+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;

@@ -61,13 +63,20 @@ public StructType outputType() {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String branchStr = args.getString(1);
-        return modifyPaimonTable(
-                tableIdent,
-                table -> {
-                    table.deleteBranches(branchStr);
-                    InternalRow outputRow = newInternalRow(true);
-                    return new InternalRow[] {outputRow};
-                });
+        InternalRow[] result =
+                modifyPaimonTable(
+                        tableIdent,
+                        table -> {
+                            table.deleteBranches(branchStr);
+                            InternalRow outputRow = newInternalRow(true);
+                            return new InternalRow[] {outputRow};
+                        });
+        ((WithPaimonCatalog) tableCatalog())
+                .paimonCatalog()
+                .invalidateTable(
+                        new org.apache.paimon.catalog.Identifier(
+                                tableIdent.namespace()[0], tableIdent.name(), branchStr));
+        return result;
     }

     public static ProcedureBuilder builder() {
=== PaimonSinkTest.scala ===

@@ -18,6 +18,7 @@

 package org.apache.paimon.spark

+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{col, mean, window}

@@ -27,6 +28,10 @@ import java.sql.Date

 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
   import testImplicits._

   test("Paimon Sink: forEachBatch") {
=== PaimonSparkTestBase.scala ===

@@ -66,7 +66,6 @@ class PaimonSparkTestBase
     super.sparkConf
       .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
       .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
-      .set("spark.sql.catalog.paimon.cache-enabled", "false")
       .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
       .set("spark.serializer", serializer)
   }