From c14421c521a136205390b4d90092503dc77f6c41 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 17 Dec 2024 14:25:21 +0800 Subject: [PATCH] [flink] make warehouse in Flink action optional --- .../flink/procedure/CompactProcedure.java | 4 -- .../action/cdc/SynchronizationActionBase.java | 3 +- .../paimon/flink/action/ActionBase.java | 4 +- .../paimon/flink/action/ActionFactory.java | 66 ++++++------------- .../paimon/flink/action/CloneAction.java | 11 +--- .../flink/action/CloneActionFactory.java | 15 +++-- .../paimon/flink/action/CompactAction.java | 8 +-- .../flink/action/CompactActionFactory.java | 29 ++------ .../flink/action/CompactDatabaseAction.java | 4 +- .../action/CompactDatabaseActionFactory.java | 5 +- .../flink/action/CreateBranchAction.java | 3 +- .../action/CreateBranchActionFactory.java | 26 ++------ .../CreateOrReplaceTagActionFactory.java | 19 +++--- .../paimon/flink/action/CreateTagAction.java | 3 +- .../flink/action/CreateTagActionFactory.java | 14 +--- .../action/CreateTagFromTimestampAction.java | 13 +++- .../CreateTagFromTimestampActionFactory.java | 17 ++--- .../action/CreateTagFromWatermarkAction.java | 13 +++- .../CreateTagFromWatermarkActionFactory.java | 16 +++-- .../paimon/flink/action/DeleteAction.java | 3 +- .../flink/action/DeleteActionFactory.java | 13 ++-- .../flink/action/DeleteBranchAction.java | 3 +- .../action/DeleteBranchActionFactory.java | 14 ++-- .../paimon/flink/action/DeleteTagAction.java | 3 +- .../flink/action/DeleteTagActionFactory.java | 14 ++-- .../flink/action/DropPartitionAction.java | 3 +- .../action/DropPartitionActionFactory.java | 18 ++--- .../flink/action/ExpirePartitionsAction.java | 3 +- .../action/ExpirePartitionsActionFactory.java | 22 ++----- .../flink/action/ExpireSnapshotsAction.java | 14 ++-- .../action/ExpireSnapshotsActionFactory.java | 13 ++-- .../paimon/flink/action/ExpireTagsAction.java | 9 ++- .../flink/action/ExpireTagsActionFactory.java | 15 ++--- .../flink/action/FastForwardAction.java | 3 +- .../action/FastForwardActionFactory.java | 14 ++-- .../flink/action/MarkPartitionDoneAction.java | 3 +- .../MarkPartitionDoneActionFactory.java | 18 ++--- .../paimon/flink/action/MergeIntoAction.java | 12 +--- .../flink/action/MergeIntoActionFactory.java | 26 +++----- .../flink/action/MigrateDatabaseAction.java | 3 +- .../action/MigrateDatabaseActionFactory.java | 10 +-- .../flink/action/MigrateFileAction.java | 3 +- .../action/MigrateFileActionFactory.java | 4 +- .../flink/action/MigrateTableAction.java | 3 +- .../action/MigrateTableActionFactory.java | 10 +-- .../action/MultipleParameterToolAdapter.java | 11 ++++ .../action/QueryServiceActionFactory.java | 8 +-- .../flink/action/RemoveOrphanFilesAction.java | 3 +- .../RemoveOrphanFilesActionFactory.java | 17 ++--- .../paimon/flink/action/RenameTagAction.java | 3 +- .../flink/action/RenameTagActionFactory.java | 22 ++----- .../paimon/flink/action/RepairAction.java | 4 +- .../flink/action/RepairActionFactory.java | 5 +- .../paimon/flink/action/ReplaceTagAction.java | 3 +- .../flink/action/ReplaceTagActionFactory.java | 13 +--- .../flink/action/ResetConsumerAction.java | 3 +- .../action/ResetConsumerActionFactory.java | 14 ++-- .../flink/action/RewriteFileIndexAction.java | 5 +- .../action/RewriteFileIndexActionFactory.java | 6 +- .../paimon/flink/action/RollbackToAction.java | 3 +- .../flink/action/RollbackToActionFactory.java | 15 ++--- .../action/RollbackToTimestampAction.java | 3 +- .../RollbackToTimestampActionFactory.java | 19 ++---- .../flink/action/SortCompactAction.java | 3 +- .../paimon/flink/action/TableActionBase.java | 8 +-- .../flink/procedure/CloneProcedure.java | 26 ++++++-- .../procedure/CompactDatabaseProcedure.java | 3 +- .../flink/procedure/CompactProcedure.java | 3 - .../flink/procedure/MergeIntoProcedure.java | 6 +- .../flink/PrimaryKeyFileStoreTableITCase.java | 24 ++++++- ...rtCompactActionForUnawareBucketITCase.java | 6 +- .../MigrateDatabaseProcedureITCase.java | 10 +-- .../procedure/MigrateFileProcedureITCase.java | 3 +- .../MigrateTableProcedureITCase.java | 10 +-- .../hive/procedure/RepairActionITCase.java | 8 +-- 75 files changed, 307 insertions(+), 489 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 560e532a6dbb..18e03e053cdd 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -139,8 +139,6 @@ public String[] call( String partitionIdleTime, String compactStrategy) throws Exception { - - String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); Map tableConf = StringUtils.isNullOrWhitespaceOnly(tableOptions) @@ -152,7 +150,6 @@ public String[] call( if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) { action = new CompactAction( - warehouse, identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions, @@ -171,7 +168,6 @@ public String[] call( "sort compact do not support 'partition_idle_time'."); action = new SortCompactAction( - warehouse, identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions, diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index a7c770347410..3d094bc5a5f5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -71,12 +71,11 @@ public abstract class SynchronizationActionBase extends ActionBase { protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {}; public SynchronizationActionBase( - String warehouse, String database, Map catalogConfig, Map cdcSourceConfig, SyncJobHandler syncJobHandler) { - super(warehouse, catalogConfig); + super(catalogConfig); this.database = database; this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 30e32d62efec..b327addb00db 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.LogicalTypeConversion; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; @@ -53,9 +52,8 @@ public abstract class ActionBase implements Action { protected StreamExecutionEnvironment env; protected StreamTableEnvironment batchTEnv; - public ActionBase(String warehouse, Map catalogConfig) { + public ActionBase(Map catalogConfig) { catalogOptions = Options.fromMap(catalogConfig); - catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); // disable cache to avoid concurrent modification exception if (!catalogOptions.contains(CACHE_ENABLED)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java index fbf8f12f49eb..34964d4c89df 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java @@ -18,13 +18,11 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.factories.Factory; import org.apache.paimon.factories.FactoryException; import org.apache.paimon.factories.FactoryUtil; -import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.options.CatalogOptions; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +48,7 @@ public interface ActionFactory extends Factory { String WAREHOUSE = "warehouse"; String DATABASE = "database"; String TABLE = "table"; - String PATH = "path"; + @Deprecated String PATH = "path"; String CATALOG_CONF = "catalog_conf"; String TABLE_CONF = "table_conf"; String PARTITION = "partition"; @@ -88,6 +86,14 @@ static Optional createAction(String[] args) { return Optional.empty(); } + if (params.has(PATH)) { + throw new UnsupportedOperationException( + String.format( + "Parameter '%s' is deprecated. Please use '--%s %s=' to specify warehouse if needed, " + + "and use '%s' to specify database and '%s' to specify table.", + PATH, CATALOG_CONF, CatalogOptions.WAREHOUSE.key(), DATABASE, TABLE)); + } + return actionFactory.create(params); } @@ -105,39 +111,6 @@ static void printDefaultHelp() { System.out.println("For detailed options of each action, run --help"); } - default Tuple3 getTablePath(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - String database = params.get(DATABASE); - String table = params.get(TABLE); - String path = params.get(PATH); - - Tuple3 tablePath = null; - int count = 0; - if (warehouse != null || database != null || table != null) { - if (warehouse == null || database == null || table == null) { - throw new IllegalArgumentException( - "Warehouse, database and table must be specified all at once to specify a table."); - } - tablePath = Tuple3.of(warehouse, database, table); - count++; - } - if (path != null) { - tablePath = - Tuple3.of( - CatalogUtils.warehouse(path), - CatalogUtils.database(path), - CatalogUtils.table(path)); - count++; - } - - if (count != 1) { - throw new IllegalArgumentException( - "Please specify either \"warehouse, database and table\" or \"path\"."); - } - - return tablePath; - } - default List> getPartitions(MultipleParameterToolAdapter params) { List> partitions = new ArrayList<>(); for (String partition : params.getMultiParameter(PARTITION)) { @@ -160,16 +133,6 @@ default Map optionalConfigMap(MultipleParameterToolAdapter param return config; } - default void checkRequiredArgument(MultipleParameterToolAdapter params, String key) { - Preconditions.checkArgument( - params.has(key), "Argument '%s' is required. Run ' --help' for help.", key); - } - - default String getRequiredValue(MultipleParameterToolAdapter params, String key) { - checkRequiredArgument(params, key); - return params.get(key); - } - default Map> optionalConfigMapList( MultipleParameterToolAdapter params, String key) { if (!params.has(key)) { @@ -182,4 +145,13 @@ default Map> optionalConfigMapList( } return config; } + + default Map catalogConfigMap(MultipleParameterToolAdapter params) { + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String warehouse = params.get(WAREHOUSE); + if (warehouse != null && !catalogConfig.containsKey(WAREHOUSE)) { + catalogConfig.put(WAREHOUSE, warehouse); + } + return catalogConfig; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java index bac030dd0496..15b90ec83411 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java @@ -25,7 +25,6 @@ import org.apache.paimon.flink.clone.SnapshotHintChannelComputer; import org.apache.paimon.flink.clone.SnapshotHintOperator; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; -import org.apache.paimon.options.CatalogOptions; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -37,7 +36,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; /** The Latest Snapshot clone action for Flink. */ @@ -54,19 +52,14 @@ public class CloneAction extends ActionBase { private final String targetTableName; public CloneAction( - String warehouse, String database, String tableName, Map sourceCatalogConfig, - String targetWarehouse, String targetDatabase, String targetTableName, Map targetCatalogConfig, String parallelismStr) { - super(warehouse, sourceCatalogConfig); - - checkNotNull(warehouse, "warehouse must not be null."); - checkNotNull(targetWarehouse, "targetWarehouse must not be null."); + super(sourceCatalogConfig); this.parallelism = isNullOrWhitespaceOnly(parallelismStr) @@ -77,7 +70,6 @@ public CloneAction( if (!sourceCatalogConfig.isEmpty()) { this.sourceCatalogConfig = sourceCatalogConfig; } - this.sourceCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), warehouse); this.database = database; this.tableName = tableName; @@ -85,7 +77,6 @@ public CloneAction( if (!targetCatalogConfig.isEmpty()) { this.targetCatalogConfig = targetCatalogConfig; } - this.targetCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), targetWarehouse); this.targetDatabase = targetDatabase; this.targetTableName = targetTableName; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java index db45c8508447..fff7fd9d95da 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action; +import java.util.Map; import java.util.Optional; /** Factory to create {@link CloneAction}. */ @@ -37,16 +38,22 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { + Map catalogConfig = catalogConfigMap(params); + + Map targetCatalogConfig = optionalConfigMap(params, TARGET_CATALOG_CONF); + String targetWarehouse = params.get(TARGET_WAREHOUSE); + if (targetWarehouse != null && !targetCatalogConfig.containsKey(TARGET_WAREHOUSE)) { + catalogConfig.put(TARGET_WAREHOUSE, targetWarehouse); + } + CloneAction cloneAction = new CloneAction( - params.get(WAREHOUSE), params.get(DATABASE), params.get(TABLE), - optionalConfigMap(params, CATALOG_CONF), - params.get(TARGET_WAREHOUSE), + catalogConfig, params.get(TARGET_DATABASE), params.get(TARGET_TABLE), - optionalConfigMap(params, TARGET_CATALOG_CONF), + targetCatalogConfig, params.get(PARALLELISM)); return Optional.of(cloneAction); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 84e37a5b10f9..73c96b2c4bb1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -41,7 +41,6 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,17 +60,12 @@ public class CompactAction extends TableActionBase { private Boolean fullCompaction; - public CompactAction(String warehouse, String database, String tableName) { - this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap()); - } - public CompactAction( - String warehouse, String database, String tableName, Map catalogConfig, Map tableConf) { - super(warehouse, database, tableName, catalogConfig); + super(database, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java index fc60a870eabe..ea0e48db2213 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java @@ -21,8 +21,6 @@ import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.TimeUtils; -import org.apache.flink.api.java.tuple.Tuple3; - import java.util.List; import java.util.Map; import java.util.Optional; @@ -46,9 +44,10 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String database = params.getRequired(DATABASE); + String table = params.getRequired(TABLE); + Map catalogConfig = catalogConfigMap(params); + Map tableConfig = optionalConfigMap(params, TABLE_CONF); CompactAction action; if (params.has(ORDER_STRATEGY)) { @@ -56,22 +55,11 @@ public Optional create(MultipleParameterToolAdapter params) { !params.has(PARTITION_IDLE_TIME), "sort compact do not support 'partition_idle_time'."); action = - new SortCompactAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - optionalConfigMap(params, TABLE_CONF)) + new SortCompactAction(database, table, catalogConfig, tableConfig) .withOrderStrategy(params.get(ORDER_STRATEGY)) - .withOrderColumns(getRequiredValue(params, ORDER_BY).split(",")); + .withOrderColumns(params.getRequired(ORDER_BY).split(",")); } else { - action = - new CompactAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - optionalConfigMap(params, TABLE_CONF)); + action = new CompactAction(database, table, catalogConfig, tableConfig); if (params.has(PARTITION_IDLE_TIME)) { action.withPartitionIdleTime( TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME))); @@ -123,7 +111,6 @@ public void printHelp() { System.out.println( " compact --warehouse s3://path/to/warehouse --database " + "--table [--catalog_conf [--catalog_conf ...]]"); - System.out.println(" compact --path [--partition ]"); System.out.println(); System.out.println("Partition name syntax:"); @@ -142,8 +129,6 @@ public void printHelp() { System.out.println("Examples:"); System.out.println( " compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table"); - System.out.println( - " compact --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08"); System.out.println( " compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table " + "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09"); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 124d3ca68776..457e04bfd8ff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -76,8 +76,8 @@ public class CompactDatabaseAction extends ActionBase { private boolean isStreaming; - public CompactDatabaseAction(String warehouse, Map catalogConfig) { - super(warehouse, catalogConfig); + public CompactDatabaseAction(Map catalogConfig) { + super(catalogConfig); } public CompactDatabaseAction includingDatabases(@Nullable String includingDatabases) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java index 5672f99dc30f..8fa5a1cf46ec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java @@ -42,10 +42,7 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - CompactDatabaseAction action = - new CompactDatabaseAction( - getRequiredValue(params, WAREHOUSE), - optionalConfigMap(params, CATALOG_CONF)); + CompactDatabaseAction action = new CompactDatabaseAction(catalogConfigMap(params)); action.includingDatabases(params.get(INCLUDING_DATABASES)) .includingTables(params.get(INCLUDING_TABLES)) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java index aa8cc697ae36..c8eb1d83a41b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java @@ -28,13 +28,12 @@ public class CreateBranchAction extends TableActionBase { private final String tagName; public CreateBranchAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String branchName, String tagName) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.branchName = branchName; this.tagName = tagName; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java index d1071d0870ad..a95a1d5a36ae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link CreateBranchAction}. */ @@ -38,26 +35,13 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, BRANCH_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - - String tagName = null; - if (params.has(TAG_NAME)) { - tagName = params.get(TAG_NAME); - } - - String branchName = params.get(BRANCH_NAME); - CreateBranchAction action = new CreateBranchAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - branchName, - tagName); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(BRANCH_NAME), + params.get(TAG_NAME)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java index fecb6895b682..d6ff60c3fabf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java @@ -20,8 +20,6 @@ import org.apache.paimon.utils.TimeUtils; -import org.apache.flink.api.java.tuple.Tuple3; - import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -35,11 +33,8 @@ public abstract class CreateOrReplaceTagActionFactory implements ActionFactory { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, TAG_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String tagName = params.get(TAG_NAME); + Map catalogConfig = catalogConfigMap(params); + String tagName = params.getRequired(TAG_NAME); Long snapshot = null; if (params.has(SNAPSHOT)) { @@ -53,11 +48,17 @@ public Optional create(MultipleParameterToolAdapter params) { return Optional.of( createOrReplaceTagAction( - tablePath, catalogConfig, tagName, snapshot, timeRetained)); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfig, + tagName, + snapshot, + timeRetained)); } abstract Action createOrReplaceTagAction( - Tuple3 tablePath, + String database, + String table, Map catalogConfig, String tagName, Long snapshot, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index cfc9b558b40c..7628fda10618 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -31,14 +31,13 @@ public class CreateTagAction extends TableActionBase { private final @Nullable Duration timeRetained; public CreateTagAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String tagName, @Nullable Long snapshotId, @Nullable Duration timeRetained) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.tagName = tagName; this.timeRetained = timeRetained; this.snapshotId = snapshotId; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index c525943122bc..76669d31401a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - import java.time.Duration; import java.util.Map; @@ -35,19 +33,13 @@ public String identifier() { @Override Action createOrReplaceTagAction( - Tuple3 tablePath, + String database, + String table, Map catalogConfig, String tagName, Long snapshot, Duration timeRetained) { - return new CreateTagAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - tagName, - snapshot, - timeRetained); + return new CreateTagAction(database, table, catalogConfig, tagName, snapshot, timeRetained); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampAction.java index 632c41fc0f66..14a4e01fd1ea 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampAction.java @@ -26,19 +26,22 @@ /** Create tag from timestamp action for Flink. */ public class CreateTagFromTimestampAction extends ActionBase { + + private final String database; private final String table; private final String tag; private final Long timestamp; private final String timeRetained; public CreateTagFromTimestampAction( - String warehouse, + String database, String table, String tag, Long timestamp, String timeRetained, Map catalogConfig) { - super(warehouse, catalogConfig); + super(catalogConfig); + this.database = database; this.table = table; this.tag = tag; this.timestamp = timestamp; @@ -51,6 +54,10 @@ public void run() throws Exception { new CreateTagFromTimestampProcedure(); createTagFromTimestampProcedure.withCatalog(catalog); createTagFromTimestampProcedure.call( - new DefaultProcedureContext(env), table, tag, timestamp, timeRetained); + new DefaultProcedureContext(env), + database + "." + table, + tag, + timestamp, + timeRetained); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java index 2d2fae73925d..0c5f477f30c5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java @@ -25,8 +25,6 @@ public class CreateTagFromTimestampActionFactory implements ActionFactory { public static final String IDENTIFIER = "create_tag_from_timestamp"; - private static final String TABLE = "table"; - private static final String TAG = "tag"; private static final String TIMESTAMP = "timestamp"; @@ -40,16 +38,18 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - String table = params.get(TABLE); - String tag = params.get(TAG); Long timestamp = Long.parseLong(params.get(TIMESTAMP)); String timeRetained = params.get(TIME_RETAINED); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); CreateTagFromTimestampAction createTagFromTimestampAction = new CreateTagFromTimestampAction( - warehouse, table, tag, timestamp, timeRetained, catalogConfig); + params.getRequired(DATABASE), + params.getRequired(TABLE), + params.getRequired(TAG), + timestamp, + timeRetained, + catalogConfig); return Optional.of(createTagFromTimestampAction); } @@ -61,7 +61,8 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " create_tag_from_timestamp --warehouse " - + "--table " + + "--database " + + "--table " + "--tag " + "--timestamp " + "[--timeRetained ] " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java index 8afa54082f29..af75865eef02 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java @@ -26,19 +26,22 @@ /** Create tag from watermark action for Flink. */ public class CreateTagFromWatermarkAction extends ActionBase { + + private final String database; private final String table; private final String tag; private final Long watermark; private final String timeRetained; public CreateTagFromWatermarkAction( - String warehouse, + String database, String table, String tag, Long watermark, String timeRetained, Map catalogConfig) { - super(warehouse, catalogConfig); + super(catalogConfig); + this.database = database; this.table = table; this.tag = tag; this.watermark = watermark; @@ -51,6 +54,10 @@ public void run() throws Exception { new CreateTagFromWatermarkProcedure(); createTagFromWatermarkProcedure.withCatalog(catalog); createTagFromWatermarkProcedure.call( - new DefaultProcedureContext(env), table, tag, watermark, timeRetained); + new DefaultProcedureContext(env), + database + "." + table, + tag, + watermark, + timeRetained); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java index 1fb86bde7ebb..282ce67246b3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java @@ -26,8 +26,6 @@ public class CreateTagFromWatermarkActionFactory implements ActionFactory { public static final String IDENTIFIER = "create_tag_from_watermark"; - private static final String TABLE = "table"; - private static final String TAG = "tag"; private static final String WATERMARK = "watermark"; @@ -41,16 +39,19 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - String table = params.get(TABLE); String tag = params.get(TAG); Long watermark = Long.parseLong(params.get(WATERMARK)); String timeRetained = params.get(TIME_RETAINED); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); CreateTagFromWatermarkAction createTagFromWatermarkAction = new CreateTagFromWatermarkAction( - warehouse, table, tag, watermark, timeRetained, catalogConfig); + params.getRequired(DATABASE), + params.getRequired(TABLE), + tag, + watermark, + timeRetained, + catalogConfig); return Optional.of(createTagFromWatermarkAction); } @@ -62,7 +63,8 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " create_tag_from_watermark --warehouse " - + "--table " + + "--database " + + "--table " + "--tag " + "--watermark " + "[--timeRetained ] " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java index c275ce6f1fe0..ed9423cf528c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java @@ -44,12 +44,11 @@ public class DeleteAction extends TableActionBase { private final String filter; public DeleteAction( - String warehouse, String databaseName, String tableName, String filter, Map catalogConfig) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.filter = filter; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java index 7f8f63f7f601..1f116e6097be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link DeleteAction}. */ @@ -37,18 +34,18 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - String filter = params.get(WHERE); if (filter == null) { throw new IllegalArgumentException( "Please specify deletion filter. If you want to delete all records, please use overwrite (see doc)."); } - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - DeleteAction action = - new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter, catalogConfig); + new DeleteAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + filter, + catalogConfigMap(params)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java index 7373f8fff773..9cec62b99a53 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java @@ -26,12 +26,11 @@ public class DeleteBranchAction extends TableActionBase { private final String branchNames; public DeleteBranchAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String branchNames) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.branchNames = branchNames; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java index 33f1c7990683..eedc16023437 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link DeleteBranchAction}. */ @@ -37,15 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, BRANCH_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String branchName = params.get(BRANCH_NAME); - DeleteBranchAction action = new DeleteBranchAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, branchName); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(BRANCH_NAME)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java index 73cf21033d64..3e73a7f18265 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java @@ -26,12 +26,11 @@ public class DeleteTagAction extends TableActionBase { private final String tagNameStr; public DeleteTagAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String tagNameStr) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.tagNameStr = tagNameStr; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java index 56b724c0ee29..fcb3c77a359a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link DeleteTagAction}. */ @@ -37,15 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, TAG_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String tagName = params.get(TAG_NAME); - DeleteTagAction action = new DeleteTagAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(TAG_NAME)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java index 2ead85001d6d..6f568dfe61bc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java @@ -34,12 +34,11 @@ public class DropPartitionAction extends TableActionBase { private final FileStoreCommit commit; public DropPartitionAction( - String warehouse, String databaseName, String tableName, List> partitions, Map catalogConfig) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java index e4b29dde0758..e70dcc4a77d1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java @@ -18,12 +18,12 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Factory to create {@link DropPartitionAction}. */ public class DropPartitionActionFactory implements ActionFactory { @@ -36,16 +36,18 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - checkRequiredArgument(params, PARTITION); + checkArgument( + params.has(PARTITION), + "Argument '%s' is required. Run ' --help' for help.", + PARTITION); List> partitions = getPartitions(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - return Optional.of( new DropPartitionAction( - tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig)); + params.getRequired(DATABASE), + params.getRequired(TABLE), + partitions, + catalogConfigMap(params))); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java index 0fa17e1a8ddb..0dc96bd93a7e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java @@ -37,7 +37,6 @@ public class ExpirePartitionsAction extends TableActionBase { private final PartitionExpire partitionExpire; public ExpirePartitionsAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, @@ -45,7 +44,7 @@ public ExpirePartitionsAction( String timestampFormatter, String timestampPattern, String expireStrategy) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java index 3d0dfc265983..2cf63d3bc775 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link ExpirePartitionsAction}. */ @@ -35,25 +32,16 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - checkRequiredArgument(params, EXPIRATIONTIME); - checkRequiredArgument(params, TIMESTAMPFORMATTER); - String expirationTime = params.get(EXPIRATIONTIME); - String timestampFormatter = params.get(TIMESTAMPFORMATTER); String expireStrategy = params.get(EXPIRE_STRATEGY); String timestampPattern = params.get(TIMESTAMP_PATTERN); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - return Optional.of( new ExpirePartitionsAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - expirationTime, - timestampFormatter, + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(EXPIRATIONTIME), + params.getRequired(TIMESTAMPFORMATTER), timestampPattern, expireStrategy)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java index 92848c804852..5ae5c486d7ce 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java @@ -27,22 +27,24 @@ /** Expire snapshots action for Flink. */ public class ExpireSnapshotsAction extends ActionBase { - private final String identifier; + private final String database; + private final String table; private final Integer retainMax; private final Integer retainMin; private final String olderThan; private final Integer maxDeletes; public ExpireSnapshotsAction( - String warehouse, - String identifier, + String database, + String table, Map catalogConfig, Integer retainMax, Integer retainMin, String olderThan, Integer maxDeletes) { - super(warehouse, catalogConfig); - this.identifier = identifier; + super(catalogConfig); + this.database = database; + this.table = table; this.retainMax = retainMax; this.retainMin = retainMin; this.olderThan = olderThan; @@ -54,7 +56,7 @@ public void run() throws Exception { expireSnapshotsProcedure.withCatalog(catalog); expireSnapshotsProcedure.call( new DefaultProcedureContext(env), - identifier, + database + "." + table, retainMax, retainMin, olderThan, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java index 1f29d3a71326..24e25844718a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action; -import java.util.Map; import java.util.Optional; /** Factory to create {@link ExpireSnapshotsAction}. */ @@ -40,10 +39,6 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String identifier = params.get(IDENTIFIER_KEY); - Integer retainMax = params.has(RETAIN_MAX) ? Integer.parseInt(params.get(RETAIN_MAX)) : null; Integer retainMin = @@ -54,9 +49,9 @@ public Optional create(MultipleParameterToolAdapter params) { ExpireSnapshotsAction action = new ExpireSnapshotsAction( - warehouse, - identifier, - catalogConfig, + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), retainMax, retainMin, olderThan, @@ -72,6 +67,6 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( - " expire_snapshots --warehouse --identifier --retain_max --retain_min --older_than --max_delete "); + " expire_snapshots --warehouse --database --table --retain_max --retain_min --older_than --max_delete "); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java index c1231ed3ad54..9e26e2121939 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java @@ -27,12 +27,14 @@ /** Expire tags action for Flink. */ public class ExpireTagsAction extends ActionBase { + private final String database; private final String table; private final String olderThan; public ExpireTagsAction( - String warehouse, String table, String olderThan, Map catalogConfig) { - super(warehouse, catalogConfig); + String database, String table, String olderThan, Map catalogConfig) { + super(catalogConfig); + this.database = database; this.table = table; this.olderThan = olderThan; } @@ -41,6 +43,7 @@ public ExpireTagsAction( public void run() throws Exception { ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure(); expireTagsProcedure.withCatalog(catalog); - expireTagsProcedure.call(new DefaultProcedureContext(env), table, olderThan); + expireTagsProcedure.call( + new DefaultProcedureContext(env), database + "." + table, olderThan); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java index e9bbb0a3bdc7..01f7a2ecd633 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action; -import java.util.Map; import java.util.Optional; /** Factory to create {@link ExpireTagsAction}. */ @@ -35,13 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - String table = params.get(TABLE); - String olderThan = params.get(OLDER_THAN); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - ExpireTagsAction expireTagsAction = - new ExpireTagsAction(warehouse, table, olderThan, catalogConfig); + new ExpireTagsAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + params.get(OLDER_THAN), + catalogConfigMap(params)); return Optional.of(expireTagsAction); } @@ -53,7 +51,8 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " expire_tags --warehouse " - + "--table " + + "--database " + + "--table
" + "[--older_than ]"); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardAction.java index b13c004c0d96..0aaa44e6e356 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardAction.java @@ -25,12 +25,11 @@ public class FastForwardAction extends TableActionBase { private final String branchName; public FastForwardAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String branchName) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.branchName = branchName; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardActionFactory.java index b1e2228c6592..9d984f3c9505 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FastForwardActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link FastForwardAction}. */ @@ -37,15 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, BRANCH_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String branchName = params.get(BRANCH_NAME); - FastForwardAction action = new FastForwardAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, branchName); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(BRANCH_NAME)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java index 9fd906ee44dc..25cd14af2195 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java @@ -35,12 +35,11 @@ public class MarkPartitionDoneAction extends TableActionBase { private final List> partitions; public MarkPartitionDoneAction( - String warehouse, String databaseName, String tableName, List> partitions, Map catalogConfig) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneActionFactory.java index 8de5a7130b14..78929e1b76c4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneActionFactory.java @@ -18,12 +18,12 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Factory to create {@link MarkPartitionDoneAction}. */ public class MarkPartitionDoneActionFactory implements ActionFactory { @@ -36,16 +36,18 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - checkRequiredArgument(params, PARTITION); + checkArgument( + params.has(PARTITION), + "Argument '%s' is required. Run ' --help' for help.", + PARTITION); List> partitions = getPartitions(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - return Optional.of( new MarkPartitionDoneAction( - tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig)); + params.getRequired(DATABASE), + params.getRequired(TABLE), + partitions, + catalogConfigMap(params))); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index f99b2297392c..1ecd23ea6246 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -120,16 +120,8 @@ public class MergeIntoAction extends TableActionBase { @Nullable private String notMatchedInsertCondition; @Nullable private String notMatchedInsertValues; - public MergeIntoAction(String warehouse, String database, String tableName) { - this(warehouse, database, tableName, Collections.emptyMap()); - } - - public MergeIntoAction( - String warehouse, - String database, - String tableName, - Map catalogConfig) { - super(warehouse, database, tableName, catalogConfig); + public MergeIntoAction(String database, String tableName, Map catalogConfig) { + super(database, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java index 0ff0db3271ee..830c0708d911 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java @@ -18,12 +18,9 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -62,12 +59,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); MergeIntoAction action = - new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); + new MergeIntoAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params)); if (params.has(TARGET_AS)) { action.withTargetAlias(params.get(TARGET_AS)); @@ -78,26 +75,22 @@ public Optional create(MultipleParameterToolAdapter params) { action.withSourceSqls(sourceSqls.toArray(new String[0])); } - checkRequiredArgument(params, SOURCE_TABLE); - action.withSourceTable(params.get(SOURCE_TABLE)); + action.withSourceTable(params.getRequired(SOURCE_TABLE)); - checkRequiredArgument(params, ON); - action.withMergeCondition(params.get(ON)); + action.withMergeCondition(params.getRequired(ON)); List actions = Arrays.stream(params.get(MERGE_ACTIONS).split(",")) .map(String::trim) .collect(Collectors.toList()); if (actions.contains(MATCHED_UPSERT)) { - checkRequiredArgument(params, MATCHED_UPSERT_SET); action.withMatchedUpsert( - params.get(MATCHED_UPSERT_CONDITION), params.get(MATCHED_UPSERT_SET)); + params.get(MATCHED_UPSERT_CONDITION), params.getRequired(MATCHED_UPSERT_SET)); } if (actions.contains(NOT_MATCHED_BY_SOURCE_UPSERT)) { - checkRequiredArgument(params, NOT_MATCHED_BY_SOURCE_UPSERT_SET); action.withNotMatchedBySourceUpsert( params.get(NOT_MATCHED_BY_SOURCE_UPSERT_CONDITION), - params.get(NOT_MATCHED_BY_SOURCE_UPSERT_SET)); + params.getRequired(NOT_MATCHED_BY_SOURCE_UPSERT_SET)); } if (actions.contains(MATCHED_DELETE)) { action.withMatchedDelete(params.get(MATCHED_DELETE_CONDITION)); @@ -106,10 +99,9 @@ public Optional create(MultipleParameterToolAdapter params) { action.withNotMatchedBySourceDelete(params.get(NOT_MATCHED_BY_SOURCE_DELETE_CONDITION)); } if (actions.contains(NOT_MATCHED_INSERT)) { - checkRequiredArgument(params, NOT_MATCHED_INSERT_VALUES); action.withNotMatchedInsert( params.get(NOT_MATCHED_INSERT_CONDITION), - params.get(NOT_MATCHED_INSERT_VALUES)); + params.getRequired(NOT_MATCHED_INSERT_VALUES)); } action.validate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java index 6a8afd206091..c7f5445c19be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java @@ -33,12 +33,11 @@ public class MigrateDatabaseAction extends ActionBase { public MigrateDatabaseAction( String connector, - String warehouse, String hiveDatabaseName, Map catalogConfig, String tableProperties, Integer parallelism) { - super(warehouse, catalogConfig); + super(catalogConfig); this.connector = connector; this.hiveDatabaseName = hiveDatabaseName; this.tableProperties = tableProperties; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java index 8ce33f58e644..33a4524ed701 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java @@ -37,21 +37,15 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); String connector = params.get(SOURCE_TYPE); String sourceHiveDatabase = params.get(DATABASE); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); MigrateDatabaseAction migrateDatabaseAction = new MigrateDatabaseAction( - connector, - warehouse, - sourceHiveDatabase, - catalogConfig, - tableConf, - parallelism); + connector, sourceHiveDatabase, catalogConfig, tableConf, parallelism); return Optional.of(migrateDatabaseAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java index 798d1d347732..e874536e788f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java @@ -36,14 +36,13 @@ public class MigrateFileAction extends ActionBase { public MigrateFileAction( String connector, - String warehouse, String sourceTable, String targetTable, boolean deleteOrigin, Map catalogConfig, String tableProperties, Integer parallelism) { - super(warehouse, catalogConfig); + super(catalogConfig); this.connector = connector; this.sourceTable = sourceTable; this.targetTable = targetTable; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java index 34df99cfdf5b..c2d6aa2993ef 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java @@ -44,19 +44,17 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); String connector = params.get(SOURCE_TYPE); String sourceHiveTable = params.get(SOURCE_TABLE); String targetTable = params.get(TARGET_TABLE); boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN)); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); MigrateFileAction migrateFileAction = new MigrateFileAction( connector, - warehouse, sourceHiveTable, targetTable, deleteOrigin, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 8a4efdfc710d..b12cb5f862fe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -34,12 +34,11 @@ public class MigrateTableAction extends ActionBase { public MigrateTableAction( String connector, - String warehouse, String hiveTableFullName, Map catalogConfig, String tableProperties, Integer parallelism) { - super(warehouse, catalogConfig); + super(catalogConfig); this.connector = connector; this.hiveTableFullName = hiveTableFullName; this.tableProperties = tableProperties; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index a1a93bc91163..4ff060fb4665 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -37,21 +37,15 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); String connector = params.get(SOURCE_TYPE); String sourceHiveTable = params.get(TABLE); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); MigrateTableAction migrateTableAction = new MigrateTableAction( - connector, - warehouse, - sourceHiveTable, - catalogConfig, - tableConf, - parallelism); + connector, sourceHiveTable, catalogConfig, tableConf, parallelism); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java index e03b8cd698f2..7755cf7b40af 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java @@ -48,4 +48,15 @@ public Collection getMultiParameter(String key) { public String fallback(String key) { return key.replaceAll("_", "-"); } + + public String getRequired(String key) { + String value = get(key); + if (value == null) { + throw new IllegalArgumentException( + "Argument '" + + key + + "' is required. Run ' --help' for more information."); + } + return value; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java index fecbc91e6270..060933676dbb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java @@ -20,8 +20,6 @@ import org.apache.paimon.flink.service.QueryService; -import org.apache.flink.api.java.tuple.Tuple3; - import java.util.Map; import java.util.Optional; @@ -39,13 +37,13 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); Map tableConfig = optionalConfigMap(params, TABLE_CONF); String parallStr = params.get(PARALLELISM); int parallelism = parallStr == null ? 1 : Integer.parseInt(parallStr); Action action = - new TableActionBase(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) { + new TableActionBase( + params.getRequired(DATABASE), params.getRequired(TABLE), catalogConfig) { @Override public void run() throws Exception { QueryService.build(env, table.copy(tableConfig), parallelism); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java index 5c7e6967c319..318089b30be5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java @@ -37,12 +37,11 @@ public class RemoveOrphanFilesAction extends ActionBase { private boolean dryRun = false; public RemoveOrphanFilesAction( - String warehouse, String databaseName, @Nullable String tableName, @Nullable String parallelism, Map catalogConfig) { - super(warehouse, catalogConfig); + super(catalogConfig); this.databaseName = databaseName; this.tableName = tableName; this.parallelism = parallelism; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java index ed567510d143..279bc87baf65 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java @@ -18,11 +18,8 @@ package org.apache.paimon.flink.action; -import java.util.Map; import java.util.Optional; -import static org.apache.paimon.utils.Preconditions.checkNotNull; - /** Factory to create {@link RemoveOrphanFilesAction}. */ public class RemoveOrphanFilesActionFactory implements ActionFactory { @@ -38,16 +35,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - checkNotNull(warehouse); - String database = params.get(DATABASE); - checkNotNull(database); - String table = params.get(TABLE); - String parallelism = params.get(PARALLELISM); - - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); RemoveOrphanFilesAction action = - new RemoveOrphanFilesAction(warehouse, database, table, parallelism, catalogConfig); + new RemoveOrphanFilesAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + params.get(PARALLELISM), + catalogConfigMap(params)); if (params.has(OLDER_THAN)) { action.olderThan(params.get(OLDER_THAN)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagAction.java index 3b12b4c119a1..69ef7aa12a22 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagAction.java @@ -28,13 +28,12 @@ public class RenameTagAction extends TableActionBase { private final String targetTagName; public RenameTagAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String tagName, String targetTagName) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.tagName = tagName; this.targetTagName = targetTagName; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagActionFactory.java index 84f174d39bbb..9f43146cfa05 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RenameTagActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link RenameTagActionFactory}. */ @@ -38,22 +35,13 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, TAG_NAME); - checkRequiredArgument(params, TARGET_TAG_NAME); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String tagName = params.get(TAG_NAME); - String targetTagName = params.get(TARGET_TAG_NAME); - RenameTagAction action = new RenameTagAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - tagName, - targetTagName); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(TAG_NAME), + params.getRequired(TARGET_TAG_NAME)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java index 69fe5b409796..52a068777b36 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java @@ -29,8 +29,8 @@ public class RepairAction extends ActionBase { private final String identifier; - public RepairAction(String warehouse, String identifier, Map catalogConfig) { - super(warehouse, catalogConfig); + public RepairAction(String identifier, Map catalogConfig) { + super(catalogConfig); this.identifier = identifier; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java index 2f201f675cc8..6431e00d2357 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action; -import java.util.Map; import java.util.Optional; /** Factory to create {@link RepairAction}. */ @@ -35,10 +34,8 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); String identifier = params.get(IDENTIFIER_KEY); - RepairAction action = new RepairAction(warehouse, identifier, catalogConfig); + RepairAction action = new RepairAction(identifier, catalogConfigMap(params)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java index 09a85fe8a25a..a2b81bcc1e25 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java @@ -31,14 +31,13 @@ public class ReplaceTagAction extends TableActionBase { private final @Nullable Duration timeRetained; public ReplaceTagAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String tagName, @Nullable Long snapshotId, @Nullable Duration timeRetained) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.tagName = tagName; this.timeRetained = timeRetained; this.snapshotId = snapshotId; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java index a734e9cfbdc5..147e47af6dda 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - import java.time.Duration; import java.util.Map; @@ -35,19 +33,14 @@ public String identifier() { @Override Action createOrReplaceTagAction( - Tuple3 tablePath, + String database, + String table, Map catalogConfig, String tagName, Long snapshot, Duration timeRetained) { return new ReplaceTagAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - catalogConfig, - tagName, - snapshot, - timeRetained); + database, table, catalogConfig, tagName, snapshot, timeRetained); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java index 6db8ab4fef75..9c8f800650e6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java @@ -32,12 +32,11 @@ public class ResetConsumerAction extends TableActionBase { private Long nextSnapshotId; protected ResetConsumerAction( - String warehouse, String databaseName, String tableName, Map catalogConfig, String consumerId) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.consumerId = consumerId; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java index acd85ae60167..dd93d2c7770c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link ResetConsumerAction}. */ @@ -38,15 +35,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, CONSUMER_ID); - - Tuple3 tablePath = getTablePath(params); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - String consumerId = params.get(CONSUMER_ID); - ResetConsumerAction action = new ResetConsumerAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, consumerId); + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params), + params.getRequired(CONSUMER_ID)); if (params.has(NEXT_SNAPSHOT)) { action.withNextSnapshotIds(Long.parseLong(params.get(NEXT_SNAPSHOT))); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java index f8ed73fd2025..dd2544093ead 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java @@ -29,9 +29,8 @@ public class RewriteFileIndexAction extends ActionBase { private String identifier; - public RewriteFileIndexAction( - String warehouse, String identifier, Map catalogConfig) { - super(warehouse, catalogConfig); + public RewriteFileIndexAction(String identifier, Map catalogConfig) { + super(catalogConfig); this.identifier = identifier; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java index 910e7b8d9280..9cb178025471 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java @@ -35,12 +35,10 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - String warehouse = params.get(WAREHOUSE); - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map catalogConfig = catalogConfigMap(params); String identifier = params.get(IDENTIFIER_KEY); - RewriteFileIndexAction action = - new RewriteFileIndexAction(warehouse, identifier, catalogConfig); + RewriteFileIndexAction action = new RewriteFileIndexAction(identifier, catalogConfig); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java index 431383d38215..8afc75aa3fde 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java @@ -33,12 +33,11 @@ public class RollbackToAction extends TableActionBase { private final String version; public RollbackToAction( - String warehouse, String databaseName, String tableName, String version, Map catalogConfig) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.version = version; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java index 077c608acab5..1a6a33bb1165 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link RollbackToAction}. */ @@ -37,16 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - checkRequiredArgument(params, VERSION); - String version = params.get(VERSION); - - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - RollbackToAction action = new RollbackToAction( - tablePath.f0, tablePath.f1, tablePath.f2, version, catalogConfig); + params.getRequired(DATABASE), + params.getRequired(TABLE), + params.getRequired(VERSION), + catalogConfigMap(params)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java index ca706101cc1d..3a91b8a65a77 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java @@ -36,12 +36,11 @@ public class RollbackToTimestampAction extends TableActionBase { private final Long timestamp; public RollbackToTimestampAction( - String warehouse, String databaseName, String tableName, Long timestamp, Map catalogConfig) { - super(warehouse, databaseName, tableName, catalogConfig); + super(databaseName, tableName, catalogConfig); this.timestamp = timestamp; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java index c694ac0041b5..f4462a160451 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; import java.util.Optional; /** Factory to create {@link RollbackToTimestampAction}. */ @@ -37,20 +34,12 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { - Tuple3 tablePath = getTablePath(params); - - checkRequiredArgument(params, TIMESTAMP); - String timestamp = params.get(TIMESTAMP); - - Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - RollbackToTimestampAction action = new RollbackToTimestampAction( - tablePath.f0, - tablePath.f1, - tablePath.f2, - Long.parseLong(timestamp), - catalogConfig); + params.getRequired(DATABASE), + params.getRequired(TABLE), + Long.parseLong(params.getRequired(TIMESTAMP)), + catalogConfigMap(params)); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 2b12aa7a0430..eb545ea4377e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -51,12 +51,11 @@ public class SortCompactAction extends CompactAction { private List orderColumns; public SortCompactAction( - String warehouse, String database, String tableName, Map catalogConfig, Map tableConf) { - super(warehouse, database, tableName, catalogConfig, tableConf); + super(database, tableName, catalogConfig, tableConf); table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index c525b133b949..870d22012bf2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -41,12 +41,8 @@ public abstract class TableActionBase extends ActionBase { protected Table table; protected final Identifier identifier; - TableActionBase( - String warehouse, - String databaseName, - String tableName, - Map catalogConfig) { - super(warehouse, catalogConfig); + TableActionBase(String databaseName, String tableName, Map catalogConfig) { + super(catalogConfig); identifier = new Identifier(databaseName, tableName); try { table = catalog.getTable(identifier); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java index 8b3bc99567c8..11265f925e66 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java @@ -19,19 +19,25 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.flink.action.CloneAction; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Map; + /** Clone Procedure. */ public class CloneProcedure extends ProcedureBase { public static final String IDENTIFIER = "clone"; @ProcedureHint( argument = { - @ArgumentHint(name = "warehouse", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "warehouse", + type = @DataTypeHint("STRING"), + isOptional = true), @ArgumentHint(name = "database", type = @DataTypeHint("STRING"), isOptional = true), @ArgumentHint(name = "table", type = @DataTypeHint("STRING"), isOptional = true), @ArgumentHint( @@ -65,16 +71,26 @@ public String[] call( String targetCatalogConfigStr, Integer parallelismStr) throws Exception { + Map sourceCatalogConfig = optionalConfigMap(sourceCatalogConfigStr); + if (!StringUtils.isNullOrWhitespaceOnly(warehouse) + && !sourceCatalogConfig.containsKey("warehouse")) { + sourceCatalogConfig.put("warehouse", warehouse); + } + + Map targetCatalogConfig = optionalConfigMap(targetCatalogConfigStr); + if (!StringUtils.isNullOrWhitespaceOnly(warehouse) + && !targetCatalogConfig.containsKey("warehouse")) { + targetCatalogConfig.put("warehouse", targetWarehouse); + } + CloneAction cloneAction = new CloneAction( - warehouse, database, tableName, - optionalConfigMap(sourceCatalogConfigStr), - targetWarehouse, + sourceCatalogConfig, targetDatabase, targetTableName, - optionalConfigMap(targetCatalogConfigStr), + targetCatalogConfig, parallelismStr == null ? null : Integer.toString(parallelismStr)); return execute(procedureContext, cloneAction, "Clone Job"); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java index 80602b755aa5..eaa8b4baf604 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java @@ -101,10 +101,9 @@ public String[] call( String compactStrategy) throws Exception { partitionIdleTime = notnull(partitionIdleTime); - String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); CompactDatabaseAction action = - new CompactDatabaseAction(warehouse, catalogOptions) + new CompactDatabaseAction(catalogOptions) .includingDatabases(nullable(includingDatabases)) .includingTables(nullable(includingTables)) .excludingTables(nullable(excludingTables)) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 282f5af34043..b80693b39ec9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -77,7 +77,6 @@ public String[] call( String partitionIdleTime, String compactStrategy) throws Exception { - String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); Map tableConf = isNullOrWhitespaceOnly(tableOptions) @@ -89,7 +88,6 @@ public String[] call( if (isNullOrWhitespaceOnly(orderStrategy) && isNullOrWhitespaceOnly(orderByColumns)) { action = new CompactAction( - warehouse, identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions, @@ -109,7 +107,6 @@ public String[] call( "sort compact do not support 'partition_idle_time'."); action = new SortCompactAction( - warehouse, identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java index e297c0bdbb4c..b5cf67a484d0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java @@ -144,15 +144,11 @@ public String[] call( notMatchedBySourceUpsertSetting = notnull(notMatchedBySourceUpsertSetting); notMatchedBySourceDeleteCondition = notnull(notMatchedBySourceDeleteCondition); - String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); Identifier identifier = Identifier.fromString(targetTableId); MergeIntoAction action = new MergeIntoAction( - warehouse, - identifier.getDatabaseName(), - identifier.getObjectName(), - catalogOptions); + identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions); action.withTargetAlias(nullable(targetAlias)); if (!sourceSqls.isEmpty()) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 4ee539c4fd27..4a90415c191a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -172,7 +172,13 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception { .checkpointIntervalMs(2000) .build(); env.setParallelism(1); - new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); + new CompactAction( + "default", + "T", + Collections.singletonMap("warehouse", path), + Collections.emptyMap()) + .withStreamExecutionEnvironment(env) + .build(); JobClient client = env.executeAsync(); // write records for a while @@ -834,7 +840,13 @@ private void testStandAloneFullCompactJobRandom( .parallelism(2) .allowRestart() .build(); - new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); + new CompactAction( + "default", + "T", + Collections.singletonMap("warehouse", path), + Collections.emptyMap()) + .withStreamExecutionEnvironment(env) + .build(); env.executeAsync(); } @@ -873,7 +885,13 @@ private void testStandAloneLookupJobRandom( .allowRestart() .build(); env.setParallelism(2); - new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); + new CompactAction( + "default", + "T", + Collections.singletonMap("warehouse", path), + Collections.emptyMap()) + .withStreamExecutionEnvironment(env) + .build(); env.executeAsync(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java index dabb8bb2c990..f53686df07fb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java @@ -285,10 +285,9 @@ public void testTableConf() throws Exception { createTable(); SortCompactAction sortCompactAction = new SortCompactAction( - warehouse, database, tableName, - Collections.emptyMap(), + Collections.singletonMap("warehouse", warehouse), Collections.singletonMap( FlinkConnectorOptions.SINK_PARALLELISM.key(), "20")) .withOrderStrategy("zorder") @@ -323,10 +322,9 @@ public void testSortCompactionOnEmptyData() throws Exception { createTable(); SortCompactAction sortCompactAction = new SortCompactAction( - warehouse, database, tableName, - Collections.emptyMap(), + Collections.singletonMap("warehouse", warehouse), Collections.emptyMap()) .withOrderStrategy("zorder") .withOrderColumns(Collections.singletonList("f0")); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java index 0acb7b8398ed..6bd57e0129bc 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java @@ -251,14 +251,10 @@ public void testMigrateDatabaseAction(String format) throws Exception { Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put( + "warehouse", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)); MigrateDatabaseAction migrateDatabaseAction = - new MigrateDatabaseAction( - "hive", - System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), - "my_database", - catalogConf, - "", - 6); + new MigrateDatabaseAction("hive", "my_database", catalogConf, "", 6); migrateDatabaseAction.run(); tEnv.executeSql( diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index a50bdca3d58e..7ffc2ae54f04 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -161,10 +161,11 @@ public void testMigrateFileAction(String format, boolean isNamedArgument) throws Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put( + "warehouse", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)); MigrateFileAction migrateFileAction = new MigrateFileAction( "hive", - System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), "default.hivetable02", "default.paimontable02", false, diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index ca3b6c82e7d3..8d6ded69dc99 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -185,14 +185,10 @@ public void testMigrateAction(String format) throws Exception { Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put( + "warehouse", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)); MigrateTableAction migrateTableAction = - new MigrateTableAction( - "hive", - System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), - "default.hivetable", - catalogConf, - "", - 6); + new MigrateTableAction("hive", "default.hivetable", catalogConf, "", 6); migrateTableAction.run(); tEnv.executeSql( diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java index fdaa243dfa5d..5101425ded54 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java @@ -83,11 +83,9 @@ public void testRepairTableAction() throws Exception { Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); - RepairAction repairAction = - new RepairAction( - System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), - "test_db.t_repair_hive", - catalogConf); + catalogConf.put( + "warehouse", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)); + RepairAction repairAction = new RepairAction("test_db.t_repair_hive", catalogConf); repairAction.run(); List ret =