Commit

update

Zouxxyy committed Dec 10, 2024
1 parent 1b4b1df commit 9cd9fa2
Showing 10 changed files with 92 additions and 43 deletions.
@@ -25,8 +25,6 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -48,15 +46,13 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {
@@ -227,27 +223,14 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return table;
}

if (isSpecifiedSystemTable(identifier)) {
Identifier originIdentifier =
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null);
if (!identifier.isOriginTable()) {
// cache origin table first
Identifier originIdentifier = identifier.toOriginTable();
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
putTableCache(originIdentifier, originTable);
}
table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
putTableCache(identifier, table);
return table;
}

table = wrapped.getTable(identifier);
Expand Down Expand Up @@ -309,32 +292,28 @@ private class TableInvalidatingRemovalListener implements RemovalListener<Identi
public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
LOG.debug("Evicted {} from the table cache ({})", identifier, cause);
if (RemovalCause.EXPIRED.equals(cause)) {
tryInvalidateSysTables(identifier);
tryInvalidateAttachedTables(identifier);
}
}
}

@Override
public void invalidateTable(Identifier identifier) {
tableCache.invalidate(identifier);
tryInvalidateSysTables(identifier);
tryInvalidateAttachedTables(identifier);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
}

private void tryInvalidateSysTables(Identifier identifier) {
if (!isSpecifiedSystemTable(identifier)) {
tableCache.invalidateAll(allSystemTables(identifier));
}
}

private static Iterable<Identifier> allSystemTables(Identifier ident) {
List<Identifier> tables = new ArrayList<>();
for (String type : SYSTEM_TABLES) {
tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
private void tryInvalidateAttachedTables(Identifier identifier) {
if (identifier.isOriginTable()) {
for (@NonNull Identifier i : tableCache.asMap().keySet()) {
if (identifier.getTableName().equals(i.getTableName())) {
tableCache.invalidate(i);
}
}
}
return tables;
}

// ================================== refresh ================================================
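
Taken together, the CachingCatalog hunks above replace the system-table-specific logic with a general notion of attached tables: getTable on a non-origin identifier (one carrying a $system or $branch_ suffix) first makes sure the origin table is cached, then loads and caches the requested table from the wrapped catalog under its full identifier; invalidating or expiring an origin table evicts every cached entry with the same table name. A minimal standalone sketch of that eviction idea, using a plain HashMap and a hypothetical Id record instead of Paimon's Identifier and the Caffeine cache:

import java.util.HashMap;
import java.util.Map;

// Standalone sketch (not Paimon code): invalidating an origin table also evicts
// every cached system-table and branch entry that shares its table name.
public class AttachedTableEviction {

    // Simplified stand-in for org.apache.paimon.catalog.Identifier (Java 16+ record).
    record Id(String database, String table, String branch, String systemTable) {
        boolean isOriginTable() {
            return branch == null && systemTable == null;
        }
    }

    private final Map<Id, Object> tableCache = new HashMap<>();

    void invalidateTable(Id identifier) {
        tableCache.remove(identifier);
        // Only an origin identifier triggers the scan, mirroring tryInvalidateAttachedTables:
        // attached entries are matched by table name, which covers both $files-style
        // system tables and $branch_xxx entries.
        if (identifier.isOriginTable()) {
            tableCache.keySet().removeIf(i -> identifier.table().equals(i.table()));
        }
    }

    public static void main(String[] args) {
        AttachedTableEviction catalog = new AttachedTableEviction();
        catalog.tableCache.put(new Id("db", "tbl", null, null), new Object());
        catalog.tableCache.put(new Id("db", "tbl", null, "files"), new Object());
        catalog.tableCache.put(new Id("db", "tbl", "b1", null), new Object());

        catalog.invalidateTable(new Id("db", "tbl", null, null));
        System.out.println(catalog.tableCache.isEmpty()); // true: attached entries evicted too
    }
}
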
@@ -66,6 +66,10 @@ public Identifier(String database, String object) {
this.object = object;
}

public Identifier(String database, String table, @Nullable String branch) {
this(database, table, branch, null);
}

public Identifier(
String database, String table, @Nullable String branch, @Nullable String systemTable) {
this.database = database;
@@ -120,6 +124,14 @@ public String getBranchNameOrDefault() {
return systemTable == null ? null : systemTable.toLowerCase();
}

public boolean isOriginTable() {
return getSystemTableName() == null && getBranchName() == null;
}

public Identifier toOriginTable() {
return new Identifier(database, getTableName());
}

private void splitObjectName() {
if (table != null) {
return;
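
The two helpers added here back the catalog change: isOriginTable() reports whether an identifier carries neither a branch nor a system-table suffix, and toOriginTable() strips both back to the plain table identifier. A small usage sketch against org.apache.paimon.catalog.Identifier; the expected outputs in the comments are my reading of the parsing logic in this file, not verified output:

import org.apache.paimon.catalog.Identifier;

public class IdentifierOriginDemo {
    public static void main(String[] args) {
        // A branch-scoped system table: db.tbl$branch_b1$FILES
        Identifier id = new Identifier("db", "tbl$branch_b1$FILES");
        System.out.println(id.isOriginTable());               // expected: false
        System.out.println(id.toOriginTable().getFullName()); // expected: db.tbl

        // A plain table identifier is its own origin.
        Identifier plain = new Identifier("db", "tbl");
        System.out.println(plain.isOriginTable());            // expected: true
    }
}
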
@@ -101,13 +101,18 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception {

@Test
public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
Catalog catalog = new CachingCatalog(this.catalog);
TestableCachingCatalog catalog =
new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
Identifier sysIdent = new Identifier("db", "tbl$files");
catalog.getTable(sysIdent);
assertThat(catalog.tableCache.asMap()).containsKey(sysIdent);
assertThat(catalog.tableCache.asMap()).containsKey(new Identifier("db", "tbl$FILES"));
Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
catalog.getTable(sysIdent1);
assertThat(catalog.tableCache.asMap()).containsKey(sysIdent1);
assertThat(catalog.tableCache.asMap()).containsKey(new Identifier("db", "tbl$snapshots"));

catalog.dropTable(tableIdent, false);
assertThatThrownBy(() -> catalog.getTable(sysIdent))
@@ -116,6 +121,26 @@ public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
.hasMessage("Table db.tbl does not exist.");
}

@Test
public void testCacheBranch() throws Exception {
TestableCachingCatalog catalog =
new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent).createBranch("b1");

Identifier s1 = new Identifier("db", "tbl$branch_b1");
catalog.getTable(s1);
assertThat(catalog.tableCache.asMap()).containsKey(s1);
Identifier s2 = new Identifier("db", "tbl$branch_b1$FILES");
catalog.getTable(s2);
assertThat(catalog.tableCache.asMap()).containsKey(s2);

catalog.dropTable(tableIdent, false);
assertThatThrownBy(() -> catalog.getTable(s1)).hasMessage("Table db.tbl does not exist.");
assertThatThrownBy(() -> catalog.getTable(s2)).hasMessage("Table db.tbl does not exist.");
}

@Test
public void testTableExpiresAfterInterval() throws Exception {
TestableCachingCatalog catalog =
@@ -49,7 +49,10 @@ public String identifier() {
})
public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
throws Catalog.TableNotExistException {
catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
Identifier identifier = Identifier.fromString(tableId);
catalog.getTable(identifier).deleteBranches(branchStr);
catalog.invalidateTable(
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr));
return new String[] {"Success"};
}
}
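
This procedure (the ProcedureContext signature suggests the Flink delete_branch procedure) now also tells the catalog to invalidate the branch-scoped identifier, so a CachingCatalog stops serving the dropped branch's table. The identifier it builds uses the new three-argument constructor; a small sketch of what gets invalidated, with the expected full name inferred from the $branch_ convention used elsewhere in this commit, not verified:

import org.apache.paimon.catalog.Identifier;

public class BranchIdentifierDemo {
    public static void main(String[] args) {
        // Identifier the procedure invalidates after deleting branch "b1" of db.tbl.
        Identifier branchId = new Identifier("db", "tbl", "b1");
        System.out.println(branchId.isOriginTable()); // expected: false (a branch is attached)
        System.out.println(branchId.getFullName());   // expected: db.tbl$branch_b1
    }
}
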
@@ -18,10 +18,15 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.junit.jupiter.api.Assertions

class DataFrameWriteTest extends PaimonSparkTestBase {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

test("Paimon: DataFrameWrite.saveAsTable") {

import testImplicits._
@@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date

class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

import testImplicits._

test("Paimon Sink: forEachBatch") {
@@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep
@Override
public void invalidateTable(Identifier ident) {
// We do not need to check whether the table exists and whether
// it is an Paimon table to reduce remote service requests.
// it is a Paimon table to reduce remote service requests.
sparkCatalog.invalidateTable(ident);
asTableCatalog().invalidateTable(ident);
}
@@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.spark.catalog.WithPaimonCatalog;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,13 +63,20 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String branchStr = args.getString(1);
return modifyPaimonTable(
tableIdent,
table -> {
table.deleteBranches(branchStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
InternalRow[] result =
modifyPaimonTable(
tableIdent,
table -> {
table.deleteBranches(branchStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
((WithPaimonCatalog) tableCatalog())
.paimonCatalog()
.invalidateTable(
new org.apache.paimon.catalog.Identifier(
tableIdent.namespace()[0], tableIdent.name(), branchStr));
return result;
}

public static ProcedureBuilder builder() {
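
The Spark procedure gets the same treatment: after modifyPaimonTable drops the branch, the Paimon catalog obtained through WithPaimonCatalog invalidates the branch identifier, so a cached branch table cannot be read back after deletion. A rough end-to-end sketch from Java; the catalog class, option names and CALL syntax below are my assumptions about the usual Paimon-on-Spark setup rather than anything taken from this commit:

import org.apache.spark.sql.SparkSession;

public class DeleteBranchCallExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .config("spark.sql.extensions",
                        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
                .getOrCreate();

        // Dropping the branch now also evicts db.tbl$branch_b1 (and its system tables)
        // from the caching catalog, so later reads fail instead of hitting a stale Table.
        spark.sql("USE paimon");
        spark.sql("CALL sys.delete_branch(table => 'db.tbl', branch => 'b1')");
    }
}
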
@@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date

class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

import testImplicits._

test("Paimon Sink: forEachBatch") {
@@ -20,13 +20,19 @@ package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DecimalType
import org.junit.jupiter.api.Assertions

import java.sql.{Date, Timestamp}

class DataFrameWriteTest extends PaimonSparkTestBase {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
}

import testImplicits._

test("Paimon: DataFrameWrite.saveAsTable") {
