diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md
index 3849d70a5862..99f797e68fa6 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -38,12 +38,6 @@ Options for paimon catalog.
{{< generated/catalog_configuration >}}
-### FilesystemCatalogOptions
-
-Options for Filesystem catalog.
-
-{{< generated/file_system_catalog_configuration >}}
-
### HiveCatalogOptions
Options for Hive catalog.
diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md
index 6ecac3909ced..3451b40d5880 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -221,7 +221,7 @@ public class WriteCdcToTable {
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
- Catalog.Loader catalogLoader =
+ CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 63f7adda1e0d..6355c9558653 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,12 +26,6 @@
-
- allow-upper-case |
- (none) |
- Boolean |
- Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog. |
-
cache-enabled |
true |
@@ -74,6 +68,12 @@
Integer |
Controls the max number for snapshots per table in the catalog are cached. |
+
+ case-sensitive |
+ (none) |
+ Boolean |
+ Indicates whether this catalog is case-sensitive. |
+
client-pool-size |
2 |
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 15b1aac93543..1133de289fa3 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,6 +230,12 @@
String |
Specify the file name prefix of data files. |
+
+ data-file.thin-mode |
+ false |
+ Boolean |
+ Enable data file thin mode to avoid duplicate columns storage. |
+
delete-file.thread-num |
(none) |
@@ -864,12 +870,6 @@
Integer |
Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. |
-
- data-file.thin-mode |
- false |
- Boolean |
- Enable data file thin mode to avoid duplicate columns storage. |
-
streaming-read-mode |
(none) |
diff --git a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html b/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
deleted file mode 100644
index c416ed6da557..000000000000
--- a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
+++ /dev/null
@@ -1,36 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-
-
-
- Key |
- Default |
- Type |
- Description |
-
-
-
-
- case-sensitive |
- true |
- Boolean |
- Is case sensitive. If case insensitive, you need to set this option to false, and the table name and fields be converted to lowercase. |
-
-
-
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
index 9627bbd85d8c..25f6603ec22e 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
@@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {
private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
- private final boolean allowUpperCase;
+ private final boolean caseSensitive;
public ArrowBundleRecords(
- VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean allowUpperCase) {
+ VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean caseSensitive) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
- this.allowUpperCase = allowUpperCase;
+ this.caseSensitive = caseSensitive;
}
public VectorSchemaRoot getVectorSchemaRoot() {
@@ -52,7 +52,7 @@ public long rowCount() {
@Override
public Iterator iterator() {
- ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, allowUpperCase);
+ ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index b3925a0a769e..0f6a98b7a2e0 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -55,6 +55,8 @@
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Utilities for creating Arrow objects. */
public class ArrowUtils {
@@ -66,13 +68,13 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}
public static VectorSchemaRoot createVectorSchemaRoot(
- RowType rowType, BufferAllocator allocator, boolean allowUpperCase) {
+ RowType rowType, BufferAllocator allocator, boolean caseSensitive) {
List fields =
rowType.getFields().stream()
.map(
f ->
toArrowField(
- allowUpperCase ? f.name() : f.name().toLowerCase(),
+ toLowerCaseIfNeed(f.name(), caseSensitive),
f.id(),
f.type(),
0))
@@ -81,9 +83,9 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}
public static FieldVector createVector(
- DataField dataField, BufferAllocator allocator, boolean allowUpperCase) {
+ DataField dataField, BufferAllocator allocator, boolean caseSensitive) {
return toArrowField(
- allowUpperCase ? dataField.name() : dataField.name().toLowerCase(),
+ toLowerCaseIfNeed(dataField.name(), caseSensitive),
dataField.id(),
dataField.type(),
0)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
index 9d20062b437b..b626758dedfc 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
@@ -34,6 +34,8 @@
import java.util.Iterator;
import java.util.List;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Reader from a {@link VectorSchemaRoot} to paimon rows. */
public class ArrowBatchReader {
@@ -41,9 +43,9 @@ public class ArrowBatchReader {
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
- private final boolean allowUpperCase;
+ private final boolean caseSensitive;
- public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
+ public ArrowBatchReader(RowType rowType, boolean caseSensitive) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
@@ -53,7 +55,7 @@ public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
- this.allowUpperCase = allowUpperCase;
+ this.caseSensitive = caseSensitive;
}
public Iterable readBatch(VectorSchemaRoot vsr) {
@@ -63,8 +65,7 @@ public Iterable readBatch(VectorSchemaRoot vsr) {
for (int i = 0; i < dataFields.size(); ++i) {
try {
String fieldName = dataFields.get(i).name();
- Field field =
- arrowSchema.findField(allowUpperCase ? fieldName : fieldName.toLowerCase());
+ Field field = arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive));
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 10afcaf6917a..442457813ace 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowSchema schema;
private final ArrowFormatWriter realWriter;
- public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) {
- this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, allowUpperCase);
+ public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
+ this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive);
RootAllocator allocator = realWriter.getAllocator();
array = ArrowArray.allocateNew(allocator);
schema = ArrowSchema.allocateNew(allocator);
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index acdb5d0dcb1d..9f557921979b 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -43,10 +43,10 @@ public class ArrowFormatWriter implements AutoCloseable {
private final RootAllocator allocator;
private int rowId;
- public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) {
+ public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
allocator = new RootAllocator();
- vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, allowUpperCase);
+ vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, caseSensitive);
fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index bb8cfae68284..b22274e011fb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -128,13 +128,12 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table in the catalog are cached.");
- public static final ConfigOption ALLOW_UPPER_CASE =
- ConfigOptions.key("allow-upper-case")
+ public static final ConfigOption CASE_SENSITIVE =
+ ConfigOptions.key("case-sensitive")
.booleanType()
.noDefaultValue()
- .withDescription(
- "Indicates whether this catalog allow upper case, "
- + "its default value depends on the implementation of the specific catalog.");
+ .withFallbackKeys("allow-upper-case")
+ .withDescription("Indicates whether this catalog is case-sensitive.");
public static final ConfigOption SYNC_ALL_PROPERTIES =
ConfigOptions.key("sync-all-properties")
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index e184624c0bbf..c4e07e0a6972 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -542,8 +542,8 @@ public static String quote(String str) {
return "`" + str + "`";
}
- public static String caseSensitiveConversion(String str, boolean allowUpperCase) {
- return allowUpperCase ? str : str.toLowerCase();
+ public static String toLowerCaseIfNeed(String str, boolean caseSensitive) {
+ return caseSensitive ? str : str.toLowerCase();
}
public static boolean isNumeric(final CharSequence cs) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a1b41e3b8a41..db6909295556 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -60,7 +60,6 @@
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -82,7 +81,7 @@ protected AbstractCatalog(FileIO fileIO) {
protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
- this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap());
+ this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}
@@ -123,11 +122,6 @@ protected boolean lockEnabled() {
return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}
- @Override
- public boolean allowUpperCase() {
- return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
- }
-
protected boolean allowCustomTablePath() {
return false;
}
@@ -559,8 +553,9 @@ protected void checkNotSystemDatabase(String database) {
}
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
- Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
- Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
+ CatalogUtils.validateCaseInsensitive(
+ caseSensitive(), "Database", identifier.getDatabaseName());
+ CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}
private void validateFieldNameCaseInsensitiveInSchemaChange(List changes) {
@@ -578,7 +573,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c
}
protected void validateFieldNameCaseInsensitive(List fieldNames) {
- Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+ CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}
private void validateAutoCreateClose(Map options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c3808caa135a..7b1fe0ea072e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,15 +26,9 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.view.View;
-import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* This interface is responsible for reading and writing metadata such as database/table from a
@@ -46,30 +40,38 @@
@Public
public interface Catalog extends AutoCloseable {
- String DEFAULT_DATABASE = "default";
-
+ // constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";
- String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
- String DB_SUFFIX = ".db";
+ // constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";
+
+ // constants for database
+ String DEFAULT_DATABASE = "default";
+ String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";
+
+ // constants for table
+ String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
- String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
- /** Warehouse root path containing all database directories in this catalog. */
+ /** Warehouse root path for creating new databases. */
String warehouse();
- /** Catalog options. */
+ /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
+ FileIO fileIO();
+
+ /** Catalog options for re-creating this catalog. */
Map options();
- FileIO fileIO();
+ /** Return a boolean that indicates whether this catalog is case-sensitive. */
+ boolean caseSensitive();
/**
* Get the names of all databases in this catalog.
@@ -325,44 +327,30 @@ default void renameView(Identifier fromView, Identifier toView, boolean ignoreIf
throw new UnsupportedOperationException();
}
- /** Return a boolean that indicates whether this catalog allow upper case. */
- boolean allowUpperCase();
-
+ /**
+ * Repair the entire Catalog, repair the metadata in the metastore consistent with the metadata
+ * in the filesystem, register missing tables in the metastore.
+ */
default void repairCatalog() {
throw new UnsupportedOperationException();
}
+ /**
+ * Repair the entire database, repair the metadata in the metastore consistent with the metadata
+ * in the filesystem, register missing tables in the metastore.
+ */
default void repairDatabase(String databaseName) {
throw new UnsupportedOperationException();
}
+ /**
+ * Repair the table, repair the metadata in the metastore consistent with the metadata in the
+ * filesystem.
+ */
default void repairTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}
- static Map tableDefaultOptions(Map options) {
- return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
- }
-
- /** Validate database, table and field names must be lowercase when not case-sensitive. */
- static void validateCaseInsensitive(boolean caseSensitive, String type, String... names) {
- validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
- }
-
- /** Validate database, table and field names must be lowercase when not case-sensitive. */
- static void validateCaseInsensitive(boolean caseSensitive, String type, List names) {
- if (caseSensitive) {
- return;
- }
- List illegalNames =
- names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
- checkArgument(
- illegalNames.isEmpty(),
- String.format(
- "%s name %s cannot contain upper case in the catalog.",
- type, illegalNames));
- }
-
/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
@@ -599,10 +587,4 @@ public Identifier identifier() {
return identifier;
}
}
-
- /** Loader of {@link Catalog}. */
- @FunctionalInterface
- interface Loader extends Serializable {
- Catalog load();
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
similarity index 55%
rename from paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
rename to paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
index e656742b42e9..c8de08139cb7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
@@ -18,19 +18,11 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import java.io.Serializable;
-/** Options for filesystem catalog. */
-public final class FileSystemCatalogOptions {
+/** Loader for creating a {@link Catalog}. */
+@FunctionalInterface
+public interface CatalogLoader extends Serializable {
- public static final ConfigOption CASE_SENSITIVE =
- ConfigOptions.key("case-sensitive")
- .booleanType()
- .defaultValue(true)
- .withFallbackKeys("allow-upper-case")
- .withDescription(
- "Is case sensitive. If case insensitive, you need to set this option to false, and the table name and fields be converted to lowercase.");
-
- private FileSystemCatalogOptions() {}
+ Catalog load();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 39f81833a9eb..bae23c627607 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -21,6 +21,15 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Utils for {@link Catalog}. */
public class CatalogUtils {
@@ -51,4 +60,29 @@ public static String table(Path path) {
public static String table(String path) {
return SchemaManager.identifierFromPath(path, false).getObjectName();
}
+
+ public static Map tableDefaultOptions(Map options) {
+ return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
+ }
+
+ /** Validate database, table and field names must be lowercase when not case-sensitive. */
+ public static void validateCaseInsensitive(
+ boolean caseSensitive, String type, String... names) {
+ validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+ }
+
+ /** Validate database, table and field names must be lowercase when not case-sensitive. */
+ public static void validateCaseInsensitive(
+ boolean caseSensitive, String type, List names) {
+ if (caseSensitive) {
+ return;
+ }
+ List illegalNames =
+ names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
+ checkArgument(
+ illegalNames.isEmpty(),
+ String.format(
+ "%s name %s cannot contain upper case in the catalog.",
+ type, illegalNames));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 2298626b0e48..93e8ce2581ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -42,8 +42,8 @@ public Catalog wrapped() {
}
@Override
- public boolean allowUpperCase() {
- return wrapped.allowUpperCase();
+ public boolean caseSensitive() {
+ return wrapped.caseSensitive();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 9264a54647b1..279ddb26ee53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -34,7 +34,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
-import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -158,7 +158,7 @@ public String warehouse() {
}
@Override
- public boolean allowUpperCase() {
- return catalogOptions.get(CASE_SENSITIVE);
+ public boolean caseSensitive() {
+ return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 778bc591fe89..551b2d8fc910 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -320,7 +320,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
}
@Override
- public boolean allowUpperCase() {
+ public boolean caseSensitive() {
return false;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 86b87e25e832..c30e1109e2ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -52,6 +52,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
/** A catalog implementation for REST. */
@@ -61,7 +62,7 @@ public class RESTCatalog implements Catalog {
private final RESTClient client;
private final ResourcePaths resourcePaths;
- private final Map options;
+ private final Options options;
private final Map baseHeader;
private final AuthSession catalogAuth;
@@ -99,7 +100,7 @@ public RESTCatalog(Options options) {
}
Map initHeaders =
RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders());
- this.options = fetchOptionsFromServer(initHeaders, options.toMap());
+ this.options = new Options(fetchOptionsFromServer(initHeaders, options.toMap()));
this.resourcePaths =
ResourcePaths.forCatalogProperties(
this.options.get(RESTCatalogInternalOptions.PREFIX));
@@ -112,7 +113,7 @@ public String warehouse() {
@Override
public Map options() {
- return this.options;
+ return this.options.toMap();
}
@Override
@@ -223,8 +224,8 @@ public List listPartitions(Identifier identifier)
}
@Override
- public boolean allowUpperCase() {
- return false;
+ public boolean caseSensitive() {
+ return options.getOptional(CASE_SENSITIVE).orElse(true);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index 303a9d8733d4..65ea6721c220 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -36,12 +36,12 @@ public class FileSystemCatalogTest extends CatalogTestBase {
public void setUp() throws Exception {
super.setUp();
Options catalogOptions = new Options();
- catalogOptions.set(CatalogOptions.ALLOW_UPPER_CASE, false);
+ catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
catalog = new FileSystemCatalog(fileIO, new Path(warehouse), catalogOptions);
}
@Test
- public void testCreateTableAllowUpperCase() throws Exception {
+ public void testCreateTableCaseSensitive() throws Exception {
catalog.createDatabase("test_db", false);
Identifier identifier = Identifier.create("test_db", "new_table");
Schema schema =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index c8af6f91c420..6482a625f4c7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -42,7 +42,7 @@
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
/** Common utils for CDC Action. */
public class CdcActionCommonUtils {
@@ -129,21 +129,21 @@ public static Schema buildPaimonSchema(
List allFieldNames = new ArrayList<>();
for (DataField field : sourceSchema.fields()) {
- String fieldName = caseSensitiveConversion(field.name(), caseSensitive);
+ String fieldName = toLowerCaseIfNeed(field.name(), caseSensitive);
allFieldNames.add(fieldName);
builder.column(fieldName, field.type(), field.description());
}
for (ComputedColumn computedColumn : computedColumns) {
String computedColumnName =
- caseSensitiveConversion(computedColumn.columnName(), caseSensitive);
+ toLowerCaseIfNeed(computedColumn.columnName(), caseSensitive);
allFieldNames.add(computedColumnName);
builder.column(computedColumnName, computedColumn.columnType());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
String metadataColumnName =
- caseSensitiveConversion(metadataConverter.columnName(), caseSensitive);
+ toLowerCaseIfNeed(metadataConverter.columnName(), caseSensitive);
allFieldNames.add(metadataColumnName);
builder.column(metadataColumnName, metadataConverter.dataType());
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 2e0a1319293f..3290ec18291f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -189,7 +189,7 @@ public static ReferencedField checkArgument(
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String referencedFieldCheckForm =
- StringUtils.caseSensitiveConversion(referencedField, caseSensitive);
+ StringUtils.toLowerCaseIfNeed(referencedField, caseSensitive);
DataType fieldType =
checkNotNull(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 9c629b5a516f..3af0957ce6da 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -86,7 +86,7 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
tableConfig,
retrievedSchema,
metadataConverters,
- allowUpperCase,
+ caseSensitive,
true,
false);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fb1339c5193..56334c1e7bff 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -155,9 +156,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
@Override
protected void validateCaseSensitivity() {
- Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
}
@Override
@@ -179,7 +180,7 @@ protected EventParser.Factory buildEventParserFactory()
NewTableSchemaBuilder schemaBuilder =
new NewTableSchemaBuilder(
tableConfig,
- allowUpperCase,
+ caseSensitive,
partitionKeys,
primaryKeys,
requirePrimaryKeys(),
@@ -190,7 +191,7 @@ protected EventParser.Factory buildEventParserFactory()
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(
- allowUpperCase,
+ caseSensitive,
mergeShards,
dbPrefix,
dbSuffix,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 87efeb2a19cf..6fcdbd44bca2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -20,6 +20,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
@@ -107,15 +108,15 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
tableConfig,
retrievedSchema,
metadataConverters,
- allowUpperCase,
+ caseSensitive,
true,
true);
}
@Override
protected void validateCaseSensitivity() {
- Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table);
}
@Override
@@ -142,7 +143,7 @@ protected void beforeBuildingSourceSink() throws Exception {
buildComputedColumns(
computedColumnArgs,
fileStoreTable.schema().fields(),
- allowUpperCase);
+ caseSensitive);
// check partition keys and primary keys in case that user specified them
checkConstraints();
}
@@ -162,7 +163,7 @@ protected FlatMapFunction recordParse()
@Override
protected EventParser.Factory buildEventParserFactory() {
- boolean caseSensitive = this.allowUpperCase;
+ boolean caseSensitive = this.caseSensitive;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index a7c770347410..d755b200a957 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -64,7 +64,7 @@ public abstract class SynchronizationActionBase extends ActionBase {
protected final String database;
protected final Configuration cdcSourceConfig;
protected final SyncJobHandler syncJobHandler;
- protected final boolean allowUpperCase;
+ protected final boolean caseSensitive;
protected Map<String, String> tableConfig = new HashMap<>();
protected TypeMapping typeMapping = TypeMapping.defaultMapping();
@@ -80,7 +80,7 @@ public SynchronizationActionBase(
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
- this.allowUpperCase = catalog.allowUpperCase();
+ this.caseSensitive = catalog.caseSensitive();
this.syncJobHandler.registerJdbcDriver();
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 15fc3507ce2d..7dd63ed2273e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.Map;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Used to convert a MySQL source table name to corresponding Paimon table name. */
public class TableNameConverter implements Serializable {
@@ -78,7 +80,7 @@ public String convert(String originDbName, String originTblName) {
// top priority: table mapping
if (tableMapping.containsKey(originTblName.toLowerCase())) {
String mappedName = tableMapping.get(originTblName.toLowerCase());
- return caseSensitive ? mappedName : mappedName.toLowerCase();
+ return toLowerCaseIfNeed(mappedName, caseSensitive);
}
String tblPrefix = prefix;
@@ -93,7 +95,7 @@ public String convert(String originDbName, String originTblName) {
}
// third priority: normal prefix and suffix
- String tableName = caseSensitive ? originTblName : originTblName.toLowerCase();
+ String tableName = toLowerCaseIfNeed(originTblName, caseSensitive);
return tblPrefix + tableName + tblSuffix;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index ce2e9124a664..0f452e2834be 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -139,7 +139,7 @@ protected void beforeBuildingSourceSink() throws Exception {
TableNameConverter tableNameConverter =
new TableNameConverter(
- allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping);
+ caseSensitive, mergeShards, tablePrefix, tableSuffix, tableMapping);
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
@@ -155,7 +155,7 @@ protected void beforeBuildingSourceSink() throws Exception {
tableConfig,
tableInfo.schema(),
metadataConverters,
- allowUpperCase,
+ caseSensitive,
false,
true);
try {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
index 4892aee03024..e80692ed22f5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -28,8 +29,8 @@
public class CaseSensitiveUtils {
public static DataStream<CdcRecord> cdcRecordConvert(
- Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
- if (allowUpperCase(catalogLoader)) {
+ CatalogLoader catalogLoader, DataStream<CdcRecord> input) {
+ if (caseSensitive(catalogLoader)) {
return input;
}
@@ -46,8 +47,8 @@ public void processElement(
}
public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
- Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord> input) {
- if (allowUpperCase(catalogLoader)) {
+ CatalogLoader catalogLoader, DataStream<CdcMultiplexRecord> input) {
+ if (caseSensitive(catalogLoader)) {
return input;
}
@@ -65,9 +66,9 @@ public void processElement(
.name("Case-insensitive Convert");
}
- private static boolean allowUpperCase(Catalog.Loader catalogLoader) {
+ private static boolean caseSensitive(CatalogLoader catalogLoader) {
try (Catalog catalog = catalogLoader.load()) {
- return catalog.allowUpperCase();
+ return catalog.caseSensitive();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 886e33e2046a..4efcf1207e9f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
@@ -62,13 +63,13 @@ public class CdcDynamicTableParsingProcessFunction extends ProcessFunction parserFactory;
private final String database;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private transient EventParser parser;
private transient Catalog catalog;
public CdcDynamicTableParsingProcessFunction(
- String database, Catalog.Loader catalogLoader, EventParser.Factory parserFactory) {
+ String database, CatalogLoader catalogLoader, EventParser.Factory parserFactory) {
// for now, only support single database
this.database = database;
this.catalogLoader = catalogLoader;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index fdad3a921d63..2858b2d4eb6b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +39,13 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer channelComputers;
- public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
+ public CdcMultiplexRecordChannelComputer(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 5db111a30047..9387a8293874 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -20,6 +20,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -67,7 +68,7 @@ public class CdcRecordStoreMultiWriteOperator
private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
private final String initialCommitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private MemoryPoolFactory memoryPoolFactory;
private Catalog catalog;
@@ -79,7 +80,7 @@ public class CdcRecordStoreMultiWriteOperator
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
@@ -264,10 +265,10 @@ public static class Factory
extends PrepareCommitOperator.Factory {
private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
private final String initialCommitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
public Factory(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 28b68fedc3e6..5c27db6ddf1b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
@@ -48,7 +48,7 @@ public class CdcSinkBuilder {
private EventParser.Factory parserFactory = null;
private Table table = null;
private Identifier identifier = null;
- private Catalog.Loader catalogLoader = null;
+ private CatalogLoader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -77,7 +77,7 @@ public CdcSinkBuilder withIdentifier(Identifier identifier) {
return this;
}
- public CdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ public CdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 1688d4deb088..4cd9235cb58a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
@@ -60,13 +60,13 @@ public class FlinkCdcMultiTableSink implements Serializable {
private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global Committer";
private final boolean isOverwrite = false;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final String commitUser;
public FlinkCdcMultiTableSink(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
String commitUser) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index a9ad66847b4b..bd18c7e7ad82 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -72,7 +72,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder {
// it will check newly added tables and create the corresponding
// Paimon tables. 2) in multiplex sink where it is used to
// initialize different writers to multiple tables.
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
@@ -111,7 +111,7 @@ public FlinkCdcSyncDatabaseSinkBuilder withDatabase(String database) {
return this;
}
- public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 0ad412e47d34..dd612a52c2eb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -51,7 +52,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
private final Map schemaManagers = new HashMap<>();
- public MultiTableUpdatedDataFieldsProcessFunction(Catalog.Loader catalogLoader) {
+ public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
super(catalogLoader);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 610856d3af54..43f63854bb22 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
@@ -39,7 +39,7 @@ public class RichCdcSinkBuilder {
private DataStream input = null;
private Table table = null;
private Identifier identifier = null;
- private Catalog.Loader catalogLoader = null;
+ private CatalogLoader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -62,7 +62,7 @@ public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) {
return this;
}
- public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
+ public RichCdcSinkBuilder catalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
@@ -114,7 +114,7 @@ public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
/** @deprecated Use {@link #catalogLoader}. */
@Deprecated
- public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ public RichCdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 64f00d96b0f5..504f63105801 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -53,7 +53,7 @@ public class UpdatedDataFieldsProcessFunction
private Set latestFields;
public UpdatedDataFieldsProcessFunction(
- SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
+ SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
super(catalogLoader);
this.schemaManager = schemaManager;
this.identifier = identifier;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 4f02b784c2ba..d50df23742aa 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -48,9 +49,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process
private static final Logger LOG =
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
- protected final Catalog.Loader catalogLoader;
+ protected final CatalogLoader catalogLoader;
protected Catalog catalog;
- private boolean allowUpperCase;
+ private boolean caseSensitive;
private static final List STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -70,7 +71,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process
private static final List TIMESTAMP_TYPES =
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
- protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) {
+ protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
}
@@ -86,7 +87,7 @@ public void open(OpenContext openContext) throws Exception {
*/
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
- this.allowUpperCase = this.catalog.allowUpperCase();
+ this.caseSensitive = this.catalog.caseSensitive();
}
protected void applySchemaChange(
@@ -215,8 +216,7 @@ protected List extractSchemaChanges(
List result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
- String newFieldName =
- StringUtils.caseSensitiveConversion(newField.name(), allowUpperCase);
+ String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
// we compare by ignoring nullable, because partition keys and primary keys might be
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 9ba18376867f..46c8e98fb639 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
@@ -202,7 +202,7 @@ public void testSchemaEvolution() throws Exception {
DataStream> upDataFieldStream = env.fromCollection(prepareData());
Options options = new Options();
options.set("warehouse", tempPath.toString());
- final Catalog.Loader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options);
+ final CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options);
Identifier identifier = Identifier.create(database, tableName);
DataStream schemaChangeProcessFunction =
upDataFieldStream
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 0e1f4e72ea8c..6e37c589ac92 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -535,7 +535,7 @@ public void testCaseInsensitive() throws Exception {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.build();
runActionWithDefaultEnv(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 8a4dc2f3035b..ed1885f5d774 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -1121,7 +1121,7 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withComputedColumnArgs("_YEAR=year(_DATE)")
.build();
runActionWithDefaultEnv(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index de189bc20536..606c46e90e4a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -475,7 +475,7 @@ protected void testCaseInsensitive(String format) throws Exception {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.build();
runActionWithDefaultEnv(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index ae0b0b412ab4..92c2a7243a7c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -196,7 +196,7 @@ public void testDynamicTableCreationInMongoDB() throws Exception {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.build();
runActionWithDefaultEnv(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index b4f31f2d6d3d..2d8489dc23df 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -382,7 +382,7 @@ public void testComputedColumnWithCaseInsensitive() throws Exception {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withComputedColumnArgs("_YEAR=year(_DATE)")
.build();
runActionWithDefaultEnv(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b48b898d66e3..10ee548125bc 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.options.CatalogOptions;
@@ -475,7 +474,7 @@ public void testIgnoreCaseDivided() throws Exception {
syncDatabaseActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withTableConfig(getBasicTableConfig())
.build();
runActionWithDefaultEnv(action);
@@ -496,7 +495,7 @@ public void testIgnoreCaseCombined() throws Exception {
syncDatabaseActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withMode(COMBINED.configString())
.withTableConfig(getBasicTableConfig())
.build();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index febbe4e1deaa..749d87eb0636 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -1327,7 +1326,7 @@ public void testComputedColumnWithCaseInsensitive() throws Exception {
syncTableActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)")
.build();
runActionWithDefaultEnv(action);
@@ -1363,7 +1362,7 @@ public void testSpecifyKeysWithCaseInsensitive() throws Exception {
syncTableActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
- FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withPrimaryKeys("ID1", "PART")
.withPartitionKeys("PART")
.build();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 867cbdbae002..43b7d2ba6399 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
@@ -54,7 +55,7 @@
public class CdcMultiplexRecordChannelComputerTest {
@TempDir java.nio.file.Path tempDir;
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
private Path warehouse;
private String databaseName;
private Identifier tableWithPartition;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 9f35b25026bb..4436aa392d42 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
@@ -82,7 +83,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
private Identifier firstTable;
private Catalog catalog;
private Identifier secondTable;
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
private Schema firstTableSchema;
@BeforeEach
@@ -340,7 +341,7 @@ public void testSingleTableAddColumn() throws Exception {
harness.close();
}
- private Catalog.Loader createCatalogLoader() {
+ private CatalogLoader createCatalogLoader() {
Options catalogOptions = createCatalogOptions(warehouse);
return () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
}
@@ -688,7 +689,7 @@ public void testUsingTheSameCompactExecutor() throws Exception {
}
private OneInputStreamOperatorTestHarness
- createTestHarness(Catalog.Loader catalogLoader) throws Exception {
+ createTestHarness(CatalogLoader catalogLoader) throws Exception {
CdcRecordStoreMultiWriteOperator.Factory operatorFactory =
new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader,
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 28b137a93ed9..35286e3a88d4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -162,8 +162,7 @@ private void innerTestRandomCdcEvents(Supplier bucket, boolean unawareB
Options catalogOptions = new Options();
catalogOptions.set("warehouse", tempDir.toString());
- Catalog.Loader catalogLoader =
- () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
new FlinkCdcSyncDatabaseSinkBuilder()
.withInput(source)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 8b19391f3eda..9fccaac99228 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
@@ -159,8 +159,7 @@ private void innerTestRandomCdcEvents(
Options catalogOptions = new Options();
catalogOptions.set("warehouse", tempDir.toString());
- Catalog.Loader catalogLoader =
- () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
new CdcSinkBuilder()
.withInput(source)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index dd95c48af8d1..3407735b4b79 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
@@ -519,7 +520,7 @@ protected Schema buildPaimonSchema(
// Although catalog.createTable will copy the default options, but we need this info
// here before create table, such as table-default.kafka.bootstrap.servers defined in
// catalog options. Temporarily, we copy the default options here.
- Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
+ CatalogUtils.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
options.put(REGISTER_TIMEOUT.key(), logStoreAutoRegisterTimeout.toString());
registerLogSystem(catalog, identifier, options, classLoader);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 30e32d62efec..4490023e7b03 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -20,6 +20,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
@@ -99,7 +100,7 @@ protected void execute(String defaultName) throws Exception {
env.execute(name);
}
- protected Catalog.Loader catalogLoader() {
+ protected CatalogLoader catalogLoader() {
// to make the action workflow serializable
Options catalogOptions = this.catalogOptions;
return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
index 88730132ef68..e2fd5a9d318e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.compact;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -47,7 +47,7 @@ public class MultiAwareBucketTableScan extends MultiTableScanBase scansMap;
public MultiAwareBucketTableScan(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
index f5940740b691..805e8da0a417 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
@@ -20,6 +20,7 @@
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -59,7 +60,7 @@ public abstract class MultiTableScanBase implements AutoCloseable {
protected boolean isStreaming;
public MultiTableScanBase(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
index da86b93af512..2ad2642b6248 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
@@ -20,7 +20,7 @@
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -41,7 +41,7 @@ public class MultiUnawareBucketTableScan
protected transient Map tablesMap;
public MultiUnawareBucketTableScan(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 83d51f302e51..07ec7d165e3a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -21,6 +21,7 @@
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.compact.UnawareBucketCompactor;
import org.apache.paimon.options.Options;
@@ -56,7 +57,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class);
private final String commitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
// support multi table compaction
private transient Map compactorContainer;
@@ -67,7 +68,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
private AppendOnlyMultiTableCompactionWorkerOperator(
StreamOperatorParameters parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String commitUser,
Options options) {
super(parameters, options);
@@ -188,9 +189,9 @@ public static class Factory
MultiTableUnawareAppendCompactionTask, MultiTableCommittable> {
private final String commitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
- public Factory(Catalog.Loader catalogLoader, String commitUser, Options options) {
+ public Factory(CatalogLoader catalogLoader, String commitUser, Options options) {
super(options);
this.commitUser = commitUser;
this.catalogLoader = catalogLoader;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 25f76ce97683..53f1bf165d98 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -20,7 +20,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
@@ -55,14 +55,14 @@ public class CombinedTableCompactorSink implements Serializable {
private static final String WRITER_NAME = "Writer";
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final boolean ignorePreviousFiles;
private final boolean fullCompaction;
private final Options options;
public CombinedTableCompactorSink(
- Catalog.Loader catalogLoader, Options options, boolean fullCompaction) {
+ CatalogLoader catalogLoader, Options options, boolean fullCompaction) {
this.catalogLoader = catalogLoader;
this.ignorePreviousFiles = false;
this.fullCompaction = fullCompaction;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 58f6a3834096..02a7e6c1b3c8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -20,6 +20,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
@@ -72,7 +73,7 @@ public class MultiTablesStoreCompactOperator
private transient StoreSinkWriteState state;
private transient DataFileMetaSerializer dataFileMetaSerializer;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
protected Catalog catalog;
protected Map tables;
@@ -81,7 +82,7 @@ public class MultiTablesStoreCompactOperator
private MultiTablesStoreCompactOperator(
StreamOperatorParameters parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String initialCommitUser,
CheckpointConfig checkpointConfig,
boolean isStreaming,
@@ -324,7 +325,7 @@ private StoreSinkWrite.Provider createWriteProvider(
/** {@link StreamOperatorFactory} of {@link MultiTablesStoreCompactOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory {
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final CheckpointConfig checkpointConfig;
private final boolean isStreaming;
private final boolean ignorePreviousFiles;
@@ -332,7 +333,7 @@ public static class Factory
private final String initialCommitUser;
public Factory(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String initialCommitUser,
CheckpointConfig checkpointConfig,
boolean isStreaming,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 537a98f97fb0..01acddb9ad99 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -56,12 +57,12 @@ public class StoreMultiCommitter
private final boolean ignoreEmptyCommit;
private final Map dynamicOptions;
- public StoreMultiCommitter(Catalog.Loader catalogLoader, Context context) {
+ public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) {
this(catalogLoader, context, false, Collections.emptyMap());
}
public StoreMultiCommitter(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Context context,
boolean ignoreEmptyCommit,
Map dynamicOptions) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index dbdf77601480..d190b9ccf39e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -58,8 +58,11 @@ public static PartitionListeners create(Committer.Context context, FileStoreTabl
throws Exception {
List listeners = new ArrayList<>();
- ReportHmsListener.create(context.isRestored(), context.stateStore(), table)
+ // partition statistics reporter
+ ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table)
.ifPresent(listeners::add);
+
+ // partition mark done
PartitionMarkDone.create(
context.streamingCheckpointEnabled(),
context.isRestored(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
similarity index 85%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index 853dc52c20bf..b75889d567ee 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -40,22 +40,24 @@
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/** Action to report the table statistic from the latest snapshot to HMS. */
-public class HmsReporter implements Closeable {
+public class PartitionStatisticsReporter implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(HmsReporter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class);
+
+ private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
private final MetastoreClient metastoreClient;
private final SnapshotReader snapshotReader;
private final SnapshotManager snapshotManager;
- public HmsReporter(FileStoreTable table, MetastoreClient client) {
+ public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient client) {
this.metastoreClient =
Preconditions.checkNotNull(client, "the metastore client factory is null");
this.snapshotReader = table.newSnapshotReader();
@@ -90,7 +92,12 @@ public void report(String partition, long modifyTime) throws Exception {
statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
- statistic.put(HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000));
+
+ String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
+ statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+ // kept for compatibility with the Hive metastore
+ statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
LOG.info("alter partition {} with statistic {}.", partitionSpec, statistic);
metastoreClient.alterPartition(partitionSpec, statistic, modifyTime, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
similarity index 91%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index 842dd012e88e..ca51c3df5b1a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -49,7 +49,7 @@
* This listener will collect data from the newly touched partition and then decide when to trigger
* a report based on the partition's idle time.
*/
-public class ReportHmsListener implements PartitionListener {
+public class ReportPartStatsListener implements PartitionListener {
@SuppressWarnings("unchecked")
private static final ListStateDescriptor