From a3e19b405355b2a431a2e0f6260a9c57f0232b2b Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 29 Jul 2024 19:22:46 +0800 Subject: [PATCH] [core] Introduce CachingCatalog --- .../generated/catalog_configuration.html | 12 + .../apache/paimon/options/CatalogOptions.java | 14 + .../paimon/catalog/AbstractCatalog.java | 46 +-- .../apache/paimon/catalog/CachingCatalog.java | 194 ++++++++++++ .../org/apache/paimon/catalog/Catalog.java | 31 ++ .../apache/paimon/catalog/CatalogFactory.java | 26 ++ .../paimon/catalog/FileSystemCatalog.java | 2 +- .../catalog/FileSystemCatalogFactory.java | 17 +- .../apache/paimon/schema/SchemaManager.java | 2 +- .../paimon/catalog/CachingCatalogTest.java | 288 ++++++++++++++++++ .../paimon/catalog/CatalogTestBase.java | 20 +- .../paimon/catalog/FileSystemCatalogTest.java | 32 ++ .../catalog/TestableCachingCatalog.java | 57 ++++ .../apache/paimon/jdbc/JdbcCatalogTest.java | 9 - .../system/AllTableOptionsTableTest.java | 33 +- .../org/apache/paimon/utils/FakeTicker.java | 42 +++ .../action/cdc/SyncDatabaseActionBase.java | 7 +- .../flink/action/cdc/SyncTableActionBase.java | 6 +- .../MySqlSyncDatabaseTableListITCase.java | 2 +- .../org/apache/paimon/flink/FlinkCatalog.java | 5 +- .../paimon/flink/action/ActionBase.java | 7 + .../paimon/flink/clone/CopyFileOperator.java | 32 +- .../procedure/MigrateDatabaseProcedure.java | 7 +- .../flink/procedure/MigrateFileProcedure.java | 6 +- .../procedure/MigrateTableProcedure.java | 7 +- .../flink/procedure/RepairProcedure.java | 5 - .../flink/utils/TableMigrationUtils.java | 26 +- .../paimon/flink/BatchFileStoreITCase.java | 35 --- .../paimon/flink/FileSystemCatalogITCase.java | 3 +- .../apache/paimon/flink/FlinkCatalogTest.java | 16 +- .../paimon/flink/PartialUpdateITCase.java | 43 --- .../paimon/flink/action/ActionITCaseBase.java | 6 +- .../privilege/PrivilegeProcedureITCase.java | 2 - .../org/apache/paimon/hive/HiveCatalog.java | 28 +- .../spark/utils/TableMigrationUtils.java | 12 +- .../paimon/spark/SparkFileIndexITCase.java | 2 + .../paimon/spark/PaimonSparkTestBase.scala | 9 +- 37 files changed, 822 insertions(+), 269 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index a583c74714ba..f0aa4c4b85af 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -32,6 +32,18 @@ Boolean Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog. + +
cache-enabled
+ true + Boolean + Controls whether the catalog will cache table entries upon load. + + +
cache.expiration-interval
+ 30 s + Duration + Controls the duration for which entries in the catalog are cached. +
client-pool-size
2 diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 9ef681809eab..28a751f5025e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -91,6 +91,20 @@ public class CatalogOptions { .defaultValue(2) .withDescription("Configure the size of the connection pool."); + public static final ConfigOption CACHE_ENABLED = + key("cache-enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Controls whether the catalog will cache table entries upon load."); + + public static final ConfigOption CACHE_EXPIRATION_INTERVAL_MS = + key("cache.expiration-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "Controls the duration for which entries in the catalog are cached."); + public static final ConfigOption LINEAGE_META = key("lineage-meta") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index afe64666a875..062e935328fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -58,17 +57,12 @@ import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; -import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Common implementation of {@link Catalog}. */ public abstract class AbstractCatalog implements Catalog { - public static final String DB_SUFFIX = ".db"; - protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; - protected static final String DB_LOCATION_PROP = "location"; - protected final FileIO fileIO; protected final Map tableDefaultOptions; protected final Options catalogOptions; @@ -86,8 +80,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.fileIO = fileIO; this.lineageMetaFactory = findAndCreateLineageMeta(options, AbstractCatalog.class.getClassLoader()); - this.tableDefaultOptions = - convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); + this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap()); this.catalogOptions = options; } @@ -445,12 +438,12 @@ protected void assertMainBranch(String branchName) { } } - private static boolean isSpecifiedSystemTable(Identifier identifier) { + public static boolean isSpecifiedSystemTable(Identifier identifier) { return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER) && !getOriginalIdentifierAndBranch(identifier).isPresent(); } - protected boolean isSystemTable(Identifier identifier) { + protected static boolean isSystemTable(Identifier identifier) { return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier); } @@ -463,11 +456,11 @@ protected void checkNotSystemTable(Identifier identifier, String method) { } } - public void copyTableDefaultOptions(Map options) { + private void copyTableDefaultOptions(Map options) { tableDefaultOptions.forEach(options::putIfAbsent); } - private String[] tableAndSystemName(Identifier identifier) { + public static String[] tableAndSystemName(Identifier identifier) { String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER); if (splits.length != 2) { throw new IllegalArgumentException( @@ -493,7 +486,7 @@ public static Path newDatabasePath(String warehouse, String database) { return new Path(warehouse, database + DB_SUFFIX); } - private boolean isSystemDatabase(String database) { + public static boolean isSystemDatabase(String database) { return SYSTEM_DATABASE_NAME.equals(database); } @@ -504,30 +497,9 @@ protected void checkNotSystemDatabase(String database) { } } - /** Validate database, table and field names must be lowercase when not case-sensitive. */ - public static void validateCaseInsensitive( - boolean caseSensitive, String type, String... names) { - validateCaseInsensitive(caseSensitive, type, Arrays.asList(names)); - } - - /** Validate database, table and field names must be lowercase when not case-sensitive. */ - public static void validateCaseInsensitive( - boolean caseSensitive, String type, List names) { - if (caseSensitive) { - return; - } - List illegalNames = - names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList()); - checkArgument( - illegalNames.isEmpty(), - String.format( - "%s name %s cannot contain upper case in the catalog.", - type, illegalNames)); - } - protected void validateIdentifierNameCaseInsensitive(Identifier identifier) { - validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName()); - validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName()); + Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName()); + Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName()); } private void validateFieldNameCaseInsensitiveInSchemaChange(List changes) { @@ -545,7 +517,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c } protected void validateFieldNameCaseInsensitive(List fieldNames) { - validateCaseInsensitive(allowUpperCase(), "Field", fieldNames); + Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames); } private void validateAutoCreateClose(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java new file mode 100644 index 000000000000..893d38810e4a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.SystemTableLoader; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; +import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName; +import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; +import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; + +/** A {@link Catalog} to cache databases and tables and manifests. */ +public class CachingCatalog extends DelegateCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class); + + protected final Cache> databaseCache; + protected final Cache tableCache; + + public CachingCatalog(Catalog wrapped) { + this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue()); + } + + public CachingCatalog(Catalog wrapped, Duration expirationInterval) { + this(wrapped, expirationInterval, Ticker.systemTicker()); + } + + public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker ticker) { + super(wrapped); + if (expirationInterval.isZero() || expirationInterval.isNegative()) { + throw new IllegalArgumentException( + "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled."); + } + + this.databaseCache = + Caffeine.newBuilder() + .softValues() + .executor(Runnable::run) + .expireAfterAccess(expirationInterval) + .ticker(ticker) + .build(); + this.tableCache = + Caffeine.newBuilder() + .softValues() + .removalListener(new TableInvalidatingRemovalListener()) + .executor(Runnable::run) + .expireAfterAccess(expirationInterval) + .ticker(ticker) + .build(); + } + + @Override + public Map loadDatabaseProperties(String databaseName) + throws DatabaseNotExistException { + Map properties = databaseCache.getIfPresent(databaseName); + if (properties != null) { + return properties; + } + + properties = super.loadDatabaseProperties(databaseName); + databaseCache.put(databaseName, properties); + return properties; + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + super.dropDatabase(name, ignoreIfNotExists, cascade); + databaseCache.invalidate(name); + if (cascade) { + List tables = new ArrayList<>(); + for (Identifier identifier : tableCache.asMap().keySet()) { + if (identifier.getDatabaseName().equals(name)) { + tables.add(identifier); + } + } + tables.forEach(tableCache::invalidate); + } + } + + @Override + public void dropTable(Identifier identifier, boolean ignoreIfNotExists) + throws TableNotExistException { + super.dropTable(identifier, ignoreIfNotExists); + invalidateTable(identifier); + } + + @Override + public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + super.renameTable(fromTable, toTable, ignoreIfNotExists); + invalidateTable(fromTable); + } + + @Override + public void alterTable( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + super.alterTable(identifier, changes, ignoreIfNotExists); + invalidateTable(identifier); + } + + @Override + public Table getTable(Identifier identifier) throws TableNotExistException { + Table table = tableCache.getIfPresent(identifier); + if (table != null) { + return table; + } + + if (isSpecifiedSystemTable(identifier)) { + String[] splits = tableAndSystemName(identifier); + String tableName = splits[0]; + String type = splits[1]; + + Identifier originIdentifier = + Identifier.create(identifier.getDatabaseName(), tableName); + Table originTable = tableCache.getIfPresent(originIdentifier); + if (originTable == null) { + originTable = wrapped.getTable(originIdentifier); + tableCache.put(originIdentifier, originTable); + } + table = SystemTableLoader.load(type, (FileStoreTable) originTable); + if (table == null) { + throw new TableNotExistException(identifier); + } + tableCache.put(identifier, table); + return table; + } + + table = wrapped.getTable(identifier); + tableCache.put(identifier, table); + return table; + } + + private class TableInvalidatingRemovalListener implements RemovalListener { + @Override + public void onRemoval( + Identifier tableIdentifier, Table table, @NonNull RemovalCause cause) { + LOG.debug("Evicted {} from the table cache ({})", tableIdentifier, cause); + if (RemovalCause.EXPIRED.equals(cause)) { + if (!isSpecifiedSystemTable(tableIdentifier)) { + tableCache.invalidateAll(allSystemTables(tableIdentifier)); + } + } + } + } + + private void invalidateTable(Identifier identifier) { + tableCache.invalidate(identifier); + tableCache.invalidateAll(allSystemTables(identifier)); + } + + private static Iterable allSystemTables(Identifier ident) { + List tables = new ArrayList<>(); + for (String type : SYSTEM_TABLES) { + tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type)); + } + return tables; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index fe8e0b68b813..0f5fe6a2233e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -26,10 +26,15 @@ import org.apache.paimon.table.Table; import java.io.Serializable; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** * This interface is responsible for reading and writing metadata such as database/table from a @@ -47,6 +52,9 @@ public interface Catalog extends AutoCloseable { String SYSTEM_TABLE_SPLITTER = "$"; String SYSTEM_DATABASE_NAME = "sys"; String COMMENT_PROP = "comment"; + String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; + String DB_LOCATION_PROP = "location"; + String DB_SUFFIX = ".db"; /** Warehouse root path containing all database directories in this catalog. */ String warehouse(); @@ -277,6 +285,29 @@ default void repairTable(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); } + static Map tableDefaultOptions(Map options) { + return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); + } + + /** Validate database, table and field names must be lowercase when not case-sensitive. */ + static void validateCaseInsensitive(boolean caseSensitive, String type, String... names) { + validateCaseInsensitive(caseSensitive, type, Arrays.asList(names)); + } + + /** Validate database, table and field names must be lowercase when not case-sensitive. */ + static void validateCaseInsensitive(boolean caseSensitive, String type, List names) { + if (caseSensitive) { + return; + } + List illegalNames = + names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList()); + checkArgument( + illegalNames.isEmpty(), + String.format( + "%s name %s cannot contain upper case in the catalog.", + type, illegalNames)); + } + /** Exception for trying to drop on a database that is not empty. */ class DatabaseNotEmptyException extends Exception { private static final String MSG = "Database %s is not empty."; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java index c153f0ee39c1..2eff8e902b6e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java @@ -24,11 +24,16 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.privilege.FileBasedPrivilegeManager; +import org.apache.paimon.privilege.PrivilegeManager; +import org.apache.paimon.privilege.PrivilegedCatalog; import org.apache.paimon.utils.Preconditions; import java.io.IOException; import java.io.UncheckedIOException; +import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; +import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; @@ -67,6 +72,27 @@ static Catalog createCatalog(CatalogContext options) { } static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) { + Catalog catalog = createUnwrappedCatalog(context, classLoader); + + Options options = context.options(); + if (options.get(CACHE_ENABLED)) { + catalog = new CachingCatalog(catalog, options.get(CACHE_EXPIRATION_INTERVAL_MS)); + } + + PrivilegeManager privilegeManager = + new FileBasedPrivilegeManager( + catalog.warehouse(), + catalog.fileIO(), + context.options().get(PrivilegedCatalog.USER), + context.options().get(PrivilegedCatalog.PASSWORD)); + if (privilegeManager.privilegeEnabled()) { + catalog = new PrivilegedCatalog(catalog, privilegeManager); + } + + return catalog; + } + + static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader classLoader) { Options options = context.options(); String metastore = options.get(METASTORE); CatalogFactory catalogFactory = diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index c2ff376019bf..64f38a106c93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -63,7 +63,7 @@ public List listDatabases() { @Override protected void createDatabaseImpl(String name, Map properties) { - if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) { + if (properties.containsKey(Catalog.DB_LOCATION_PROP)) { throw new IllegalArgumentException( "Cannot specify location for a database when using fileSystem catalog."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java index 8a0b1643fb2b..6ab3664e4ca9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java @@ -20,9 +20,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.privilege.FileBasedPrivilegeManager; -import org.apache.paimon.privilege.PrivilegeManager; -import org.apache.paimon.privilege.PrivilegedCatalog; import org.apache.paimon.table.TableType; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; @@ -44,18 +41,6 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { "Only managed table is supported in File system catalog."); } - Catalog catalog = new FileSystemCatalog(fileIO, warehouse, context.options()); - - PrivilegeManager privilegeManager = - new FileBasedPrivilegeManager( - warehouse.toString(), - fileIO, - context.options().get(PrivilegedCatalog.USER), - context.options().get(PrivilegedCatalog.PASSWORD)); - if (privilegeManager.privilegeEnabled()) { - catalog = new PrivilegedCatalog(catalog, privilegeManager); - } - - return catalog; + return new FileSystemCatalog(fileIO, warehouse, context.options()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 4f70ac725e48..905ac2dff0b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -65,7 +65,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; +import static org.apache.paimon.catalog.Catalog.DB_SUFFIX; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java new file mode 100644 index 000000000000..56fc238a8f70 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.FakeTicker; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class CachingCatalogTest extends CatalogTestBase { + + private static final Duration EXPIRATION_TTL = Duration.ofMinutes(5); + private static final Duration HALF_OF_EXPIRATION = EXPIRATION_TTL.dividedBy(2); + + private FakeTicker ticker; + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + catalog = new FileSystemCatalog(fileIO, new Path(warehouse)); + ticker = new FakeTicker(); + catalog.createDatabase("db", false); + } + + @Override + @Test + public void testListDatabasesWhenNoDatabases() { + List databases = catalog.listDatabases(); + assertThat(databases).contains("db"); + } + + @Test + public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception { + Catalog catalog = new CachingCatalog(this.catalog); + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); + Identifier sysIdent = new Identifier("db", "tbl$files"); + Table sysTable = catalog.getTable(sysIdent); + catalog.alterTable(tableIdent, SchemaChange.addColumn("col3", DataTypes.INT()), false); + assertThat(catalog.getTable(sysIdent)).isNotSameAs(sysTable); + } + + @Test + public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception { + Catalog catalog = new CachingCatalog(this.catalog); + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); + Identifier sysIdent = new Identifier("db", "tbl$files"); + catalog.getTable(sysIdent); + catalog.dropTable(tableIdent, false); + assertThatThrownBy(() -> catalog.getTable(sysIdent)) + .hasMessage("Table db.tbl does not exist."); + } + + @Test + public void testTableExpiresAfterInterval() throws Exception { + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + Table table = catalog.getTable(tableIdent); + + // Ensure table is cached with full ttl remaining upon creation + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL); + + ticker.advance(HALF_OF_EXPIRATION); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION); + + ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10))); + assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent); + assertThat(catalog.getTable(tableIdent)) + .as("CachingCatalog should return a new instance after expiration") + .isNotSameAs(table); + } + + @Test + public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exception { + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + catalog.getTable(tableIdent); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO); + + ticker.advance(HALF_OF_EXPIRATION); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION); + assertThat(catalog.remainingAgeFor(tableIdent)) + .isPresent() + .get() + .isEqualTo(HALF_OF_EXPIRATION); + + Duration oneMinute = Duration.ofMinutes(1L); + ticker.advance(oneMinute); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)) + .isPresent() + .get() + .isEqualTo(HALF_OF_EXPIRATION.plus(oneMinute)); + assertThat(catalog.remainingAgeFor(tableIdent)) + .get() + .isEqualTo(HALF_OF_EXPIRATION.minus(oneMinute)); + + // Access the table via the catalog, which should refresh the TTL + Table table = catalog.getTable(tableIdent); + assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO); + assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(EXPIRATION_TTL); + + ticker.advance(HALF_OF_EXPIRATION); + assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); + assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); + } + + @Test + public void testCacheExpirationEagerlyRemovesSysTables() throws Exception { + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + Table table = catalog.getTable(tableIdent); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO); + + ticker.advance(HALF_OF_EXPIRATION); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); + + for (Identifier sysTable : sysTables(tableIdent)) { + catalog.getTable(sysTable); + } + assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent)); + assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) + .isNotEmpty() + .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); + + assertThat(catalog.remainingAgeFor(tableIdent)) + .as("Loading a non-cached sys table should refresh the main table's age") + .isEqualTo(Optional.of(EXPIRATION_TTL)); + + // Move time forward and access already cached sys tables. + ticker.advance(HALF_OF_EXPIRATION); + for (Identifier sysTable : sysTables(tableIdent)) { + catalog.getTable(sysTable); + } + assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) + .isNotEmpty() + .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); + + assertThat(catalog.remainingAgeFor(tableIdent)) + .as("Accessing a cached sys table should not affect the main table's age") + .isEqualTo(Optional.of(HALF_OF_EXPIRATION)); + + // Move time forward so the data table drops. + ticker.advance(HALF_OF_EXPIRATION); + assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent); + + Arrays.stream(sysTables(tableIdent)) + .forEach( + sysTable -> + assertThat(catalog.cache().asMap()) + .as( + "When a data table expires, its sys tables should expire regardless of age") + .doesNotContainKeys(sysTable)); + } + + @Test + public void testDeadlock() throws Exception { + Catalog underlyCatalog = this.catalog; + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, Duration.ofSeconds(1), ticker); + int numThreads = 20; + List createdTables = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + Identifier tableIdent = new Identifier("db", "tbl" + i); + catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + createdTables.add(tableIdent); + } + + Cache cache = catalog.cache(); + AtomicInteger cacheGetCount = new AtomicInteger(0); + AtomicInteger cacheCleanupCount = new AtomicInteger(0); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + if (i % 2 == 0) { + String table = "tbl" + i; + executor.submit( + () -> { + ticker.advance(Duration.ofSeconds(2)); + cache.get( + new Identifier("db", table), + identifier -> { + try { + return underlyCatalog.getTable(identifier); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + }); + cacheGetCount.incrementAndGet(); + }); + } else { + executor.submit( + () -> { + ticker.advance(Duration.ofSeconds(2)); + cache.cleanUp(); + cacheCleanupCount.incrementAndGet(); + }); + } + } + executor.awaitTermination(2, TimeUnit.SECONDS); + assertThat(cacheGetCount).hasValue(numThreads / 2); + assertThat(cacheCleanupCount).hasValue(numThreads / 2); + + executor.shutdown(); + } + + @Test + public void testCachingCatalogRejectsExpirationIntervalOfZero() { + Assertions.assertThatThrownBy( + () -> new TestableCachingCatalog(this.catalog, Duration.ZERO, ticker)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled."); + } + + @Test + public void testInvalidateTableForChainedCachingCatalogs() throws Exception { + TestableCachingCatalog wrappedCatalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + TestableCachingCatalog catalog = + new TestableCachingCatalog(wrappedCatalog, EXPIRATION_TTL, ticker); + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + catalog.getTable(tableIdent); + assertThat(catalog.cache().asMap()).containsKey(tableIdent); + catalog.dropTable(tableIdent, false); + assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent); + assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent); + } + + public static Identifier[] sysTables(Identifier tableIdent) { + return SystemTableLoader.SYSTEM_TABLES.stream() + .map(type -> Identifier.fromString(tableIdent.getFullName() + "$" + type)) + .toArray(Identifier[]::new); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 378ebf159079..19c9e83c45bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -38,7 +38,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -55,6 +55,7 @@ public abstract class CatalogTestBase { protected String warehouse; protected FileIO fileIO; protected Catalog catalog; + protected static final Schema DEFAULT_TABLE_SCHEMA = new Schema( Lists.newArrayList( @@ -66,18 +67,6 @@ public abstract class CatalogTestBase { Maps.newHashMap(), ""); - protected static final Schema PARTITION_SCHEMA = - new Schema( - Lists.newArrayList( - new DataField(0, "pk1", DataTypes.INT()), - new DataField(1, "pk2", DataTypes.STRING()), - new DataField(3, "pk3", DataTypes.STRING()), - new DataField(4, "col", DataTypes.STRING())), - Arrays.asList("pk1", "pk2"), - Arrays.asList("pk1", "pk2", "pk3"), - Maps.newHashMap(), - ""); - @BeforeEach public void setUp() throws Exception { warehouse = tempFile.toUri().toString(); @@ -95,7 +84,10 @@ void tearDown() throws Exception { } @Test - public abstract void testListDatabasesWhenNoDatabases(); + public void testListDatabasesWhenNoDatabases() { + List databases = catalog.listDatabases(); + assertThat(databases).isEqualTo(new ArrayList<>()); + } @Test public void testListDatabases() throws Exception { diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java new file mode 100644 index 000000000000..67fbd2718a36 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.Path; + +import org.junit.jupiter.api.BeforeEach; + +class FileSystemCatalogTest extends CatalogTestBase { + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + catalog = new FileSystemCatalog(fileIO, new Path(warehouse)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java new file mode 100644 index 000000000000..c393d3afff6c --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.table.Table; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; + +import java.time.Duration; +import java.util.Optional; + +/** + * A wrapper around CachingCatalog that provides accessor methods to test the underlying cache, + * without making those fields public in the CachingCatalog itself. + */ +public class TestableCachingCatalog extends CachingCatalog { + + private final Duration cacheExpirationInterval; + + public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) { + super(catalog, expirationInterval, ticker); + this.cacheExpirationInterval = expirationInterval; + } + + public Cache cache() { + // cleanUp must be called as tests apply assertions directly on the underlying map, but + // metadata + // table map entries are cleaned up asynchronously. + tableCache.cleanUp(); + return tableCache; + } + + public Optional ageOf(Identifier identifier) { + return tableCache.policy().expireAfterAccess().get().ageOf(identifier); + } + + public Optional remainingAgeFor(Identifier identifier) { + return ageOf(identifier).map(cacheExpirationInterval::minus); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index f0c84eb2c4d8..f5befc724f8b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -32,8 +32,6 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.UUID; @@ -88,13 +86,6 @@ public void testCleanTimeoutLockAndAcquireLock() throws SQLException, Interrupte .isTrue(); } - @Test - @Override - public void testListDatabasesWhenNoDatabases() { - List databases = catalog.listDatabases(); - assertThat(databases).isEqualTo(new ArrayList<>()); - } - @Test public void testCheckIdentifierUpperCase() throws Exception { catalog.createDatabase("test_db", false); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java index f2a1efce5151..764c0f4e168e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java @@ -18,10 +18,7 @@ package org.apache.paimon.table.system; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fs.Path; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; @@ -29,18 +26,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.Objects; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS; -import static org.apache.paimon.table.system.AllTableOptionsTable.options; -import static org.apache.paimon.table.system.AllTableOptionsTable.toRow; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link AllTableOptionsTable}. */ @@ -69,18 +60,14 @@ public void before() throws Exception { @Test public void testSchemasTable() throws Exception { - List expectRow = getExceptedResult(); - List result = read(allTableOptionsTable); - assertThat(result).containsExactlyElementsOf(expectRow); - } - - private List getExceptedResult() { - AbstractCatalog abstractCatalog = (AbstractCatalog) catalog; - Map> stringMapMap = abstractCatalog.allTablePaths(); - Iterator rows = - toRow(options(((AbstractCatalog) catalog).fileIO(), stringMapMap)); - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED), false) - .collect(Collectors.toList()); + List result = + read(allTableOptionsTable).stream() + .map(Objects::toString) + .collect(Collectors.toList()); + assertThat(result) + .containsExactlyInAnyOrder( + "+I(default,T,fields.sales.aggregate-function,sum)", + "+I(default,T,merge-engine,aggregation)", + "+I(default,T,fields.price.aggregate-function,max)"); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java new file mode 100644 index 000000000000..cbd536ad5f00 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; + +/** A {@code Ticker} whose value can be advanced programmatically in tests. */ +public class FakeTicker implements Ticker { + + private final AtomicLong nanos = new AtomicLong(); + + public FakeTicker() {} + + public FakeTicker advance(Duration duration) { + nanos.addAndGet(duration.toNanos()); + return this; + } + + @Override + public long read() { + return nanos.get(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index d6c4c12ecdd3..6c62bcc0c21f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action.cdc; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.MultiTablesSinkMode; @@ -120,9 +119,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) { @Override protected void validateCaseSensitivity() { - AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database); - AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix); - AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix); + Catalog.validateCaseInsensitive(allowUpperCase, "Database", database); + Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix); + Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index be8cedf0baf6..4c7db6d28b62 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.action.cdc; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.Action; @@ -114,8 +114,8 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) { @Override protected void validateCaseSensitivity() { - AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database); - AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table", table); + Catalog.validateCaseInsensitive(allowUpperCase, "Database", database); + Catalog.validateCaseInsensitive(allowUpperCase, "Table", table); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java index 816b7b903afa..c15cbf89884e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java @@ -45,7 +45,7 @@ public static void startContainers() { } @Test - @Timeout(120) + @Timeout(180) public void testActionRunResult() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 98efd479f20d..2cb96e00750a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -347,10 +347,7 @@ protected Schema buildPaimonSchema( // Although catalog.createTable will copy the default options, but we need this info // here before create table, such as table-default.kafka.bootstrap.servers defined in // catalog options. Temporarily, we copy the default options here. - if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) { - ((org.apache.paimon.catalog.AbstractCatalog) catalog) - .copyTableDefaultOptions(options); - } + Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent); options.put(REGISTER_TIMEOUT.key(), logStoreAutoRegisterTimeout.toString()); registerLogSystem(catalog, identifier, options, classLoader); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index dd32c52c6ca0..30e32d62efec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -40,6 +40,8 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; + /** Abstract base of {@link Action} for table. */ public abstract class ActionBase implements Action { @@ -55,6 +57,11 @@ public ActionBase(String warehouse, Map catalogConfig) { catalogOptions = Options.fromMap(catalogConfig); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); + // disable cache to avoid concurrent modification exception + if (!catalogOptions.contains(CACHE_ENABLED)) { + catalogOptions.set(CACHE_ENABLED, false); + } + catalog = initPaimonCatalog(); flinkCatalog = initFlinkCatalog(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index 5b320f4e983e..e5c83360627e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.clone; -import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.fs.FileIO; @@ -34,6 +34,8 @@ import java.util.Map; +import static org.apache.paimon.CoreOptions.PATH; + /** A Operator to copy files. */ public class CopyFileOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -43,8 +45,8 @@ public class CopyFileOperator extends AbstractStreamOperator private final Map sourceCatalogConfig; private final Map targetCatalogConfig; - private AbstractCatalog sourceCatalog; - private AbstractCatalog targetCatalog; + private Catalog sourceCatalog; + private Catalog targetCatalog; public CopyFileOperator( Map sourceCatalogConfig, Map targetCatalogConfig) { @@ -55,13 +57,9 @@ public CopyFileOperator( @Override public void open() throws Exception { sourceCatalog = - (AbstractCatalog) - FlinkCatalogFactory.createPaimonCatalog( - Options.fromMap(sourceCatalogConfig)); + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); targetCatalog = - (AbstractCatalog) - FlinkCatalogFactory.createPaimonCatalog( - Options.fromMap(targetCatalogConfig)); + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); } @Override @@ -71,11 +69,19 @@ public void processElement(StreamRecord streamRecord) throws Exce FileIO sourceTableFileIO = sourceCatalog.fileIO(); FileIO targetTableFileIO = targetCatalog.fileIO(); Path sourceTableRootPath = - sourceCatalog.getDataTableLocation( - Identifier.fromString(cloneFileInfo.getSourceIdentifier())); + new Path( + sourceCatalog + .getTable( + Identifier.fromString(cloneFileInfo.getSourceIdentifier())) + .options() + .get(PATH.key())); Path targetTableRootPath = - targetCatalog.getDataTableLocation( - Identifier.fromString(cloneFileInfo.getTargetIdentifier())); + new Path( + targetCatalog + .getTable( + Identifier.fromString(cloneFileInfo.getTargetIdentifier())) + .options() + .get(PATH.key())); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java index 166544eb7357..128875a8b862 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.flink.utils.TableMigrationUtils; -import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; @@ -47,14 +46,10 @@ public String[] call( String sourceDatabasePath, String properties) throws Exception { - if (!(catalog instanceof HiveCatalog)) { - throw new IllegalArgumentException("Only support Hive Catalog"); - } - HiveCatalog hiveCatalog = (HiveCatalog) this.catalog; List migrators = TableMigrationUtils.getImporters( connector, - hiveCatalog, + catalog, sourceDatabasePath, ParameterUtils.parseCommaSeparatedKeyValues(properties)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java index c9a273336c3f..110b4e25fc00 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; -import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.migrate.Migrator; import org.apache.flink.table.procedure.ProcedureContext; @@ -62,9 +61,6 @@ public void migrateHandle( String targetPaimonTablePath, boolean deleteOrigin) throws Exception { - if (!(catalog instanceof HiveCatalog)) { - throw new IllegalArgumentException("Only support Hive Catalog"); - } Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); @@ -76,7 +72,7 @@ public void migrateHandle( Migrator importer = TableMigrationUtils.getImporter( connector, - (HiveCatalog) catalog, + catalog, sourceTableId.getDatabaseName(), sourceTableId.getObjectName(), targetTableId.getDatabaseName(), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index f721f89257e5..39e6092d8496 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; -import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -51,10 +50,6 @@ public String[] call( String sourceTablePath, String properties) throws Exception { - if (!(catalog instanceof HiveCatalog)) { - throw new IllegalArgumentException("Only support Hive Catalog"); - } - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = Identifier.fromString(sourceTablePath); @@ -62,7 +57,7 @@ public String[] call( TableMigrationUtils.getImporter( connector, - (HiveCatalog) catalog, + catalog, sourceTableId.getDatabaseName(), sourceTableId.getObjectName(), targetTableId.getDatabaseName(), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java index 0c208116be22..196adf4753cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -55,10 +54,6 @@ public String[] call(ProcedureContext procedureContext) public String[] call(ProcedureContext procedureContext, String identifier) throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { - if (!(catalog instanceof HiveCatalog)) { - throw new IllegalArgumentException("Only support Hive Catalog"); - } - if (StringUtils.isBlank(identifier)) { catalog.repairCatalog(); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index 32d017be6665..47655d39a0f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.utils; +import org.apache.paimon.catalog.CachingCatalog; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.hive.migrate.HiveMigrator; import org.apache.paimon.migrate.Migrator; @@ -30,7 +32,7 @@ public class TableMigrationUtils { public static Migrator getImporter( String connector, - HiveCatalog paimonCatalog, + Catalog catalog, String sourceDatabase, String souceTableName, String targetDatabase, @@ -38,8 +40,14 @@ public static Migrator getImporter( Map options) { switch (connector) { case "hive": + if (catalog instanceof CachingCatalog) { + catalog = ((CachingCatalog) catalog).wrapped(); + } + if (!(catalog instanceof HiveCatalog)) { + throw new IllegalArgumentException("Only support Hive Catalog"); + } return new HiveMigrator( - paimonCatalog, + (HiveCatalog) catalog, sourceDatabase, souceTableName, targetDatabase, @@ -51,13 +59,17 @@ public static Migrator getImporter( } public static List getImporters( - String connector, - HiveCatalog paimonCatalog, - String sourceDatabase, - Map options) { + String connector, Catalog catalog, String sourceDatabase, Map options) { switch (connector) { case "hive": - return HiveMigrator.databaseMigrators(paimonCatalog, sourceDatabase, options); + if (catalog instanceof CachingCatalog) { + catalog = ((CachingCatalog) catalog).wrapped(); + } + if (!(catalog instanceof HiveCatalog)) { + throw new IllegalArgumentException("Only support Hive Catalog"); + } + return HiveMigrator.databaseMigrators( + (HiveCatalog) catalog, sourceDatabase, options); default: throw new UnsupportedOperationException("Don't support connector " + connector); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 0fe4173643c2..bbc827b24815 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -19,14 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.CoreOptions; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; @@ -38,7 +31,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -440,33 +432,6 @@ public void testIgnoreDelete() { assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B")); } - @Test - public void testIgnoreDeleteCompatible() throws Exception { - sql( - "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) " - + "WITH ('merge-engine' = 'deduplicate', 'write-only' = 'true')"); - - sql("INSERT INTO ignore_delete VALUES (1, 'A')"); - // write delete records - sql("DELETE FROM ignore_delete WHERE pk = 1"); - assertThat(sql("SELECT * FROM ignore_delete")).isEmpty(); - - // set ignore-delete and read - Map newOptions = new HashMap<>(); - newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true"); - SchemaUtils.forceCommit( - new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")), - new Schema( - Arrays.asList( - new DataField(0, "pk", DataTypes.INT().notNull()), - new DataField(1, "v", DataTypes.STRING())), - Collections.emptyList(), - Collections.singletonList("pk"), - newOptions, - null)); - assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A")); - } - @Test public void testIgnoreDeleteWithRowKindField() { sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index e358fb31158b..4e1ea424f834 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.CatalogLockContext; @@ -98,7 +97,7 @@ public void testRenameTable() throws Exception { Identifier identifier = new Identifier(DB_NAME, "t3"); Catalog catalog = ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog(); - Path tablePath = ((AbstractCatalog) catalog).getDataTableLocation(identifier); + Path tablePath = new Path(catalog.getTable(identifier).options().get("path")); assertThat(tablePath.toString()) .isEqualTo(new File(path, DB_NAME + ".db" + File.separator + "t3").toString()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 587069ada982..f95ec3ae4a16 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.log.LogSinkProvider; @@ -647,9 +646,18 @@ private void checkEquals( CatalogTable t2, Map optionsToAdd, Set optionsToRemove) { - Path tablePath = - ((AbstractCatalog) ((FlinkCatalog) catalog).catalog()) - .getDataTableLocation(FlinkCatalog.toIdentifier(path)); + Path tablePath; + try { + tablePath = + new Path( + ((FlinkCatalog) catalog) + .catalog() + .getTable(FlinkCatalog.toIdentifier(path)) + .options() + .get(CoreOptions.PATH.key())); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } Map options = new HashMap<>(t1.getOptions()); options.put("path", tablePath.toString()); options.putAll(optionsToAdd); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index f65a5008110b..399109137b79 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -18,14 +18,6 @@ package org.apache.paimon.flink; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.SchemaUtils; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.CommonTestUtils; @@ -44,9 +36,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -627,37 +617,4 @@ public void testIgnoreDelete(boolean localMerge) throws Exception { Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple")); iterator.close(); } - - @Test - public void testIgnoreDeleteCompatible() throws Exception { - sql( - "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" - + " 'merge-engine' = 'deduplicate'," - + " 'write-only' = 'true')"); - sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')"); - // write delete records - sql("DELETE FROM ignore_delete WHERE pk = 1"); - sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))"); - assertThat(sql("SELECT * FROM ignore_delete")) - .containsExactlyInAnyOrder(Row.of(1, "A", null)); - - // force altering merge engine and read - Map newOptions = new HashMap<>(); - newOptions.put( - CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString()); - newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true"); - SchemaUtils.forceCommit( - new SchemaManager(LocalFileIO.create(), new Path(path, "default.db/ignore_delete")), - new Schema( - Arrays.asList( - new DataField(0, "pk", DataTypes.INT().notNull()), - new DataField(1, "a", DataTypes.STRING()), - new DataField(2, "b", DataTypes.STRING())), - Collections.emptyList(), - Collections.singletonList("pk"), - newOptions, - null)); - assertThat(sql("SELECT * FROM ignore_delete")) - .containsExactlyInAnyOrder(Row.of(1, "A", "apple")); - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 5e7f0108c1ed..1b17ee17279c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -51,6 +51,8 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; + /** {@link Action} test base. */ public abstract class ActionITCaseBase extends AbstractTestBase { @@ -70,7 +72,9 @@ public void before() throws IOException { tableName = "test_table_" + UUID.randomUUID(); commitUser = UUID.randomUUID().toString(); incrementalIdentifier = 0; - catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + CatalogContext context = CatalogContext.create(new Path(warehouse)); + context.options().set(CACHE_ENABLED, false); + catalog = CatalogFactory.createCatalog(context); } @AfterEach diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java index e0c5f5d66722..db973b39c8ca 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.procedure.privilege; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.privilege.FileBasedPrivilegeManager; @@ -85,7 +84,6 @@ public void testUserPrivileges() throws Exception { Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog(); assertThat(paimonCatalog).isInstanceOf(PrivilegedCatalog.class); PrivilegedCatalog privilegedCatalog = (PrivilegedCatalog) paimonCatalog; - assertThat(privilegedCatalog.wrapped()).isInstanceOf(FileSystemCatalog.class); assertThat(privilegedCatalog.privilegeManager()) .isInstanceOf(FileBasedPrivilegeManager.class); assertThat(privilegedCatalog.privilegeManager().privilegeEnabled()).isTrue(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 1bbf4b3871e8..e9fc9f32cc2c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -36,9 +36,6 @@ import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.options.OptionsUtils; -import org.apache.paimon.privilege.FileBasedPrivilegeManager; -import org.apache.paimon.privilege.PrivilegeManager; -import org.apache.paimon.privilege.PrivilegedCatalog; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -875,25 +872,12 @@ public static Catalog createHiveCatalog(CatalogContext context) { throw new UncheckedIOException(e); } - Catalog catalog = - new HiveCatalog( - fileIO, - hiveConf, - options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), - options, - warehouse.toUri().toString()); - - PrivilegeManager privilegeManager = - new FileBasedPrivilegeManager( - warehouse.toString(), - fileIO, - context.options().get(PrivilegedCatalog.USER), - context.options().get(PrivilegedCatalog.PASSWORD)); - if (privilegeManager.privilegeEnabled()) { - catalog = new PrivilegedCatalog(catalog, privilegeManager); - } - - return catalog; + return new HiveCatalog( + fileIO, + hiveConf, + options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), + options, + warehouse.toUri().toString()); } public static HiveConf createHiveConf(CatalogContext context) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java index 4a45b365a159..992f17253c9f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark.utils; +import org.apache.paimon.catalog.CachingCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.hive.migrate.HiveMigrator; @@ -30,7 +31,7 @@ public class TableMigrationUtils { public static Migrator getImporter( String connector, - Catalog paimonCatalog, + Catalog catalog, String sourceDatabase, String sourceTableName, String targetDatabase, @@ -38,9 +39,14 @@ public static Migrator getImporter( Map options) { switch (connector) { case "hive": - assert paimonCatalog instanceof HiveCatalog; + if (catalog instanceof CachingCatalog) { + catalog = ((CachingCatalog) catalog).wrapped(); + } + if (!(catalog instanceof HiveCatalog)) { + throw new IllegalArgumentException("Only support Hive Catalog"); + } return new HiveMigrator( - (HiveCatalog) paimonCatalog, + (HiveCatalog) catalog, sourceDatabase, sourceTableName, targetDatabase, diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index 55ce7c9aad88..f12a3d8fabe9 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -59,6 +59,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.assertj.core.api.Assertions.assertThat; @@ -88,6 +89,7 @@ public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) { Options options = new Options(); options.set(WAREHOUSE, spark.conf().get("spark.sql.catalog.paimon.warehouse")); + options.set(CACHE_ENABLED, false); fileSystemCatalog = (FileSystemCatalog) CatalogFactory.createCatalog(CatalogContext.create(options)); } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index b8115132d71c..983dd037f131 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier} -import org.apache.paimon.options.Options +import org.apache.paimon.options.{CatalogOptions, Options} import org.apache.paimon.spark.catalog.Catalogs import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions} @@ -36,6 +36,8 @@ import org.scalactic.source.Position import org.scalatest.Tag import java.io.File +import java.util +import java.util.{HashMap => JHashMap} import java.util.TimeZone import scala.util.Random @@ -64,6 +66,7 @@ class PaimonSparkTestBase super.sparkConf .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName) .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath) + .set("spark.sql.catalog.paimon.cache-enabled", "false") .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName) .set("spark.serializer", serializer) } @@ -122,7 +125,9 @@ class PaimonSparkTestBase private def initCatalog(): Catalog = { val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name() - val options = Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf) + val options = + new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf)) + options.put(CatalogOptions.CACHE_ENABLED.key(), "false") val catalogContext = CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf()) CatalogFactory.createCatalog(catalogContext)