diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index a583c74714ba..f0aa4c4b85af 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,18 @@
        <td>Boolean</td>
        <td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
    </tr>
+    <tr>
+        <td><h5>cache-enabled</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>Controls whether the catalog will cache table entries upon load.</td>
+    </tr>
+    <tr>
+        <td><h5>cache.expiration-interval</h5></td>
+        <td style="word-wrap: break-word;">30 s</td>
+        <td>Duration</td>
+        <td>Controls the duration for which entries in the catalog are cached.</td>
+    </tr>
    <tr>
        <td><h5>client-pool-size</h5></td>
        <td style="word-wrap: break-word;">2</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 9ef681809eab..28a751f5025e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -91,6 +91,20 @@ public class CatalogOptions {
.defaultValue(2)
.withDescription("Configure the size of the connection pool.");
+ public static final ConfigOption<Boolean> CACHE_ENABLED =
+ key("cache-enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Controls whether the catalog will cache table entries upon load.");
+
+ public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
+ key("cache.expiration-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "Controls the duration for which entries in the catalog are cached.");
+
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
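
A minimal usage sketch for the two new options; the option constants come from this patch, while CatalogContext.create and CatalogFactory.createCatalog are existing Paimon entry points (the warehouse path is an assumption):

    Options options = new Options();
    options.set(CatalogOptions.WAREHOUSE, "/tmp/paimon");  // assumed local warehouse path
    options.set(CatalogOptions.CACHE_ENABLED, true);       // the default is already true
    options.set(CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS, Duration.ofMinutes(10));
    Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
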
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index afe64666a875..062e935328fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -45,7 +45,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -58,17 +57,12 @@
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
- public static final String DB_SUFFIX = ".db";
- protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
- protected static final String DB_LOCATION_PROP = "location";
-
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
@@ -86,8 +80,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
this.lineageMetaFactory =
findAndCreateLineageMeta(options, AbstractCatalog.class.getClassLoader());
- this.tableDefaultOptions =
- convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
+ this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}
@@ -445,12 +438,12 @@ protected void assertMainBranch(String branchName) {
}
}
- private static boolean isSpecifiedSystemTable(Identifier identifier) {
+ public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
&& !getOriginalIdentifierAndBranch(identifier).isPresent();
}
- protected boolean isSystemTable(Identifier identifier) {
+ protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}
@@ -463,11 +456,11 @@ protected void checkNotSystemTable(Identifier identifier, String method) {
}
}
- public void copyTableDefaultOptions(Map<String, String> options) {
+ private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}
- private String[] tableAndSystemName(Identifier identifier) {
+ public static String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
@@ -493,7 +486,7 @@ public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}
- private boolean isSystemDatabase(String database) {
+ public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}
@@ -504,30 +497,9 @@ protected void checkNotSystemDatabase(String database) {
}
}
- /** Validate database, table and field names must be lowercase when not case-sensitive. */
- public static void validateCaseInsensitive(
- boolean caseSensitive, String type, String... names) {
- validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
- }
-
- /** Validate database, table and field names must be lowercase when not case-sensitive. */
- public static void validateCaseInsensitive(
- boolean caseSensitive, String type, List<String> names) {
- if (caseSensitive) {
- return;
- }
- List<String> illegalNames =
- names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
- checkArgument(
- illegalNames.isEmpty(),
- String.format(
- "%s name %s cannot contain upper case in the catalog.",
- type, illegalNames));
- }
-
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
- validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
- validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
}
private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
@@ -545,7 +517,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
}
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
- validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}
private void validateAutoCreateClose(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
new file mode 100644
index 000000000000..893d38810e4a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
+import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+
+/** A {@link Catalog} that caches databases, tables and manifests. */
+public class CachingCatalog extends DelegateCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);
+
+ protected final Cache<String, Map<String, String>> databaseCache;
+ protected final Cache<Identifier, Table> tableCache;
+
+ public CachingCatalog(Catalog wrapped) {
+ this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
+ }
+
+ public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
+ this(wrapped, expirationInterval, Ticker.systemTicker());
+ }
+
+ public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker ticker) {
+ super(wrapped);
+ if (expirationInterval.isZero() || expirationInterval.isNegative()) {
+ throw new IllegalArgumentException(
+ "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
+ }
+
+ this.databaseCache =
+ Caffeine.newBuilder()
+ .softValues()
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .ticker(ticker)
+ .build();
+ this.tableCache =
+ Caffeine.newBuilder()
+ .softValues()
+ .removalListener(new TableInvalidatingRemovalListener())
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .ticker(ticker)
+ .build();
+ }
+
+ @Override
+ public Map<String, String> loadDatabaseProperties(String databaseName)
+ throws DatabaseNotExistException {
+ Map<String, String> properties = databaseCache.getIfPresent(databaseName);
+ if (properties != null) {
+ return properties;
+ }
+
+ properties = super.loadDatabaseProperties(databaseName);
+ databaseCache.put(databaseName, properties);
+ return properties;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ super.dropDatabase(name, ignoreIfNotExists, cascade);
+ databaseCache.invalidate(name);
+ if (cascade) {
+ List<Identifier> tables = new ArrayList<>();
+ for (Identifier identifier : tableCache.asMap().keySet()) {
+ if (identifier.getDatabaseName().equals(name)) {
+ tables.add(identifier);
+ }
+ }
+ tables.forEach(tableCache::invalidate);
+ }
+ }
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ super.dropTable(identifier, ignoreIfNotExists);
+ invalidateTable(identifier);
+ }
+
+ @Override
+ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ super.renameTable(fromTable, toTable, ignoreIfNotExists);
+ invalidateTable(fromTable);
+ }
+
+ @Override
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
+ super.alterTable(identifier, changes, ignoreIfNotExists);
+ invalidateTable(identifier);
+ }
+
+ @Override
+ public Table getTable(Identifier identifier) throws TableNotExistException {
+ Table table = tableCache.getIfPresent(identifier);
+ if (table != null) {
+ return table;
+ }
+
+ if (isSpecifiedSystemTable(identifier)) {
+ String[] splits = tableAndSystemName(identifier);
+ String tableName = splits[0];
+ String type = splits[1];
+
+ Identifier originIdentifier =
+ Identifier.create(identifier.getDatabaseName(), tableName);
+ Table originTable = tableCache.getIfPresent(originIdentifier);
+ if (originTable == null) {
+ originTable = wrapped.getTable(originIdentifier);
+ tableCache.put(originIdentifier, originTable);
+ }
+ table = SystemTableLoader.load(type, (FileStoreTable) originTable);
+ if (table == null) {
+ throw new TableNotExistException(identifier);
+ }
+ tableCache.put(identifier, table);
+ return table;
+ }
+
+ table = wrapped.getTable(identifier);
+ tableCache.put(identifier, table);
+ return table;
+ }
+
+ private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table> {
+ @Override
+ public void onRemoval(
+ Identifier tableIdentifier, Table table, @NonNull RemovalCause cause) {
+ LOG.debug("Evicted {} from the table cache ({})", tableIdentifier, cause);
+ if (RemovalCause.EXPIRED.equals(cause)) {
+ if (!isSpecifiedSystemTable(tableIdentifier)) {
+ tableCache.invalidateAll(allSystemTables(tableIdentifier));
+ }
+ }
+ }
+ }
+
+ private void invalidateTable(Identifier identifier) {
+ tableCache.invalidate(identifier);
+ tableCache.invalidateAll(allSystemTables(identifier));
+ }
+
+ private static Iterable<Identifier> allSystemTables(Identifier ident) {
+ List<Identifier> tables = new ArrayList<>();
+ for (String type : SYSTEM_TABLES) {
+ tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
+ }
+ return tables;
+ }
+}
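
A quick sketch of the intended behavior of the class above (identifiers are illustrative; fileSystemCatalog stands for any concrete wrapped catalog):

    Catalog catalog = new CachingCatalog(fileSystemCatalog); // 30 s default TTL
    Identifier id = Identifier.create("db", "tbl");
    Table first = catalog.getTable(id);  // loaded from the wrapped catalog, then cached
    Table second = catalog.getTable(id); // served from tableCache: same instance as first
    catalog.dropTable(id, false);        // invalidates db.tbl and every cached db.tbl$... entry
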
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index fe8e0b68b813..0f5fe6a2233e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,10 +26,15 @@
import org.apache.paimon.table.Table;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* This interface is responsible for reading and writing metadata such as database/table from a
@@ -47,6 +52,9 @@ public interface Catalog extends AutoCloseable {
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String COMMENT_PROP = "comment";
+ String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
+ String DB_LOCATION_PROP = "location";
+ String DB_SUFFIX = ".db";
/** Warehouse root path containing all database directories in this catalog. */
String warehouse();
@@ -277,6 +285,29 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}
+ static Map<String, String> tableDefaultOptions(Map<String, String> options) {
+ return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
+ }
+
+ /** Validate that database, table and field names are lowercase when not case-sensitive. */
+ static void validateCaseInsensitive(boolean caseSensitive, String type, String... names) {
+ validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+ }
+
+ /** Validate that database, table and field names are lowercase when not case-sensitive. */
+ static void validateCaseInsensitive(boolean caseSensitive, String type, List<String> names) {
+ if (caseSensitive) {
+ return;
+ }
+ List<String> illegalNames =
+ names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
+ checkArgument(
+ illegalNames.isEmpty(),
+ String.format(
+ "%s name %s cannot contain upper case in the catalog.",
+ type, illegalNames));
+ }
+
/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
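The relocated helper keeps its original contract; for example:

    Catalog.validateCaseInsensitive(true, "Table", "MyTable");  // case-sensitive: no-op
    Catalog.validateCaseInsensitive(false, "Table", "MyTable");
    // -> IllegalArgumentException: Table name [MyTable] cannot contain upper case in the catalog.
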
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index c153f0ee39c1..2eff8e902b6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -24,11 +24,16 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -67,6 +72,27 @@ static Catalog createCatalog(CatalogContext options) {
}
static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) {
+ Catalog catalog = createUnwrappedCatalog(context, classLoader);
+
+ Options options = context.options();
+ if (options.get(CACHE_ENABLED)) {
+ catalog = new CachingCatalog(catalog, options.get(CACHE_EXPIRATION_INTERVAL_MS));
+ }
+
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ catalog.warehouse(),
+ catalog.fileIO(),
+ context.options().get(PrivilegedCatalog.USER),
+ context.options().get(PrivilegedCatalog.PASSWORD));
+ if (privilegeManager.privilegeEnabled()) {
+ catalog = new PrivilegedCatalog(catalog, privilegeManager);
+ }
+
+ return catalog;
+ }
+
+ static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader classLoader) {
Options options = context.options();
String metastore = options.get(METASTORE);
CatalogFactory catalogFactory =
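
Sketched as the wrapper chain the factory now produces (variable names are illustrative; the concrete catalog may be any implementation):

    Catalog catalog = createUnwrappedCatalog(context, classLoader); // e.g. FileSystemCatalog
    // cache-enabled defaults to true:
    catalog = new CachingCatalog(catalog, options.get(CACHE_EXPIRATION_INTERVAL_MS));
    // outermost, so privilege checks still guard cached tables:
    catalog = new PrivilegedCatalog(catalog, privilegeManager);
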
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index c2ff376019bf..64f38a106c93 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -63,7 +63,7 @@ public List<String> listDatabases() {
@Override
protected void createDatabaseImpl(String name, Map<String, String> properties) {
- if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
+ if (properties.containsKey(Catalog.DB_LOCATION_PROP)) {
throw new IllegalArgumentException(
"Cannot specify location for a database when using fileSystem catalog.");
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 8a0b1643fb2b..6ab3664e4ca9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -20,9 +20,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.table.TableType;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -44,18 +41,6 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
"Only managed table is supported in File system catalog.");
}
- Catalog catalog = new FileSystemCatalog(fileIO, warehouse, context.options());
-
- PrivilegeManager privilegeManager =
- new FileBasedPrivilegeManager(
- warehouse.toString(),
- fileIO,
- context.options().get(PrivilegedCatalog.USER),
- context.options().get(PrivilegedCatalog.PASSWORD));
- if (privilegeManager.privilegeEnabled()) {
- catalog = new PrivilegedCatalog(catalog, privilegeManager);
- }
-
- return catalog;
+ return new FileSystemCatalog(fileIO, warehouse, context.options());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 4f70ac725e48..905ac2dff0b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -65,7 +65,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
+import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
new file mode 100644
index 000000000000..56fc238a8f70
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.FakeTicker;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class CachingCatalogTest extends CatalogTestBase {
+
+ private static final Duration EXPIRATION_TTL = Duration.ofMinutes(5);
+ private static final Duration HALF_OF_EXPIRATION = EXPIRATION_TTL.dividedBy(2);
+
+ private FakeTicker ticker;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+ ticker = new FakeTicker();
+ catalog.createDatabase("db", false);
+ }
+
+ @Override
+ @Test
+ public void testListDatabasesWhenNoDatabases() {
+ List<String> databases = catalog.listDatabases();
+ assertThat(databases).contains("db");
+ }
+
+ @Test
+ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception {
+ Catalog catalog = new CachingCatalog(this.catalog);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
+ Identifier sysIdent = new Identifier("db", "tbl$files");
+ Table sysTable = catalog.getTable(sysIdent);
+ catalog.alterTable(tableIdent, SchemaChange.addColumn("col3", DataTypes.INT()), false);
+ assertThat(catalog.getTable(sysIdent)).isNotSameAs(sysTable);
+ }
+
+ @Test
+ public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception {
+ Catalog catalog = new CachingCatalog(this.catalog);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false);
+ Identifier sysIdent = new Identifier("db", "tbl$files");
+ catalog.getTable(sysIdent);
+ catalog.dropTable(tableIdent, false);
+ assertThatThrownBy(() -> catalog.getTable(sysIdent))
+ .hasMessage("Table db.tbl does not exist.");
+ }
+
+ @Test
+ public void testTableExpiresAfterInterval() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(tableIdent);
+
+ // Ensure table is cached with full ttl remaining upon creation
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
+ ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.getTable(tableIdent))
+ .as("CachingCatalog should return a new instance after expiration")
+ .isNotSameAs(table);
+ }
+
+ @Test
+ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION);
+
+ Duration oneMinute = Duration.ofMinutes(1L);
+ ticker.advance(oneMinute);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION.plus(oneMinute));
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION.minus(oneMinute));
+
+ // Access the table via the catalog, which should refresh the TTL
+ Table table = catalog.getTable(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+ assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(EXPIRATION_TTL);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+ assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+ }
+
+ @Test
+ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+
+ for (Identifier sysTable : sysTables(tableIdent)) {
+ catalog.getTable(sysTable);
+ }
+ assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+ assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+ .isNotEmpty()
+ .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
+
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .as("Loading a non-cached sys table should refresh the main table's age")
+ .isEqualTo(Optional.of(EXPIRATION_TTL));
+
+ // Move time forward and access already cached sys tables.
+ ticker.advance(HALF_OF_EXPIRATION);
+ for (Identifier sysTable : sysTables(tableIdent)) {
+ catalog.getTable(sysTable);
+ }
+ assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+ .isNotEmpty()
+ .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
+
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .as("Accessing a cached sys table should not affect the main table's age")
+ .isEqualTo(Optional.of(HALF_OF_EXPIRATION));
+
+ // Move time forward so the data table drops.
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+
+ Arrays.stream(sysTables(tableIdent))
+ .forEach(
+ sysTable ->
+ assertThat(catalog.cache().asMap())
+ .as(
+ "When a data table expires, its sys tables should expire regardless of age")
+ .doesNotContainKeys(sysTable));
+ }
+
+ @Test
+ public void testDeadlock() throws Exception {
+ Catalog underlyCatalog = this.catalog;
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, Duration.ofSeconds(1), ticker);
+ int numThreads = 20;
+ List<Identifier> createdTables = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ Identifier tableIdent = new Identifier("db", "tbl" + i);
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ createdTables.add(tableIdent);
+ }
+
+ Cache<Identifier, Table> cache = catalog.cache();
+ AtomicInteger cacheGetCount = new AtomicInteger(0);
+ AtomicInteger cacheCleanupCount = new AtomicInteger(0);
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ if (i % 2 == 0) {
+ String table = "tbl" + i;
+ executor.submit(
+ () -> {
+ ticker.advance(Duration.ofSeconds(2));
+ cache.get(
+ new Identifier("db", table),
+ identifier -> {
+ try {
+ return underlyCatalog.getTable(identifier);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ cacheGetCount.incrementAndGet();
+ });
+ } else {
+ executor.submit(
+ () -> {
+ ticker.advance(Duration.ofSeconds(2));
+ cache.cleanUp();
+ cacheCleanupCount.incrementAndGet();
+ });
+ }
+ }
+ executor.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(cacheGetCount).hasValue(numThreads / 2);
+ assertThat(cacheCleanupCount).hasValue(numThreads / 2);
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testCachingCatalogRejectsExpirationIntervalOfZero() {
+ Assertions.assertThatThrownBy(
+ () -> new TestableCachingCatalog(this.catalog, Duration.ZERO, ticker))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
+ }
+
+ @Test
+ public void testInvalidateTableForChainedCachingCatalogs() throws Exception {
+ TestableCachingCatalog wrappedCatalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(wrappedCatalog, EXPIRATION_TTL, ticker);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ catalog.dropTable(tableIdent, false);
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+ }
+
+ public static Identifier[] sysTables(Identifier tableIdent) {
+ return SystemTableLoader.SYSTEM_TABLES.stream()
+ .map(type -> Identifier.fromString(tableIdent.getFullName() + "$" + type))
+ .toArray(Identifier[]::new);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 378ebf159079..19c9e83c45bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -38,7 +38,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -55,6 +55,7 @@ public abstract class CatalogTestBase {
protected String warehouse;
protected FileIO fileIO;
protected Catalog catalog;
+
protected static final Schema DEFAULT_TABLE_SCHEMA =
new Schema(
Lists.newArrayList(
@@ -66,18 +67,6 @@ public abstract class CatalogTestBase {
Maps.newHashMap(),
"");
- protected static final Schema PARTITION_SCHEMA =
- new Schema(
- Lists.newArrayList(
- new DataField(0, "pk1", DataTypes.INT()),
- new DataField(1, "pk2", DataTypes.STRING()),
- new DataField(3, "pk3", DataTypes.STRING()),
- new DataField(4, "col", DataTypes.STRING())),
- Arrays.asList("pk1", "pk2"),
- Arrays.asList("pk1", "pk2", "pk3"),
- Maps.newHashMap(),
- "");
-
@BeforeEach
public void setUp() throws Exception {
warehouse = tempFile.toUri().toString();
@@ -95,7 +84,10 @@ void tearDown() throws Exception {
}
@Test
- public abstract void testListDatabasesWhenNoDatabases();
+ public void testListDatabasesWhenNoDatabases() {
+ List<String> databases = catalog.listDatabases();
+ assertThat(databases).isEqualTo(new ArrayList<>());
+ }
@Test
public void testListDatabases() throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
new file mode 100644
index 000000000000..67fbd2718a36
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+
+class FileSystemCatalogTest extends CatalogTestBase {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
new file mode 100644
index 000000000000..c393d3afff6c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.table.Table;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * A wrapper around CachingCatalog that provides accessor methods to test the underlying cache,
+ * without making those fields public in the CachingCatalog itself.
+ */
+public class TestableCachingCatalog extends CachingCatalog {
+
+ private final Duration cacheExpirationInterval;
+
+ public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) {
+ super(catalog, expirationInterval, ticker);
+ this.cacheExpirationInterval = expirationInterval;
+ }
+
+ public Cache<Identifier, Table> cache() {
+ // cleanUp must be called as tests apply assertions directly on the underlying
+ // map, but metadata table map entries are cleaned up asynchronously.
+ tableCache.cleanUp();
+ return tableCache;
+ }
+
+ public Optional<Duration> ageOf(Identifier identifier) {
+ return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
+ }
+
+ public Optional<Duration> remainingAgeFor(Identifier identifier) {
+ return ageOf(identifier).map(cacheExpirationInterval::minus);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index f0c84eb2c4d8..f5befc724f8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -32,8 +32,6 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -88,13 +86,6 @@ public void testCleanTimeoutLockAndAcquireLock() throws SQLException, Interrupte
.isTrue();
}
- @Test
- @Override
- public void testListDatabasesWhenNoDatabases() {
- List<String> databases = catalog.listDatabases();
- assertThat(databases).isEqualTo(new ArrayList<>());
- }
-
@Test
public void testCheckIdentifierUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
index f2a1efce5151..764c0f4e168e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
@@ -18,10 +18,7 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
@@ -29,18 +26,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Objects;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
-import static org.apache.paimon.table.system.AllTableOptionsTable.options;
-import static org.apache.paimon.table.system.AllTableOptionsTable.toRow;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link AllTableOptionsTable}. */
@@ -69,18 +60,14 @@ public void before() throws Exception {
@Test
public void testSchemasTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
- List<InternalRow> result = read(allTableOptionsTable);
- assertThat(result).containsExactlyElementsOf(expectRow);
- }
-
- private List<InternalRow> getExceptedResult() {
- AbstractCatalog abstractCatalog = (AbstractCatalog) catalog;
- Map<String, Map<String, Path>> stringMapMap = abstractCatalog.allTablePaths();
- Iterator<InternalRow> rows =
- toRow(options(((AbstractCatalog) catalog).fileIO(), stringMapMap));
- return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED), false)
- .collect(Collectors.toList());
+ List<String> result =
+ read(allTableOptionsTable).stream()
+ .map(Objects::toString)
+ .collect(Collectors.toList());
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "+I(default,T,fields.sales.aggregate-function,sum)",
+ "+I(default,T,merge-engine,aggregation)",
+ "+I(default,T,fields.price.aggregate-function,max)");
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
new file mode 100644
index 000000000000..cbd536ad5f00
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A {@code Ticker} whose value can be advanced programmatically in tests. */
+public class FakeTicker implements Ticker {
+
+ private final AtomicLong nanos = new AtomicLong();
+
+ public FakeTicker() {}
+
+ public FakeTicker advance(Duration duration) {
+ nanos.addAndGet(duration.toNanos());
+ return this;
+ }
+
+ @Override
+ public long read() {
+ return nanos.get();
+ }
+}
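
A sketch of how FakeTicker makes Caffeine expiry deterministic in tests; the cache setup mirrors CachingCatalog's builder, and the names are illustrative:

    FakeTicker ticker = new FakeTicker();
    Cache<String, String> cache =
            Caffeine.newBuilder()
                    .expireAfterAccess(Duration.ofMinutes(5))
                    .executor(Runnable::run)
                    .ticker(ticker)
                    .build();
    cache.put("k", "v");
    ticker.advance(Duration.ofMinutes(6)); // step past the TTL without sleeping
    cache.cleanUp();
    // cache.getIfPresent("k") now returns null
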
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index d6c4c12ecdd3..6c62bcc0c21f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action.cdc;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -120,9 +119,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix);
}
@Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index be8cedf0baf6..4c7db6d28b62 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
@@ -114,8 +114,8 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table", table);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
}
@Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 816b7b903afa..c15cbf89884e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -45,7 +45,7 @@ public static void startContainers() {
}
@Test
- @Timeout(120)
+ @Timeout(180)
public void testActionRunResult() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 98efd479f20d..2cb96e00750a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -347,10 +347,7 @@ protected Schema buildPaimonSchema(
// Although catalog.createTable will copy the default options, but we need this info
// here before create table, such as table-default.kafka.bootstrap.servers defined in
// catalog options. Temporarily, we copy the default options here.
- if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) {
- ((org.apache.paimon.catalog.AbstractCatalog) catalog)
- .copyTableDefaultOptions(options);
- }
+ Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
options.put(REGISTER_TIMEOUT.key(), logStoreAutoRegisterTimeout.toString());
registerLogSystem(catalog, identifier, options, classLoader);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index dd32c52c6ca0..30e32d62efec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -40,6 +40,8 @@
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
/** Abstract base of {@link Action} for table. */
public abstract class ActionBase implements Action {
@@ -55,6 +57,11 @@ public ActionBase(String warehouse, Map<String, String> catalogConfig) {
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+ // disable cache to avoid concurrent modification exception
+ if (!catalogOptions.contains(CACHE_ENABLED)) {
+ catalogOptions.set(CACHE_ENABLED, false);
+ }
+
catalog = initPaimonCatalog();
flinkCatalog = initFlinkCatalog();
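
Only the absent case is defaulted, so a caller can still opt back in explicitly; a hypothetical override:

    Map<String, String> catalogConfig = new HashMap<>();
    // an explicit value wins over the action default above
    catalogConfig.put(CatalogOptions.CACHE_ENABLED.key(), "true");
    // passing catalogConfig to an action constructor keeps the catalog cache enabled
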
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index 5b320f4e983e..e5c83360627e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.clone;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
@@ -34,6 +34,8 @@
import java.util.Map;
+import static org.apache.paimon.CoreOptions.PATH;
+
/** A Operator to copy files. */
public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
@@ -43,8 +45,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
- private AbstractCatalog sourceCatalog;
- private AbstractCatalog targetCatalog;
+ private Catalog sourceCatalog;
+ private Catalog targetCatalog;
public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
@@ -55,13 +57,9 @@ public CopyFileOperator(
@Override
public void open() throws Exception {
sourceCatalog =
- (AbstractCatalog)
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(sourceCatalogConfig));
+ FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
- (AbstractCatalog)
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(targetCatalogConfig));
+ FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
}
@Override
@@ -71,11 +69,19 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();
Path sourceTableRootPath =
- sourceCatalog.getDataTableLocation(
- Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+ new Path(
+ sourceCatalog
+ .getTable(
+ Identifier.fromString(cloneFileInfo.getSourceIdentifier()))
+ .options()
+ .get(PATH.key()));
Path targetTableRootPath =
- targetCatalog.getDataTableLocation(
- Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+ new Path(
+ targetCatalog
+ .getTable(
+ Identifier.fromString(cloneFileInfo.getTargetIdentifier()))
+ .options()
+ .get(PATH.key()));
String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 166544eb7357..128875a8b862 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
@@ -47,14 +46,10 @@ public String[] call(
String sourceDatabasePath,
String properties)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
- HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
List<Migrator> migrators =
TableMigrationUtils.getImporters(
connector,
- hiveCatalog,
+ catalog,
sourceDatabasePath,
ParameterUtils.parseCommaSeparatedKeyValues(properties));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index c9a273336c3f..110b4e25fc00 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -20,7 +20,6 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -62,9 +61,6 @@ public void migrateHandle(
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
@@ -76,7 +72,7 @@ public void migrateHandle(
Migrator importer =
TableMigrationUtils.getImporter(
connector,
- (HiveCatalog) catalog,
+ catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index f721f89257e5..39e6092d8496 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,7 +20,6 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -51,10 +50,6 @@ public String[] call(
String sourceTablePath,
String properties)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
-
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
@@ -62,7 +57,7 @@ public String[] call(
TableMigrationUtils.getImporter(
connector,
- (HiveCatalog) catalog,
+ catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
index 0c208116be22..196adf4753cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -20,7 +20,6 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -55,10 +54,6 @@ public String[] call(ProcedureContext procedureContext)
public String[] call(ProcedureContext procedureContext, String identifier)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
-
if (StringUtils.isBlank(identifier)) {
catalog.repairCatalog();
return new String[] {"Success"};
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 32d017be6665..47655d39a0f7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.utils;
+import org.apache.paimon.catalog.CachingCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
import org.apache.paimon.migrate.Migrator;
@@ -30,7 +32,7 @@ public class TableMigrationUtils {
public static Migrator getImporter(
String connector,
- HiveCatalog paimonCatalog,
+ Catalog catalog,
String sourceDatabase,
String souceTableName,
String targetDatabase,
@@ -38,8 +40,14 @@ public static Migrator getImporter(
Map<String, String> options) {
switch (connector) {
case "hive":
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive Catalog");
+ }
return new HiveMigrator(
- paimonCatalog,
+ (HiveCatalog) catalog,
sourceDatabase,
souceTableName,
targetDatabase,
@@ -51,13 +59,17 @@ public static Migrator getImporter(
}
public static List<Migrator> getImporters(
- String connector,
- HiveCatalog paimonCatalog,
- String sourceDatabase,
- Map<String, String> options) {
+ String connector, Catalog catalog, String sourceDatabase, Map<String, String> options) {
switch (connector) {
case "hive":
- return HiveMigrator.databaseMigrators(paimonCatalog, sourceDatabase, options);
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive Catalog");
+ }
+ return HiveMigrator.databaseMigrators(
+ (HiveCatalog) catalog, sourceDatabase, options);
default:
throw new UnsupportedOperationException("Don't support connector " + connector);
}
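
The two migrate procedures above now delegate their connector check here: the utility first unwraps a possible CachingCatalog decorator, then validates that the innermost catalog is a HiveCatalog. A caller-side sketch of the simplified contract (database and table names are illustrative, and Migrator#executeMigrate is assumed from the Migrator interface):

    // The session catalog is passed through as-is, even when wrapped in a
    // CachingCatalog; no HiveCatalog cast is needed at the call site anymore.
    Migrator migrator =
            TableMigrationUtils.getImporter(
                    "hive",
                    catalog,
                    "source_db",
                    "source_table",
                    "target_db",
                    "target_table",
                    options);
    migrator.executeMigrate();
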
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 0fe4173643c2..bbc827b24815 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,14 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
@@ -38,7 +31,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -440,33 +432,6 @@ public void testIgnoreDelete() {
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
}
- @Test
- public void testIgnoreDeleteCompatible() throws Exception {
- sql(
- "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
- + "WITH ('merge-engine' = 'deduplicate', 'write-only' = 'true')");
-
- sql("INSERT INTO ignore_delete VALUES (1, 'A')");
- // write delete records
- sql("DELETE FROM ignore_delete WHERE pk = 1");
- assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
-
- // set ignore-delete and read
- Map<String, String> newOptions = new HashMap<>();
- newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")),
- new Schema(
- Arrays.asList(
- new DataField(0, "pk", DataTypes.INT().notNull()),
- new DataField(1, "v", DataTypes.STRING())),
- Collections.emptyList(),
- Collections.singletonList("pk"),
- newOptions,
- null));
- assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
- }
-
@Test
public void testIgnoreDeleteWithRowKindField() {
sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index e358fb31158b..4e1ea424f834 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.CatalogLockContext;
@@ -98,7 +97,7 @@ public void testRenameTable() throws Exception {
Identifier identifier = new Identifier(DB_NAME, "t3");
Catalog catalog =
((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
- Path tablePath = ((AbstractCatalog) catalog).getDataTableLocation(identifier);
+ Path tablePath = new Path(catalog.getTable(identifier).options().get("path"));
assertThat(tablePath.toString())
.isEqualTo(new File(path, DB_NAME + ".db" + File.separator + "t3").toString());
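
The test now resolves the table location through the public Catalog API instead of down-casting to AbstractCatalog, which is no longer safe once the catalog may be wrapped. The same lookup as a reusable helper (a hypothetical convenience method; CoreOptions.PATH is the "path" key read above):

    // Resolve a table's location via Catalog#getTable; this relies on the
    // catalog populating the "path" option on loaded tables.
    static Path tableLocation(Catalog catalog, Identifier id)
            throws Catalog.TableNotExistException {
        return new Path(catalog.getTable(id).options().get(CoreOptions.PATH.key()));
    }
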
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 587069ada982..f95ec3ae4a16 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.log.LogSinkProvider;
@@ -647,9 +646,18 @@ private void checkEquals(
CatalogTable t2,
Map<String, String> optionsToAdd,
Set<String> optionsToRemove) {
- Path tablePath =
- ((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
- .getDataTableLocation(FlinkCatalog.toIdentifier(path));
+ Path tablePath;
+ try {
+ tablePath =
+ new Path(
+ ((FlinkCatalog) catalog)
+ .catalog()
+ .getTable(FlinkCatalog.toIdentifier(path))
+ .options()
+ .get(CoreOptions.PATH.key()));
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
Map<String, String> options = new HashMap<>(t1.getOptions());
options.put("path", tablePath.toString());
options.putAll(optionsToAdd);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index f65a5008110b..399109137b79 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,14 +18,6 @@
package org.apache.paimon.flink;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;
@@ -44,9 +36,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -627,37 +617,4 @@ public void testIgnoreDelete(boolean localMerge) throws Exception {
Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
iterator.close();
}
-
- @Test
- public void testIgnoreDeleteCompatible() throws Exception {
- sql(
- "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
- + " 'merge-engine' = 'deduplicate',"
- + " 'write-only' = 'true')");
- sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
- // write delete records
- sql("DELETE FROM ignore_delete WHERE pk = 1");
- sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
- assertThat(sql("SELECT * FROM ignore_delete"))
- .containsExactlyInAnyOrder(Row.of(1, "A", null));
-
- // force altering merge engine and read
- Map<String, String> newOptions = new HashMap<>();
- newOptions.put(
- CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
- newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")),
- new Schema(
- Arrays.asList(
- new DataField(0, "pk", DataTypes.INT().notNull()),
- new DataField(1, "a", DataTypes.STRING()),
- new DataField(2, "b", DataTypes.STRING())),
- Collections.emptyList(),
- Collections.singletonList("pk"),
- newOptions,
- null));
- assertThat(sql("SELECT * FROM ignore_delete"))
- .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 5e7f0108c1ed..1b17ee17279c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -51,6 +51,8 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
/** {@link Action} test base. */
public abstract class ActionITCaseBase extends AbstractTestBase {
@@ -70,7 +72,9 @@ public void before() throws IOException {
tableName = "test_table_" + UUID.randomUUID();
commitUser = UUID.randomUUID().toString();
incrementalIdentifier = 0;
- catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ CatalogContext context = CatalogContext.create(new Path(warehouse));
+ context.options().set(CACHE_ENABLED, false);
+ catalog = CatalogFactory.createCatalog(context);
}
@AfterEach
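
Action tests commit schema and data changes behind the catalog's back, so they opt out of caching to avoid observing stale table entries. The pattern, as applied above:

    // Disable catalog caching for tests that mutate tables out-of-band;
    // otherwise a cached entry could mask the latest schema.
    CatalogContext context = CatalogContext.create(new Path(warehouse));
    context.options().set(CatalogOptions.CACHE_ENABLED, false);
    Catalog catalog = CatalogFactory.createCatalog(context);
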
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
index e0c5f5d66722..db973b39c8ca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure.privilege;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -85,7 +84,6 @@ public void testUserPrivileges() throws Exception {
Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog();
assertThat(paimonCatalog).isInstanceOf(PrivilegedCatalog.class);
PrivilegedCatalog privilegedCatalog = (PrivilegedCatalog) paimonCatalog;
- assertThat(privilegedCatalog.wrapped()).isInstanceOf(FileSystemCatalog.class);
assertThat(privilegedCatalog.privilegeManager())
.isInstanceOf(FileBasedPrivilegeManager.class);
assertThat(privilegedCatalog.privilegeManager().privilegeEnabled()).isTrue();
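
The dropped assertion reflects that PrivilegedCatalog no longer necessarily wraps the FileSystemCatalog directly; with caching enabled, a CachingCatalog may sit in between. A test that still needs the innermost catalog could peel the decorators — a sketch, assuming only the two wrapped() accessors used elsewhere in this patch:

    // Hypothetical helper: unwrap decorator catalogs until the concrete
    // implementation remains, whatever the nesting order.
    static Catalog unwrapFully(Catalog catalog) {
        while (true) {
            if (catalog instanceof PrivilegedCatalog) {
                catalog = ((PrivilegedCatalog) catalog).wrapped();
            } else if (catalog instanceof CachingCatalog) {
                catalog = ((CachingCatalog) catalog).wrapped();
            } else {
                return catalog;
            }
        }
    }
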
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 1bbf4b3871e8..e9fc9f32cc2c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -36,9 +36,6 @@
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -875,25 +872,12 @@ public static Catalog createHiveCatalog(CatalogContext context) {
throw new UncheckedIOException(e);
}
- Catalog catalog =
- new HiveCatalog(
- fileIO,
- hiveConf,
- options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
- options,
- warehouse.toUri().toString());
-
- PrivilegeManager privilegeManager =
- new FileBasedPrivilegeManager(
- warehouse.toString(),
- fileIO,
- context.options().get(PrivilegedCatalog.USER),
- context.options().get(PrivilegedCatalog.PASSWORD));
- if (privilegeManager.privilegeEnabled()) {
- catalog = new PrivilegedCatalog(catalog, privilegeManager);
- }
-
- return catalog;
+ return new HiveCatalog(
+ fileIO,
+ hiveConf,
+ options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+ options,
+ warehouse.toUri().toString());
}
public static HiveConf createHiveConf(CatalogContext context) {
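
createHiveCatalog now returns the bare HiveCatalog; privilege (and, presumably, cache) decoration moves to the shared catalog-creation path instead of being re-implemented per catalog. A sketch of what that central wrapping plausibly looks like — the surrounding factory code is an assumption, since it is not part of this diff:

    // Assumed central decoration (not shown in this diff): every concrete
    // catalog receives the same privilege and caching layers.
    Catalog catalog = factory.create(fileIO, warehouse, context);
    PrivilegeManager privilegeManager =
            new FileBasedPrivilegeManager(
                    warehouse.toString(),
                    fileIO,
                    context.options().get(PrivilegedCatalog.USER),
                    context.options().get(PrivilegedCatalog.PASSWORD));
    if (privilegeManager.privilegeEnabled()) {
        catalog = new PrivilegedCatalog(catalog, privilegeManager);
    }
    if (context.options().get(CatalogOptions.CACHE_ENABLED)) {
        catalog =
                new CachingCatalog(
                        catalog,
                        context.options().get(CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS));
    }
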
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
index 4a45b365a159..992f17253c9f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.utils;
+import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
@@ -30,7 +31,7 @@ public class TableMigrationUtils {
public static Migrator getImporter(
String connector,
- Catalog paimonCatalog,
+ Catalog catalog,
String sourceDatabase,
String sourceTableName,
String targetDatabase,
@@ -38,9 +39,14 @@ public static Migrator getImporter(
Map<String, String> options) {
switch (connector) {
case "hive":
- assert paimonCatalog instanceof HiveCatalog;
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive Catalog");
+ }
return new HiveMigrator(
- (HiveCatalog) paimonCatalog,
+ (HiveCatalog) catalog,
sourceDatabase,
sourceTableName,
targetDatabase,
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 55ce7c9aad88..f12a3d8fabe9 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -59,6 +59,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -88,6 +89,7 @@ public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
Options options = new Options();
options.set(WAREHOUSE, spark.conf().get("spark.sql.catalog.paimon.warehouse"));
+ options.set(CACHE_ENABLED, false);
fileSystemCatalog =
(FileSystemCatalog) CatalogFactory.createCatalog(CatalogContext.create(options));
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index b8115132d71c..983dd037f131 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
-import org.apache.paimon.options.Options
+import org.apache.paimon.options.{CatalogOptions, Options}
import org.apache.paimon.spark.catalog.Catalogs
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
@@ -36,6 +36,8 @@ import org.scalactic.source.Position
import org.scalatest.Tag
import java.io.File
+import java.util
+import java.util.{HashMap => JHashMap}
import java.util.TimeZone
import scala.util.Random
@@ -64,6 +66,7 @@ class PaimonSparkTestBase
super.sparkConf
.set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
.set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+ .set("spark.sql.catalog.paimon.cache-enabled", "false")
.set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
.set("spark.serializer", serializer)
}
@@ -122,7 +125,9 @@ class PaimonSparkTestBase
private def initCatalog(): Catalog = {
val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
- val options = Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf)
+ val options =
+ new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf))
+ options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
val catalogContext =
CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf())
CatalogFactory.createCatalog(catalogContext)
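
The new options can also be set programmatically when a catalog is built by hand; per the option definitions in this change, cache-enabled defaults to true and cache.expiration-interval to 30 s. A usage sketch with an illustrative warehouse path:

    // Keep caching on but shorten the expiration for a frequently changing catalog.
    Options options = new Options();
    options.set(CatalogOptions.WAREHOUSE, "/tmp/warehouse"); // illustrative
    options.set(CatalogOptions.CACHE_ENABLED, true); // the default
    options.set(CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS, Duration.ofMinutes(5));
    Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
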