From cdd5bb72f706901f6978a71832e4ee1c78934e08 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 16 Dec 2024 22:33:22 +0800 Subject: [PATCH] [core] Clean constants, caseSensitive, loader in Catalog (#4721) --- docs/content/maintenance/configurations.md | 6 -- docs/content/program-api/flink-api.md | 2 +- .../generated/catalog_configuration.html | 12 +-- .../generated/core_configuration.html | 12 +-- .../file_system_catalog_configuration.html | 36 --------- .../paimon/arrow/ArrowBundleRecords.java | 8 +- .../org/apache/paimon/arrow/ArrowUtils.java | 10 ++- .../paimon/arrow/reader/ArrowBatchReader.java | 11 +-- .../arrow/vector/ArrowFormatCWriter.java | 4 +- .../arrow/vector/ArrowFormatWriter.java | 4 +- .../apache/paimon/options/CatalogOptions.java | 9 +-- .../org/apache/paimon/utils/StringUtils.java | 4 +- .../paimon/catalog/AbstractCatalog.java | 15 ++-- .../org/apache/paimon/catalog/Catalog.java | 74 +++++++------------ ...CatalogOptions.java => CatalogLoader.java} | 18 ++--- .../apache/paimon/catalog/CatalogUtils.java | 34 +++++++++ .../paimon/catalog/DelegateCatalog.java | 4 +- .../paimon/catalog/FileSystemCatalog.java | 6 +- .../org/apache/paimon/jdbc/JdbcCatalog.java | 2 +- .../org/apache/paimon/rest/RESTCatalog.java | 11 +-- .../paimon/catalog/FileSystemCatalogTest.java | 4 +- .../action/cdc/CdcActionCommonUtils.java | 8 +- .../paimon/flink/action/cdc/Expression.java | 2 +- .../cdc/MessageQueueSyncTableActionBase.java | 2 +- .../action/cdc/SyncDatabaseActionBase.java | 11 +-- .../flink/action/cdc/SyncTableActionBase.java | 11 +-- .../action/cdc/SynchronizationActionBase.java | 4 +- .../flink/action/cdc/TableNameConverter.java | 6 +- .../cdc/mysql/MySqlSyncDatabaseAction.java | 4 +- .../flink/sink/cdc/CaseSensitiveUtils.java | 13 ++-- ...CdcDynamicTableParsingProcessFunction.java | 5 +- .../CdcMultiplexRecordChannelComputer.java | 5 +- .../cdc/CdcRecordStoreMultiWriteOperator.java | 9 ++- .../paimon/flink/sink/cdc/CdcSinkBuilder.java | 6 +- .../sink/cdc/FlinkCdcMultiTableSink.java | 6 +- .../cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 6 +- ...TableUpdatedDataFieldsProcessFunction.java | 3 +- .../flink/sink/cdc/RichCdcSinkBuilder.java | 8 +- .../cdc/UpdatedDataFieldsProcessFunction.java | 4 +- .../UpdatedDataFieldsProcessFunctionBase.java | 12 +-- .../flink/action/cdc/SchemaEvolutionTest.java | 4 +- .../KafkaCanalSyncDatabaseActionITCase.java | 4 +- .../KafkaCanalSyncTableActionITCase.java | 4 +- .../kafka/KafkaSyncDatabaseActionITCase.java | 4 +- .../MongoDBSyncDatabaseActionITCase.java | 4 +- .../mongodb/MongoDBSyncTableActionITCase.java | 4 +- .../mysql/MySqlSyncDatabaseActionITCase.java | 5 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 5 +- ...CdcMultiplexRecordChannelComputerTest.java | 3 +- .../CdcRecordStoreMultiWriteOperatorTest.java | 7 +- .../cdc/FlinkCdcSyncDatabaseSinkITCase.java | 5 +- .../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 5 +- .../org/apache/paimon/flink/FlinkCatalog.java | 3 +- .../paimon/flink/action/ActionBase.java | 3 +- .../compact/MultiAwareBucketTableScan.java | 4 +- .../flink/compact/MultiTableScanBase.java | 3 +- .../compact/MultiUnawareBucketTableScan.java | 4 +- ...nlyMultiTableCompactionWorkerOperator.java | 9 ++- .../sink/CombinedTableCompactorSink.java | 6 +- .../sink/MultiTablesStoreCompactOperator.java | 9 ++- .../flink/sink/StoreMultiCommitter.java | 5 +- .../sink/partition/PartitionListeners.java | 5 +- ....java => PartitionStatisticsReporter.java} | 17 +++-- ...ener.java => ReportPartStatsListener.java} | 22 +++--- .../CombinedTableCompactorSourceBuilder.java | 7 +- .../operator/CombinedAwareBatchSource.java | 6 +- .../CombinedAwareStreamingSource.java | 6 +- .../operator/CombinedCompactorSource.java | 9 ++- .../operator/CombinedUnawareBatchSource.java | 5 +- .../CombinedUnawareStreamingSource.java | 6 +- .../operator/MultiTablesReadOperator.java | 7 +- .../MultiUnawareTablesReadOperator.java | 6 +- .../flink/sink/CompactorSinkITCase.java | 3 +- .../flink/sink/StoreMultiCommitterTest.java | 5 +- ...a => PartitionStatisticsReporterTest.java} | 8 +- ...ltiTablesCompactorSourceBuilderITCase.java | 3 +- .../org/apache/paimon/hive/HiveCatalog.java | 7 +- .../org/apache/paimon/spark/SparkCatalog.java | 6 +- .../paimon/spark/SparkGenericCatalog.java | 6 +- 79 files changed, 320 insertions(+), 325 deletions(-) delete mode 100644 docs/layouts/shortcodes/generated/file_system_catalog_configuration.html rename paimon-core/src/main/java/org/apache/paimon/catalog/{FileSystemCatalogOptions.java => CatalogLoader.java} (55%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/{HmsReporter.java => PartitionStatisticsReporter.java} (85%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/{ReportHmsListener.java => ReportPartStatsListener.java} (91%) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/{HmsReporterTest.java => PartitionStatisticsReporterTest.java} (95%) diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md index 3849d70a5862..99f797e68fa6 100644 --- a/docs/content/maintenance/configurations.md +++ b/docs/content/maintenance/configurations.md @@ -38,12 +38,6 @@ Options for paimon catalog. {{< generated/catalog_configuration >}} -### FilesystemCatalogOptions - -Options for Filesystem catalog. - -{{< generated/file_system_catalog_configuration >}} - ### HiveCatalogOptions Options for Hive catalog. diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md index 6ecac3909ced..3451b40d5880 100644 --- a/docs/content/program-api/flink-api.md +++ b/docs/content/program-api/flink-api.md @@ -221,7 +221,7 @@ public class WriteCdcToTable { Identifier identifier = Identifier.create("my_db", "T"); Options catalogOptions = new Options(); catalogOptions.set("warehouse", "/path/to/warehouse"); - Catalog.Loader catalogLoader = + CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); Table table = catalogLoader.load().getTable(identifier); diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 63f7adda1e0d..6355c9558653 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -26,12 +26,6 @@ - -
allow-upper-case
- (none) - Boolean - Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog. -
cache-enabled
true @@ -74,6 +68,12 @@ Integer Controls the max number for snapshots per table in the catalog are cached. + +
case-sensitive
+ (none) + Boolean + Indicates whether this catalog is case-sensitive. +
client-pool-size
2 diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 15b1aac93543..1133de289fa3 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -230,6 +230,12 @@ String Specify the file name prefix of data files. + +
data-file.thin-mode
+ false + Boolean + Enable data file thin mode to avoid duplicate columns storage. +
delete-file.thread-num
(none) @@ -864,12 +870,6 @@ Integer Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. - -
data-file.thin-mode
- false - Boolean - Enable data file thin mode to avoid duplicate columns storage. -
streaming-read-mode
(none) diff --git a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html b/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html deleted file mode 100644 index c416ed6da557..000000000000 --- a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html +++ /dev/null @@ -1,36 +0,0 @@ -{{/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/}} - - - - - - - - - - - - - - - - - -
KeyDefaultTypeDescription
case-sensitive
trueBooleanIs case sensitive. If case insensitive, you need to set this option to false, and the table name and fields be converted to lowercase.
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java index 9627bbd85d8c..25f6603ec22e 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java @@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords { private final VectorSchemaRoot vectorSchemaRoot; private final RowType rowType; - private final boolean allowUpperCase; + private final boolean caseSensitive; public ArrowBundleRecords( - VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean allowUpperCase) { + VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean caseSensitive) { this.vectorSchemaRoot = vectorSchemaRoot; this.rowType = rowType; - this.allowUpperCase = allowUpperCase; + this.caseSensitive = caseSensitive; } public VectorSchemaRoot getVectorSchemaRoot() { @@ -52,7 +52,7 @@ public long rowCount() { @Override public Iterator iterator() { - ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, allowUpperCase); + ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive); return arrowBatchReader.readBatch(vectorSchemaRoot).iterator(); } } diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java index b3925a0a769e..0f6a98b7a2e0 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java @@ -55,6 +55,8 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed; + /** Utilities for creating Arrow objects. */ public class ArrowUtils { @@ -66,13 +68,13 @@ public static VectorSchemaRoot createVectorSchemaRoot( } public static VectorSchemaRoot createVectorSchemaRoot( - RowType rowType, BufferAllocator allocator, boolean allowUpperCase) { + RowType rowType, BufferAllocator allocator, boolean caseSensitive) { List fields = rowType.getFields().stream() .map( f -> toArrowField( - allowUpperCase ? f.name() : f.name().toLowerCase(), + toLowerCaseIfNeed(f.name(), caseSensitive), f.id(), f.type(), 0)) @@ -81,9 +83,9 @@ public static VectorSchemaRoot createVectorSchemaRoot( } public static FieldVector createVector( - DataField dataField, BufferAllocator allocator, boolean allowUpperCase) { + DataField dataField, BufferAllocator allocator, boolean caseSensitive) { return toArrowField( - allowUpperCase ? dataField.name() : dataField.name().toLowerCase(), + toLowerCaseIfNeed(dataField.name(), caseSensitive), dataField.id(), dataField.type(), 0) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java index 9d20062b437b..b626758dedfc 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java @@ -34,6 +34,8 @@ import java.util.Iterator; import java.util.List; +import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed; + /** Reader from a {@link VectorSchemaRoot} to paimon rows. */ public class ArrowBatchReader { @@ -41,9 +43,9 @@ public class ArrowBatchReader { private final VectorizedColumnBatch batch; private final Arrow2PaimonVectorConverter[] convertors; private final RowType projectedRowType; - private final boolean allowUpperCase; + private final boolean caseSensitive; - public ArrowBatchReader(RowType rowType, boolean allowUpperCase) { + public ArrowBatchReader(RowType rowType, boolean caseSensitive) { this.internalRowSerializer = new InternalRowSerializer(rowType); ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()]; this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()]; @@ -53,7 +55,7 @@ public ArrowBatchReader(RowType rowType, boolean allowUpperCase) { for (int i = 0; i < columnVectors.length; i++) { this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i)); } - this.allowUpperCase = allowUpperCase; + this.caseSensitive = caseSensitive; } public Iterable readBatch(VectorSchemaRoot vsr) { @@ -63,8 +65,7 @@ public Iterable readBatch(VectorSchemaRoot vsr) { for (int i = 0; i < dataFields.size(); ++i) { try { String fieldName = dataFields.get(i).name(); - Field field = - arrowSchema.findField(allowUpperCase ? fieldName : fieldName.toLowerCase()); + Field field = arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive)); int idx = arrowSchema.getFields().indexOf(field); mapping[i] = idx; } catch (IllegalArgumentException e) { diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java index 10afcaf6917a..442457813ace 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java @@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable { private final ArrowSchema schema; private final ArrowFormatWriter realWriter; - public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) { - this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, allowUpperCase); + public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) { + this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive); RootAllocator allocator = realWriter.getAllocator(); array = ArrowArray.allocateNew(allocator); schema = ArrowSchema.allocateNew(allocator); diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java index acdb5d0dcb1d..9f557921979b 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java @@ -43,10 +43,10 @@ public class ArrowFormatWriter implements AutoCloseable { private final RootAllocator allocator; private int rowId; - public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) { + public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) { allocator = new RootAllocator(); - vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, allowUpperCase); + vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, caseSensitive); fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()]; diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index bb8cfae68284..b22274e011fb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -128,13 +128,12 @@ public class CatalogOptions { .withDescription( "Controls the max number for snapshots per table in the catalog are cached."); - public static final ConfigOption ALLOW_UPPER_CASE = - ConfigOptions.key("allow-upper-case") + public static final ConfigOption CASE_SENSITIVE = + ConfigOptions.key("case-sensitive") .booleanType() .noDefaultValue() - .withDescription( - "Indicates whether this catalog allow upper case, " - + "its default value depends on the implementation of the specific catalog."); + .withFallbackKeys("allow-upper-case") + .withDescription("Indicates whether this catalog is case-sensitive."); public static final ConfigOption SYNC_ALL_PROPERTIES = ConfigOptions.key("sync-all-properties") diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java index e184624c0bbf..c4e07e0a6972 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java @@ -542,8 +542,8 @@ public static String quote(String str) { return "`" + str + "`"; } - public static String caseSensitiveConversion(String str, boolean allowUpperCase) { - return allowUpperCase ? str : str.toLowerCase(); + public static String toLowerCaseIfNeed(String str, boolean caseSensitive) { + return caseSensitive ? str : str.toLowerCase(); } public static boolean isNumeric(final CharSequence cs) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index a1b41e3b8a41..db6909295556 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -60,7 +60,6 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; @@ -82,7 +81,7 @@ protected AbstractCatalog(FileIO fileIO) { protected AbstractCatalog(FileIO fileIO, Options options) { this.fileIO = fileIO; - this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap()); + this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(options.toMap()); this.catalogOptions = options; } @@ -123,11 +122,6 @@ protected boolean lockEnabled() { return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore()); } - @Override - public boolean allowUpperCase() { - return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true); - } - protected boolean allowCustomTablePath() { return false; } @@ -559,8 +553,9 @@ protected void checkNotSystemDatabase(String database) { } protected void validateIdentifierNameCaseInsensitive(Identifier identifier) { - Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName()); - Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName()); + CatalogUtils.validateCaseInsensitive( + caseSensitive(), "Database", identifier.getDatabaseName()); + CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName()); } private void validateFieldNameCaseInsensitiveInSchemaChange(List changes) { @@ -578,7 +573,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c } protected void validateFieldNameCaseInsensitive(List fieldNames) { - Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames); + CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", fieldNames); } private void validateAutoCreateClose(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index c3808caa135a..7b1fe0ea072e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -26,15 +26,9 @@ import org.apache.paimon.table.Table; import org.apache.paimon.view.View; -import java.io.Serializable; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** * This interface is responsible for reading and writing metadata such as database/table from a @@ -46,30 +40,38 @@ @Public public interface Catalog extends AutoCloseable { - String DEFAULT_DATABASE = "default"; - + // constants for system table and database String SYSTEM_TABLE_SPLITTER = "$"; String SYSTEM_DATABASE_NAME = "sys"; String SYSTEM_BRANCH_PREFIX = "branch_"; - String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; - String DB_SUFFIX = ".db"; + // constants for table and database String COMMENT_PROP = "comment"; String OWNER_PROP = "owner"; + + // constants for database + String DEFAULT_DATABASE = "default"; + String DB_SUFFIX = ".db"; String DB_LOCATION_PROP = "location"; + + // constants for table + String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; String NUM_ROWS_PROP = "numRows"; String NUM_FILES_PROP = "numFiles"; String TOTAL_SIZE_PROP = "totalSize"; String LAST_UPDATE_TIME_PROP = "lastUpdateTime"; - String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime"; - /** Warehouse root path containing all database directories in this catalog. */ + /** Warehouse root path for creating new databases. */ String warehouse(); - /** Catalog options. */ + /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */ + FileIO fileIO(); + + /** Catalog options for re-creating this catalog. */ Map options(); - FileIO fileIO(); + /** Return a boolean that indicates whether this catalog is case-sensitive. */ + boolean caseSensitive(); /** * Get the names of all databases in this catalog. @@ -325,44 +327,30 @@ default void renameView(Identifier fromView, Identifier toView, boolean ignoreIf throw new UnsupportedOperationException(); } - /** Return a boolean that indicates whether this catalog allow upper case. */ - boolean allowUpperCase(); - + /** + * Repair the entire Catalog, repair the metadata in the metastore consistent with the metadata + * in the filesystem, register missing tables in the metastore. + */ default void repairCatalog() { throw new UnsupportedOperationException(); } + /** + * Repair the entire database, repair the metadata in the metastore consistent with the metadata + * in the filesystem, register missing tables in the metastore. + */ default void repairDatabase(String databaseName) { throw new UnsupportedOperationException(); } + /** + * Repair the table, repair the metadata in the metastore consistent with the metadata in the + * filesystem. + */ default void repairTable(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); } - static Map tableDefaultOptions(Map options) { - return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); - } - - /** Validate database, table and field names must be lowercase when not case-sensitive. */ - static void validateCaseInsensitive(boolean caseSensitive, String type, String... names) { - validateCaseInsensitive(caseSensitive, type, Arrays.asList(names)); - } - - /** Validate database, table and field names must be lowercase when not case-sensitive. */ - static void validateCaseInsensitive(boolean caseSensitive, String type, List names) { - if (caseSensitive) { - return; - } - List illegalNames = - names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList()); - checkArgument( - illegalNames.isEmpty(), - String.format( - "%s name %s cannot contain upper case in the catalog.", - type, illegalNames)); - } - /** Exception for trying to drop on a database that is not empty. */ class DatabaseNotEmptyException extends Exception { private static final String MSG = "Database %s is not empty."; @@ -599,10 +587,4 @@ public Identifier identifier() { return identifier; } } - - /** Loader of {@link Catalog}. */ - @FunctionalInterface - interface Loader extends Serializable { - Catalog load(); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java similarity index 55% rename from paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java rename to paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java index e656742b42e9..c8de08139cb7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java @@ -18,19 +18,11 @@ package org.apache.paimon.catalog; -import org.apache.paimon.options.ConfigOption; -import org.apache.paimon.options.ConfigOptions; +import java.io.Serializable; -/** Options for filesystem catalog. */ -public final class FileSystemCatalogOptions { +/** Loader for creating a {@link Catalog}. */ +@FunctionalInterface +public interface CatalogLoader extends Serializable { - public static final ConfigOption CASE_SENSITIVE = - ConfigOptions.key("case-sensitive") - .booleanType() - .defaultValue(true) - .withFallbackKeys("allow-upper-case") - .withDescription( - "Is case sensitive. If case insensitive, you need to set this option to false, and the table name and fields be converted to lowercase."); - - private FileSystemCatalogOptions() {} + Catalog load(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 39f81833a9eb..bae23c627607 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -21,6 +21,15 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; +import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Utils for {@link Catalog}. */ public class CatalogUtils { @@ -51,4 +60,29 @@ public static String table(Path path) { public static String table(String path) { return SchemaManager.identifierFromPath(path, false).getObjectName(); } + + public static Map tableDefaultOptions(Map options) { + return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); + } + + /** Validate database, table and field names must be lowercase when not case-sensitive. */ + public static void validateCaseInsensitive( + boolean caseSensitive, String type, String... names) { + validateCaseInsensitive(caseSensitive, type, Arrays.asList(names)); + } + + /** Validate database, table and field names must be lowercase when not case-sensitive. */ + public static void validateCaseInsensitive( + boolean caseSensitive, String type, List names) { + if (caseSensitive) { + return; + } + List illegalNames = + names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList()); + checkArgument( + illegalNames.isEmpty(), + String.format( + "%s name %s cannot contain upper case in the catalog.", + type, illegalNames)); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 2298626b0e48..93e8ce2581ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -42,8 +42,8 @@ public Catalog wrapped() { } @Override - public boolean allowUpperCase() { - return wrapped.allowUpperCase(); + public boolean caseSensitive() { + return wrapped.caseSensitive(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 9264a54647b1..279ddb26ee53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.concurrent.Callable; -import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { @@ -158,7 +158,7 @@ public String warehouse() { } @Override - public boolean allowUpperCase() { - return catalogOptions.get(CASE_SENSITIVE); + public boolean caseSensitive() { + return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 778bc591fe89..551b2d8fc910 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -320,7 +320,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE } @Override - public boolean allowUpperCase() { + public boolean caseSensitive() { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 86b87e25e832..c30e1109e2ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; +import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ @@ -61,7 +62,7 @@ public class RESTCatalog implements Catalog { private final RESTClient client; private final ResourcePaths resourcePaths; - private final Map options; + private final Options options; private final Map baseHeader; private final AuthSession catalogAuth; @@ -99,7 +100,7 @@ public RESTCatalog(Options options) { } Map initHeaders = RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders()); - this.options = fetchOptionsFromServer(initHeaders, options.toMap()); + this.options = new Options(fetchOptionsFromServer(initHeaders, options.toMap())); this.resourcePaths = ResourcePaths.forCatalogProperties( this.options.get(RESTCatalogInternalOptions.PREFIX)); @@ -112,7 +113,7 @@ public String warehouse() { @Override public Map options() { - return this.options; + return this.options.toMap(); } @Override @@ -223,8 +224,8 @@ public List listPartitions(Identifier identifier) } @Override - public boolean allowUpperCase() { - return false; + public boolean caseSensitive() { + return options.getOptional(CASE_SENSITIVE).orElse(true); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java index 303a9d8733d4..65ea6721c220 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java @@ -36,12 +36,12 @@ public class FileSystemCatalogTest extends CatalogTestBase { public void setUp() throws Exception { super.setUp(); Options catalogOptions = new Options(); - catalogOptions.set(CatalogOptions.ALLOW_UPPER_CASE, false); + catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false); catalog = new FileSystemCatalog(fileIO, new Path(warehouse), catalogOptions); } @Test - public void testCreateTableAllowUpperCase() throws Exception { + public void testCreateTableCaseSensitive() throws Exception { catalog.createDatabase("test_db", false); Identifier identifier = Identifier.create("test_db", "new_table"); Schema schema = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index c8af6f91c420..6482a625f4c7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -42,7 +42,7 @@ import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; -import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion; +import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed; /** Common utils for CDC Action. */ public class CdcActionCommonUtils { @@ -129,21 +129,21 @@ public static Schema buildPaimonSchema( List allFieldNames = new ArrayList<>(); for (DataField field : sourceSchema.fields()) { - String fieldName = caseSensitiveConversion(field.name(), caseSensitive); + String fieldName = toLowerCaseIfNeed(field.name(), caseSensitive); allFieldNames.add(fieldName); builder.column(fieldName, field.type(), field.description()); } for (ComputedColumn computedColumn : computedColumns) { String computedColumnName = - caseSensitiveConversion(computedColumn.columnName(), caseSensitive); + toLowerCaseIfNeed(computedColumn.columnName(), caseSensitive); allFieldNames.add(computedColumnName); builder.column(computedColumnName, computedColumn.columnType()); } for (CdcMetadataConverter metadataConverter : metadataConverters) { String metadataColumnName = - caseSensitiveConversion(metadataConverter.columnName(), caseSensitive); + toLowerCaseIfNeed(metadataConverter.columnName(), caseSensitive); allFieldNames.add(metadataColumnName); builder.column(metadataColumnName, metadataConverter.dataType()); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index 2e0a1319293f..3290ec18291f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -189,7 +189,7 @@ public static ReferencedField checkArgument( String[] literals = Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new); String referencedFieldCheckForm = - StringUtils.caseSensitiveConversion(referencedField, caseSensitive); + StringUtils.toLowerCaseIfNeed(referencedField, caseSensitive); DataType fieldType = checkNotNull( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index 9c629b5a516f..3af0957ce6da 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -86,7 +86,7 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) { tableConfig, retrievedSchema, metadataConverters, - allowUpperCase, + caseSensitive, true, false); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 4fb1339c5193..56334c1e7bff 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.action.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.flink.sink.cdc.EventParser; @@ -155,9 +156,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) { @Override protected void validateCaseSensitivity() { - Catalog.validateCaseInsensitive(allowUpperCase, "Database", database); - Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix); - Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix); + CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database); + CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix); + CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix); } @Override @@ -179,7 +180,7 @@ protected EventParser.Factory buildEventParserFactory() NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder( tableConfig, - allowUpperCase, + caseSensitive, partitionKeys, primaryKeys, requirePrimaryKeys(), @@ -190,7 +191,7 @@ protected EventParser.Factory buildEventParserFactory() excludingTables == null ? null : Pattern.compile(excludingTables); TableNameConverter tableNameConverter = new TableNameConverter( - allowUpperCase, + caseSensitive, mergeShards, dbPrefix, dbSuffix, diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index 87efeb2a19cf..6fcdbd44bca2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.Action; @@ -107,15 +108,15 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) { tableConfig, retrievedSchema, metadataConverters, - allowUpperCase, + caseSensitive, true, true); } @Override protected void validateCaseSensitivity() { - Catalog.validateCaseInsensitive(allowUpperCase, "Database", database); - Catalog.validateCaseInsensitive(allowUpperCase, "Table", table); + CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database); + CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table); } @Override @@ -142,7 +143,7 @@ protected void beforeBuildingSourceSink() throws Exception { buildComputedColumns( computedColumnArgs, fileStoreTable.schema().fields(), - allowUpperCase); + caseSensitive); // check partition keys and primary keys in case that user specified them checkConstraints(); } @@ -162,7 +163,7 @@ protected FlatMapFunction recordParse() @Override protected EventParser.Factory buildEventParserFactory() { - boolean caseSensitive = this.allowUpperCase; + boolean caseSensitive = this.caseSensitive; return () -> new RichCdcMultiplexRecordEventParser(caseSensitive); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index a7c770347410..d755b200a957 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -64,7 +64,7 @@ public abstract class SynchronizationActionBase extends ActionBase { protected final String database; protected final Configuration cdcSourceConfig; protected final SyncJobHandler syncJobHandler; - protected final boolean allowUpperCase; + protected final boolean caseSensitive; protected Map tableConfig = new HashMap<>(); protected TypeMapping typeMapping = TypeMapping.defaultMapping(); @@ -80,7 +80,7 @@ public SynchronizationActionBase( this.database = database; this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; - this.allowUpperCase = catalog.allowUpperCase(); + this.caseSensitive = catalog.caseSensitive(); this.syncJobHandler.registerJdbcDriver(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java index 15fc3507ce2d..7dd63ed2273e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed; + /** Used to convert a MySQL source table name to corresponding Paimon table name. */ public class TableNameConverter implements Serializable { @@ -78,7 +80,7 @@ public String convert(String originDbName, String originTblName) { // top priority: table mapping if (tableMapping.containsKey(originTblName.toLowerCase())) { String mappedName = tableMapping.get(originTblName.toLowerCase()); - return caseSensitive ? mappedName : mappedName.toLowerCase(); + return toLowerCaseIfNeed(mappedName, caseSensitive); } String tblPrefix = prefix; @@ -93,7 +95,7 @@ public String convert(String originDbName, String originTblName) { } // third priority: normal prefix and suffix - String tableName = caseSensitive ? originTblName : originTblName.toLowerCase(); + String tableName = toLowerCaseIfNeed(originTblName, caseSensitive); return tblPrefix + tableName + tblSuffix; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index ce2e9124a664..0f452e2834be 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -139,7 +139,7 @@ protected void beforeBuildingSourceSink() throws Exception { TableNameConverter tableNameConverter = new TableNameConverter( - allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); + caseSensitive, mergeShards, tablePrefix, tableSuffix, tableMapping); for (JdbcTableInfo tableInfo : jdbcTableInfos) { Identifier identifier = Identifier.create( @@ -155,7 +155,7 @@ protected void beforeBuildingSourceSink() throws Exception { tableConfig, tableInfo.schema(), metadataConverters, - allowUpperCase, + caseSensitive, false, true); try { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java index 4892aee03024..e80692ed22f5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -28,8 +29,8 @@ public class CaseSensitiveUtils { public static DataStream cdcRecordConvert( - Catalog.Loader catalogLoader, DataStream input) { - if (allowUpperCase(catalogLoader)) { + CatalogLoader catalogLoader, DataStream input) { + if (caseSensitive(catalogLoader)) { return input; } @@ -46,8 +47,8 @@ public void processElement( } public static DataStream cdcMultiplexRecordConvert( - Catalog.Loader catalogLoader, DataStream input) { - if (allowUpperCase(catalogLoader)) { + CatalogLoader catalogLoader, DataStream input) { + if (caseSensitive(catalogLoader)) { return input; } @@ -65,9 +66,9 @@ public void processElement( .name("Case-insensitive Convert"); } - private static boolean allowUpperCase(Catalog.Loader catalogLoader) { + private static boolean caseSensitive(CatalogLoader catalogLoader) { try (Catalog catalog = catalogLoader.load()) { - return catalog.allowUpperCase(); + return catalog.caseSensitive(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java index 886e33e2046a..4efcf1207e9f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.types.DataField; @@ -62,13 +63,13 @@ public class CdcDynamicTableParsingProcessFunction extends ProcessFunction parserFactory; private final String database; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private transient EventParser parser; private transient Catalog catalog; public CdcDynamicTableParsingProcessFunction( - String database, Catalog.Loader catalogLoader, EventParser.Factory parserFactory) { + String database, CatalogLoader catalogLoader, EventParser.Factory parserFactory) { // for now, only support single database this.database = database; this.catalogLoader = catalogLoader; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java index fdad3a921d63..2858b2d4eb6b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; @@ -38,13 +39,13 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer channelComputers; - public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) { + public CdcMultiplexRecordChannelComputer(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 5db111a30047..9387a8293874 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.sink.MultiTableCommittable; @@ -67,7 +68,7 @@ public class CdcRecordStoreMultiWriteOperator private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider; private final String initialCommitUser; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private MemoryPoolFactory memoryPoolFactory; private Catalog catalog; @@ -79,7 +80,7 @@ public class CdcRecordStoreMultiWriteOperator private CdcRecordStoreMultiWriteOperator( StreamOperatorParameters parameters, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider, String initialCommitUser, Options options) { @@ -264,10 +265,10 @@ public static class Factory extends PrepareCommitOperator.Factory { private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider; private final String initialCommitUser; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; public Factory( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider, String initialCommitUser, Options options) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index 28b68fedc3e6..5c27db6ddf1b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.annotation.Experimental; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.schema.SchemaManager; @@ -48,7 +48,7 @@ public class CdcSinkBuilder { private EventParser.Factory parserFactory = null; private Table table = null; private Identifier identifier = null; - private Catalog.Loader catalogLoader = null; + private CatalogLoader catalogLoader = null; @Nullable private Integer parallelism; @@ -77,7 +77,7 @@ public CdcSinkBuilder withIdentifier(Identifier identifier) { return this; } - public CdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + public CdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; return this; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 1688d4deb088..4cd9235cb58a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.sink.CommittableStateManager; import org.apache.paimon.flink.sink.Committer; import org.apache.paimon.flink.sink.CommitterOperatorFactory; @@ -60,13 +60,13 @@ public class FlinkCdcMultiTableSink implements Serializable { private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global Committer"; private final boolean isOverwrite = false; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private final double commitCpuCores; @Nullable private final MemorySize commitHeapMemory; private final String commitUser; public FlinkCdcMultiTableSink( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, double commitCpuCores, @Nullable MemorySize commitHeapMemory, String commitUser) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index a9ad66847b4b..bd18c7e7ad82 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.MultiTablesSinkMode; @@ -72,7 +72,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder { // it will check newly added tables and create the corresponding // Paimon tables. 2) in multiplex sink where it is used to // initialize different writers to multiple tables. - private Catalog.Loader catalogLoader; + private CatalogLoader catalogLoader; // database to sync, currently only support single database private String database; private MultiTablesSinkMode mode; @@ -111,7 +111,7 @@ public FlinkCdcSyncDatabaseSinkBuilder withDatabase(String database) { return this; } - public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; return this; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java index 0ad412e47d34..dd612a52c2eb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -51,7 +52,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction private final Map schemaManagers = new HashMap<>(); - public MultiTableUpdatedDataFieldsProcessFunction(Catalog.Loader catalogLoader) { + public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) { super(catalogLoader); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java index 610856d3af54..43f63854bb22 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.annotation.Public; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; @@ -39,7 +39,7 @@ public class RichCdcSinkBuilder { private DataStream input = null; private Table table = null; private Identifier identifier = null; - private Catalog.Loader catalogLoader = null; + private CatalogLoader catalogLoader = null; @Nullable private Integer parallelism; @@ -62,7 +62,7 @@ public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) { return this; } - public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) { + public RichCdcSinkBuilder catalogLoader(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; return this; } @@ -114,7 +114,7 @@ public RichCdcSinkBuilder withIdentifier(Identifier identifier) { /** @deprecated Use {@link #catalogLoader}. */ @Deprecated - public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + public RichCdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; return this; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java index 64f00d96b0f5..504f63105801 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -53,7 +53,7 @@ public class UpdatedDataFieldsProcessFunction private Set latestFields; public UpdatedDataFieldsProcessFunction( - SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) { + SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) { super(catalogLoader); this.schemaManager = schemaManager; this.identifier = identifier; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 4f02b784c2ba..d50df23742aa 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -48,9 +49,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process private static final Logger LOG = LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class); - protected final Catalog.Loader catalogLoader; + protected final CatalogLoader catalogLoader; protected Catalog catalog; - private boolean allowUpperCase; + private boolean caseSensitive; private static final List STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR); @@ -70,7 +71,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process private static final List TIMESTAMP_TYPES = Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); - protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) { + protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) { this.catalogLoader = catalogLoader; } @@ -86,7 +87,7 @@ public void open(OpenContext openContext) throws Exception { */ public void open(Configuration parameters) { this.catalog = catalogLoader.load(); - this.allowUpperCase = this.catalog.allowUpperCase(); + this.caseSensitive = this.catalog.caseSensitive(); } protected void applySchemaChange( @@ -215,8 +216,7 @@ protected List extractSchemaChanges( List result = new ArrayList<>(); for (DataField newField : updatedDataFields) { - String newFieldName = - StringUtils.caseSensitiveConversion(newField.name(), allowUpperCase); + String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive); if (oldFields.containsKey(newFieldName)) { DataField oldField = oldFields.get(newFieldName); // we compare by ignoring nullable, because partition keys and primary keys might be diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java index 9ba18376867f..46c8e98fb639 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.action.cdc; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; @@ -202,7 +202,7 @@ public void testSchemaEvolution() throws Exception { DataStream> upDataFieldStream = env.fromCollection(prepareData()); Options options = new Options(); options.set("warehouse", tempPath.toString()); - final Catalog.Loader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options); + final CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options); Identifier identifier = Identifier.create(database, tableName); DataStream schemaChangeProcessFunction = upDataFieldStream diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java index 0e1f4e72ea8c..6e37c589ac92 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.action.cdc.kafka; -import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -535,7 +535,7 @@ public void testCaseInsensitive() throws Exception { .withTableConfig(getBasicTableConfig()) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index 8a4dc2f3035b..ed1885f5d774 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.action.cdc.kafka; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -1121,7 +1121,7 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval .withTableConfig(getBasicTableConfig()) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withComputedColumnArgs("_YEAR=year(_DATE)") .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java index de189bc20536..606c46e90e4a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.action.cdc.kafka; -import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -475,7 +475,7 @@ protected void testCaseInsensitive(String format) throws Exception { .withTableConfig(getBasicTableConfig()) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java index ae0b0b412ab4..92c2a7243a7c 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -196,7 +196,7 @@ public void testDynamicTableCreationInMongoDB() throws Exception { .withTableConfig(getBasicTableConfig()) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index b4f31f2d6d3d..2d8489dc23df 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -382,7 +382,7 @@ public void testComputedColumnWithCaseInsensitive() throws Exception { .withTableConfig(getBasicTableConfig()) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withComputedColumnArgs("_YEAR=year(_DATE)") .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index b48b898d66e3..10ee548125bc 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.FileSystemCatalogOptions; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.options.CatalogOptions; @@ -475,7 +474,7 @@ public void testIgnoreCaseDivided() throws Exception { syncDatabaseActionBuilder(mySqlConfig) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withTableConfig(getBasicTableConfig()) .build(); runActionWithDefaultEnv(action); @@ -496,7 +495,7 @@ public void testIgnoreCaseCombined() throws Exception { syncDatabaseActionBuilder(mySqlConfig) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withMode(COMBINED.configString()) .withTableConfig(getBasicTableConfig()) .build(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index febbe4e1deaa..749d87eb0636 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.FileSystemCatalogOptions; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -1327,7 +1326,7 @@ public void testComputedColumnWithCaseInsensitive() throws Exception { syncTableActionBuilder(mySqlConfig) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)") .build(); runActionWithDefaultEnv(action); @@ -1363,7 +1362,7 @@ public void testSpecifyKeysWithCaseInsensitive() throws Exception { syncTableActionBuilder(mySqlConfig) .withCatalogConfig( Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + CatalogOptions.CASE_SENSITIVE.key(), "false")) .withPrimaryKeys("ID1", "PART") .withPartitionKeys("PART") .build(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java index 867cbdbae002..43b7d2ba6399 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.options.CatalogOptions; @@ -54,7 +55,7 @@ public class CdcMultiplexRecordChannelComputerTest { @TempDir java.nio.file.Path tempDir; - private Catalog.Loader catalogLoader; + private CatalogLoader catalogLoader; private Path warehouse; private String databaseName; private Identifier tableWithPartition; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java index 9f35b25026bb..4436aa392d42 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.sink.MultiTableCommittable; import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo; @@ -82,7 +83,7 @@ public class CdcRecordStoreMultiWriteOperatorTest { private Identifier firstTable; private Catalog catalog; private Identifier secondTable; - private Catalog.Loader catalogLoader; + private CatalogLoader catalogLoader; private Schema firstTableSchema; @BeforeEach @@ -340,7 +341,7 @@ public void testSingleTableAddColumn() throws Exception { harness.close(); } - private Catalog.Loader createCatalogLoader() { + private CatalogLoader createCatalogLoader() { Options catalogOptions = createCatalogOptions(warehouse); return () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions)); } @@ -688,7 +689,7 @@ public void testUsingTheSameCompactExecutor() throws Exception { } private OneInputStreamOperatorTestHarness - createTestHarness(Catalog.Loader catalogLoader) throws Exception { + createTestHarness(CatalogLoader catalogLoader) throws Exception { CdcRecordStoreMultiWriteOperator.Factory operatorFactory = new CdcRecordStoreMultiWriteOperator.Factory( catalogLoader, diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java index 28b137a93ed9..35286e3a88d4 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkCatalogFactory; @@ -162,8 +162,7 @@ private void innerTestRandomCdcEvents(Supplier bucket, boolean unawareB Options catalogOptions = new Options(); catalogOptions.set("warehouse", tempDir.toString()); - Catalog.Loader catalogLoader = - () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); new FlinkCdcSyncDatabaseSinkBuilder() .withInput(source) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java index 8b19391f3eda..9fccaac99228 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; @@ -159,8 +159,7 @@ private void innerTestRandomCdcEvents( Options catalogOptions = new Options(); catalogOptions.set("warehouse", tempDir.toString()); - Catalog.Loader catalogLoader = - () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); new CdcSinkBuilder() .withInput(source) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index dd95c48af8d1..3407735b4b79 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; @@ -519,7 +520,7 @@ protected Schema buildPaimonSchema( // Although catalog.createTable will copy the default options, but we need this info // here before create table, such as table-default.kafka.bootstrap.servers defined in // catalog options. Temporarily, we copy the default options here. - Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent); + CatalogUtils.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent); options.put(REGISTER_TIMEOUT.key(), logStoreAutoRegisterTimeout.toString()); registerLogSystem(catalog, identifier, options, classLoader); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 30e32d62efec..4490023e7b03 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.LogicalTypeConversion; @@ -99,7 +100,7 @@ protected void execute(String defaultName) throws Exception { env.execute(name); } - protected Catalog.Loader catalogLoader() { + protected CatalogLoader catalogLoader() { // to make the action workflow serializable Options catalogOptions = this.catalogOptions; return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java index 88730132ef68..e2fd5a9d318e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.compact; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; @@ -47,7 +47,7 @@ public class MultiAwareBucketTableScan extends MultiTableScanBase scansMap; public MultiAwareBucketTableScan( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java index f5940740b691..805e8da0a417 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java @@ -20,6 +20,7 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -59,7 +60,7 @@ public abstract class MultiTableScanBase implements AutoCloseable { protected boolean isStreaming; public MultiTableScanBase( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java index da86b93af512..2ad2642b6248 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java @@ -20,7 +20,7 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; @@ -41,7 +41,7 @@ public class MultiUnawareBucketTableScan protected transient Map tablesMap; public MultiUnawareBucketTableScan( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java index 83d51f302e51..07ec7d165e3a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.append.UnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.compact.UnawareBucketCompactor; import org.apache.paimon.options.Options; @@ -56,7 +57,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class); private final String commitUser; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; // support multi table compaction private transient Map compactorContainer; @@ -67,7 +68,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator private AppendOnlyMultiTableCompactionWorkerOperator( StreamOperatorParameters parameters, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, String commitUser, Options options) { super(parameters, options); @@ -188,9 +189,9 @@ public static class Factory MultiTableUnawareAppendCompactionTask, MultiTableCommittable> { private final String commitUser; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; - public Factory(Catalog.Loader catalogLoader, String commitUser, Options options) { + public Factory(CatalogLoader catalogLoader, String commitUser, Options options) { super(options); this.commitUser = commitUser; this.catalogLoader = catalogLoader; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 25f76ce97683..53f1bf165d98 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.Options; @@ -55,14 +55,14 @@ public class CombinedTableCompactorSink implements Serializable { private static final String WRITER_NAME = "Writer"; private static final String GLOBAL_COMMITTER_NAME = "Global Committer"; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private final boolean ignorePreviousFiles; private final boolean fullCompaction; private final Options options; public CombinedTableCompactorSink( - Catalog.Loader catalogLoader, Options options, boolean fullCompaction) { + CatalogLoader catalogLoader, Options options, boolean fullCompaction) { this.catalogLoader = catalogLoader; this.ignorePreviousFiles = false; this.fullCompaction = fullCompaction; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 58f6a3834096..02a7e6c1b3c8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.utils.RuntimeContextUtils; @@ -72,7 +73,7 @@ public class MultiTablesStoreCompactOperator private transient StoreSinkWriteState state; private transient DataFileMetaSerializer dataFileMetaSerializer; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; protected Catalog catalog; protected Map tables; @@ -81,7 +82,7 @@ public class MultiTablesStoreCompactOperator private MultiTablesStoreCompactOperator( StreamOperatorParameters parameters, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, String initialCommitUser, CheckpointConfig checkpointConfig, boolean isStreaming, @@ -324,7 +325,7 @@ private StoreSinkWrite.Provider createWriteProvider( /** {@link StreamOperatorFactory} of {@link MultiTablesStoreCompactOperator}. */ public static class Factory extends PrepareCommitOperator.Factory { - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private final CheckpointConfig checkpointConfig; private final boolean isStreaming; private final boolean ignorePreviousFiles; @@ -332,7 +333,7 @@ public static class Factory private final String initialCommitUser; public Factory( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, String initialCommitUser, CheckpointConfig checkpointConfig, boolean isStreaming, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 537a98f97fb0..01acddb9ad99 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.WrappedManifestCommittable; @@ -56,12 +57,12 @@ public class StoreMultiCommitter private final boolean ignoreEmptyCommit; private final Map dynamicOptions; - public StoreMultiCommitter(Catalog.Loader catalogLoader, Context context) { + public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) { this(catalogLoader, context, false, Collections.emptyMap()); } public StoreMultiCommitter( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Context context, boolean ignoreEmptyCommit, Map dynamicOptions) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java index dbdf77601480..d190b9ccf39e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java @@ -58,8 +58,11 @@ public static PartitionListeners create(Committer.Context context, FileStoreTabl throws Exception { List listeners = new ArrayList<>(); - ReportHmsListener.create(context.isRestored(), context.stateStore(), table) + // partition statistics reporter + ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table) .ifPresent(listeners::add); + + // partition mark done PartitionMarkDone.create( context.streamingCheckpointEnabled(), context.isRestored(), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java similarity index 85% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java index 853dc52c20bf..b75889d567ee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java @@ -40,22 +40,24 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP; +import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP; import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP; import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP; import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP; import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; /** Action to report the table statistic from the latest snapshot to HMS. */ -public class HmsReporter implements Closeable { +public class PartitionStatisticsReporter implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(HmsReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class); + + private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime"; private final MetastoreClient metastoreClient; private final SnapshotReader snapshotReader; private final SnapshotManager snapshotManager; - public HmsReporter(FileStoreTable table, MetastoreClient client) { + public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient client) { this.metastoreClient = Preconditions.checkNotNull(client, "the metastore client factory is null"); this.snapshotReader = table.newSnapshotReader(); @@ -90,7 +92,12 @@ public void report(String partition, long modifyTime) throws Exception { statistic.put(NUM_FILES_PROP, String.valueOf(fileCount)); statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize)); statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount)); - statistic.put(HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000)); + + String modifyTimeSeconds = String.valueOf(modifyTime / 1000); + statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); + + // just for being compatible with hive metastore + statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds); LOG.info("alter partition {} with statistic {}.", partitionSpec, statistic); metastoreClient.alterPartition(partitionSpec, statistic, modifyTime, true); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java similarity index 91% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java index 842dd012e88e..ca51c3df5b1a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java @@ -49,7 +49,7 @@ * This listener will collect data from the newly touched partition and then decide when to trigger * a report based on the partition's idle time. */ -public class ReportHmsListener implements PartitionListener { +public class ReportPartStatsListener implements PartitionListener { @SuppressWarnings("unchecked") private static final ListStateDescriptor> PENDING_REPORT_STATE_DESC = @@ -58,20 +58,20 @@ public class ReportHmsListener implements PartitionListener { new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE)); private final InternalRowPartitionComputer partitionComputer; - private final HmsReporter hmsReporter; + private final PartitionStatisticsReporter partitionStatisticsReporter; private final ListState> pendingPartitionsState; private final Map pendingPartitions; private final long idleTime; - private ReportHmsListener( + private ReportPartStatsListener( InternalRowPartitionComputer partitionComputer, - HmsReporter hmsReporter, + PartitionStatisticsReporter partitionStatisticsReporter, OperatorStateStore store, boolean isRestored, long idleTime) throws Exception { this.partitionComputer = partitionComputer; - this.hmsReporter = hmsReporter; + this.partitionStatisticsReporter = partitionStatisticsReporter; this.pendingPartitionsState = store.getListState(PENDING_REPORT_STATE_DESC); this.pendingPartitions = new HashMap<>(); if (isRestored) { @@ -108,7 +108,7 @@ public void notifyCommittable(List committables) { try { Map partitions = reportPartition(endInput); for (Map.Entry entry : partitions.entrySet()) { - hmsReporter.report(entry.getKey(), entry.getValue()); + partitionStatisticsReporter.report(entry.getKey(), entry.getValue()); } } catch (Exception e) { throw new RuntimeException(e); @@ -138,7 +138,7 @@ public void snapshotState() throws Exception { pendingPartitionsState.update(Collections.singletonList(pendingPartitions)); } - public static Optional create( + public static Optional create( boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception { @@ -169,9 +169,9 @@ public static Optional create( coreOptions.legacyPartitionName()); return Optional.of( - new ReportHmsListener( + new ReportPartStatsListener( partitionComputer, - new HmsReporter( + new PartitionStatisticsReporter( table, table.catalogEnvironment().metastoreClientFactory().create()), stateStore, @@ -182,8 +182,8 @@ public static Optional create( @Override public void close() throws IOException { - if (hmsReporter != null) { - hmsReporter.close(); + if (partitionStatisticsReporter != null) { + partitionStatisticsReporter.close(); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java index 415eddb037df..ac6af6a14f6a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.source.operator.CombinedAwareBatchSource; import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSource; @@ -44,7 +44,8 @@ * compactor jobs in combined mode. */ public class CombinedTableCompactorSourceBuilder { - private final Catalog.Loader catalogLoader; + + private final CatalogLoader catalogLoader; private final Pattern includingPattern; private final Pattern excludingPattern; private final Pattern databasePattern; @@ -55,7 +56,7 @@ public class CombinedTableCompactorSourceBuilder { @Nullable private Duration partitionIdleTime = null; public CombinedTableCompactorSourceBuilder( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern databasePattern, Pattern includingPattern, Pattern excludingPattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java index c3a1258bb176..2f7a82c95184 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.source.operator; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; @@ -54,7 +54,7 @@ public class CombinedAwareBatchSource extends CombinedCompactorSource buildSource( StreamExecutionEnvironment env, String name, TypeInformation typeInfo, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java index 9bd4a84f571c..a23a3b41a441 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.source.operator; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; @@ -51,7 +51,7 @@ public class CombinedAwareStreamingSource extends CombinedCompactorSource buildSource( StreamExecutionEnvironment env, String name, TypeInformation typeInfo, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java index f58d86cdd65e..e292d2441ccd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.append.UnawareAppendCompactionTask; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.table.source.Split; @@ -44,16 +44,17 @@ * single (non-parallel) monitoring task, it is responsible for the new Paimon table. */ public abstract class CombinedCompactorSource extends AbstractNonCoordinatedSource { - private static final long serialVersionUID = 2L; - protected final Catalog.Loader catalogLoader; + private static final long serialVersionUID = 3L; + + protected final CatalogLoader catalogLoader; protected final Pattern includingPattern; protected final Pattern excludingPattern; protected final Pattern databasePattern; protected final boolean isStreaming; public CombinedCompactorSource( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java index 64f0c38f5a11..5c0d9c42dd29 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java @@ -20,6 +20,7 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.compact.MultiTableScanBase; @@ -63,7 +64,7 @@ public class CombinedUnawareBatchSource private static final Logger LOGGER = LoggerFactory.getLogger(CombinedUnawareBatchSource.class); public CombinedUnawareBatchSource( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern) { @@ -121,7 +122,7 @@ public void close() throws Exception { public static DataStream buildSource( StreamExecutionEnvironment env, String name, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java index 6ea1ead4db30..2e38d538a999 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; -import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan; import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo; @@ -48,7 +48,7 @@ public class CombinedUnawareStreamingSource private final long monitorInterval; public CombinedUnawareStreamingSource( - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, @@ -104,7 +104,7 @@ public void close() throws Exception { public static DataStream buildSource( StreamExecutionEnvironment env, String name, - Catalog.Loader catalogLoader, + CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java index fbc8bb9d756a..ae3099ec0628 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -60,18 +61,18 @@ public class MultiTablesReadOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private final boolean isStreaming; private Duration partitionIdleTime = null; - public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean isStreaming) { + public MultiTablesReadOperator(CatalogLoader catalogLoader, boolean isStreaming) { this.catalogLoader = catalogLoader; this.isStreaming = isStreaming; } public MultiTablesReadOperator( - Catalog.Loader catalogLoader, boolean isStreaming, Duration partitionIdleTime) { + CatalogLoader catalogLoader, boolean isStreaming, Duration partitionIdleTime) { this.catalogLoader = catalogLoader; this.isStreaming = isStreaming; this.partitionIdleTime = partitionIdleTime; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java index 0864741a178f..15fde93755fd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.manifest.PartitionEntry; @@ -54,12 +55,11 @@ public class MultiUnawareTablesReadOperator private static final Logger LOG = LoggerFactory.getLogger(MultiUnawareTablesReadOperator.class); - private final Catalog.Loader catalogLoader; + private final CatalogLoader catalogLoader; private final Duration partitionIdleTime; - public MultiUnawareTablesReadOperator( - Catalog.Loader catalogLoader, Duration partitionIdleTime) { + public MultiUnawareTablesReadOperator(CatalogLoader catalogLoader, Duration partitionIdleTime) { this.catalogLoader = catalogLoader; this.partitionIdleTime = partitionIdleTime; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java index d487d75925eb..0e85f559d9de 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; @@ -273,7 +274,7 @@ protected StoreCompactOperator.Factory createCompactOperator(FileStoreTable tabl } protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator( - Catalog.Loader catalogLoader) throws Exception { + CatalogLoader catalogLoader) throws Exception { return new MultiTablesStoreCompactOperator.Factory( catalogLoader, commitUser, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index 752679fb5903..53e3a6dcb79c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -78,7 +79,7 @@ class StoreMultiCommitterTest { private String initialCommitUser; private Path warehouse; - private Catalog.Loader catalogLoader; + private CatalogLoader catalogLoader; private Catalog catalog; private Identifier firstTable; private Identifier secondTable; @@ -691,7 +692,7 @@ public void snapshotState( return harness; } - private Catalog.Loader createCatalogLoader() { + private CatalogLoader createCatalogLoader() { Options catalogOptions = createCatalogOptions(warehouse); return () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions)); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java similarity index 95% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java index f245940da57d..142a0c32f781 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java @@ -49,8 +49,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -/** Test for {@link HmsReporter}. */ -public class HmsReporterTest { +/** Test for {@link PartitionStatisticsReporter}. */ +public class PartitionStatisticsReporterTest { @TempDir java.nio.file.Path tempDir; @@ -131,7 +131,7 @@ public void close() throws Exception { } }; - HmsReporter action = new HmsReporter(table, client); + PartitionStatisticsReporter action = new PartitionStatisticsReporter(table, client); long time = 1729598544974L; action.report("c1=a/", time); Assertions.assertThat(partitionParams).containsKey("c1=a/"); @@ -144,6 +144,8 @@ public void close() throws Exception { "591", "numRows", "1", + "lastUpdateTime", + String.valueOf(time / 1000), "transient_lastDdlTime", String.valueOf(time / 1000))); action.close(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java index fba5f33807de..3b41f39431bd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; @@ -662,7 +663,7 @@ private BinaryRow binaryRow(String dt, int hh) { return b; } - private Catalog.Loader catalogLoader() { + private CatalogLoader catalogLoader() { // to make the action workflow serializable catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index c74ede981546..5744ac894d12 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -103,7 +103,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable; -import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; +import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; @@ -872,8 +872,8 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new } @Override - public boolean allowUpperCase() { - return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false); + public boolean caseSensitive() { + return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false); } @Override @@ -931,7 +931,6 @@ public void repairDatabase(String databaseName) { public void repairTable(Identifier identifier) throws TableNotExistException { checkNotBranch(identifier, "repairTable"); checkNotSystemTable(identifier, "repairTable"); - validateIdentifierNameCaseInsensitive(identifier); Path location = getTableLocation(identifier); TableSchema tableSchema = diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index d6318c723fe0..de6e2414fc8f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -69,7 +69,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.FORMAT_TABLE; -import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; +import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE; import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType; import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf; @@ -96,9 +96,9 @@ public void initialize(String name, CaseInsensitiveStringMap options) { CatalogContext catalogContext = CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf()); - // if spark is case-insensitive, set allow upper case to catalog + // if spark is case-insensitive, set case-sensitive to catalog if (!sessionState.conf().caseSensitiveAnalysis()) { - newOptions.put(ALLOW_UPPER_CASE.key(), "true"); + newOptions.put(CASE_SENSITIVE.key(), "true"); } options = new CaseInsensitiveStringMap(newOptions); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 6b7b17b1b1a5..b57228fa44f0 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -62,7 +62,7 @@ import java.util.Map; import java.util.concurrent.Callable; -import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; +import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG; @@ -331,9 +331,9 @@ private void fillCommonConfigurations(Map options, SQLConf sqlCo options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase); } - // if spark is case-insensitive, set allow upper case to catalog + // if spark is case-insensitive, set case-sensitive to catalog if (!sqlConf.caseSensitiveAnalysis()) { - options.put(ALLOW_UPPER_CASE.key(), "true"); + options.put(CASE_SENSITIVE.key(), "true"); } }