From c3102b4d5c4b0db4b52b7577702b133664c5f167 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 7 Jun 2023 19:13:46 +0800 Subject: [PATCH] [flink] All flink actions should parse catalog config (#1328) --- docs/content/how-to/writing-tables.md | 14 ++++--- docs/content/maintenance/manage-snapshots.md | 3 +- .../paimon/flink/FlinkActionITCase.java | 3 +- .../paimon/flink/FlinkActionITCase.java | 3 +- .../apache/paimon/flink/action/Action.java | 12 +++--- .../paimon/flink/action/ActionBase.java | 7 +--- .../paimon/flink/action/CompactAction.java | 18 ++++----- .../paimon/flink/action/DeleteAction.java | 16 ++++++-- .../flink/action/DropPartitionAction.java | 11 ++++-- .../paimon/flink/action/MergeIntoAction.java | 16 +++++++- .../paimon/flink/action/RollbackToAction.java | 18 +++++++-- .../paimon/flink/action/TableActionBase.java | 12 +++--- .../cdc/kafka/KafkaSyncTableAction.java | 28 +++----------- .../cdc/mysql/MySqlSyncDatabaseAction.java | 37 ++++++++++--------- .../cdc/mysql/MySqlSyncTableAction.java | 17 +++++---- .../flink/action/DeleteActionITCase.java | 6 ++- .../action/DropPartitionActionITCase.java | 9 ++++- .../flink/action/RollbackToActionITCase.java | 3 +- 18 files changed, 135 insertions(+), 98 deletions(-) diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md index c461027e0296..8ef5c08e4c1c 100644 --- a/docs/content/how-to/writing-tables.md +++ b/docs/content/how-to/writing-tables.md @@ -230,9 +230,9 @@ Run the following command to submit a drop-partition job for the table. drop-partition \ --warehouse \ --database \ - --table - --partition - [--partition ...] + --table \ + [--partition [--partition ...]] \ + [--catalog-conf [--catalog-conf ...]] partition_spec: key1=value1,key2=value2... @@ -307,8 +307,9 @@ Run the following command to submit a 'delete' job for the table. delete \ --warehouse \ --database \ - --table - --where + --table \ + --where \ + [--catalog-conf [--catalog-conf ...]] filter_spec is equal to the 'WHERE' clause in SQL DELETE statement. Examples: age >= 18 AND age <= 60 @@ -418,7 +419,8 @@ Run the following command to submit a 'merge-into' job for the table. --not-matched-insert-values \ --not-matched-by-source-upsert-condition \ --not-matched-by-source-upsert-set \ - --not-matched-by-source-delete-condition + --not-matched-by-source-delete-condition \ + [--catalog-conf [--catalog-conf ...]] You can pass sqls by '--source-sql [, --source-sql ...]' to config environment and create source table at runtime. diff --git a/docs/content/maintenance/manage-snapshots.md b/docs/content/maintenance/manage-snapshots.md index 3def401b2e05..6813b505af1e 100644 --- a/docs/content/maintenance/manage-snapshots.md +++ b/docs/content/maintenance/manage-snapshots.md @@ -98,7 +98,8 @@ Run the following command: --warehouse \ --database \ --table \ - --snapshot + --snapshot \ + [--catalog-conf [--catalog-conf ...]] ``` {{< /tab >}} diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java index 8e721cba92d4..e317af5bfb76 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java +++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java @@ -46,7 +46,8 @@ protected List ddl() { public void testDeleteAction() throws Exception { batchSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello'), (3, 'World')"); - DeleteAction action = new DeleteAction(path, "default", "T", "k = 1"); + DeleteAction action = + new DeleteAction(path, "default", "T", "k = 1", Collections.emptyMap()); BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql("SELECT * FROM T").collect()); diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java index 8e721cba92d4..e317af5bfb76 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java @@ -46,7 +46,8 @@ protected List ddl() { public void testDeleteAction() throws Exception { batchSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello'), (3, 'World')"); - DeleteAction action = new DeleteAction(path, "default", "T", "k = 1"); + DeleteAction action = + new DeleteAction(path, "default", "T", "k = 1", Collections.emptyMap()); BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql("SELECT * FROM T").collect()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java index e9effff7c58d..8526f683411b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -170,9 +171,9 @@ public static void printHelp() { } } - static Optional> getConfigMap(MultipleParameterTool params, String key) { + static Map optionalConfigMap(MultipleParameterTool params, String key) { if (!params.has(key)) { - return Optional.empty(); + return Collections.emptyMap(); } Map map = new HashMap<>(); @@ -183,9 +184,10 @@ static Optional> getConfigMap(MultipleParameterTool params, continue; } - System.err.println("Invalid key " + key + ". Please use format 'key=value'"); - return Optional.empty(); + throw new IllegalArgumentException( + String.format( + "Invalid argument '%s %s'. Please use format 'key=value'", key, param)); } - return Optional.of(map); + return map; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index c07480e1bfe5..49e805393ff3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -41,11 +41,8 @@ public abstract class ActionBase implements Action { protected final String catalogName = "paimon-" + UUID.randomUUID(); - public ActionBase(String warehouse, Map catalogOptions) { - this(warehouse, Options.fromMap(catalogOptions)); - } - - public ActionBase(String warehouse, Options catalogOptions) { + public ActionBase(String warehouse, Map catalogConfig) { + Options catalogOptions = Options.fromMap(catalogConfig); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 4c4303ffc12d..5665bcf20e98 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -22,7 +22,6 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder; import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; -import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -41,9 +40,9 @@ import java.util.Map; import java.util.Optional; -import static org.apache.paimon.flink.action.Action.getConfigMap; import static org.apache.paimon.flink.action.Action.getPartitions; import static org.apache.paimon.flink.action.Action.getTablePath; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; /** Table compact action for Flink. */ public class CompactAction extends TableActionBase { @@ -54,12 +53,15 @@ public class CompactAction extends TableActionBase { private final CompactorSinkBuilder sinkBuilder; public CompactAction(String warehouse, String database, String tableName) { - this(warehouse, database, tableName, new Options()); + this(warehouse, database, tableName, Collections.emptyMap()); } public CompactAction( - String warehouse, String database, String tableName, Options catalogOptions) { - super(warehouse, database, tableName, catalogOptions); + String warehouse, + String database, + String tableName, + Map catalogConfig) { + super(warehouse, database, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( @@ -111,12 +113,10 @@ public static Optional create(String[] args) { return Optional.empty(); } - Optional> catalogConfigOption = getConfigMap(params, "catalog-conf"); - Options catalogOptions = - Options.fromMap(catalogConfigOption.orElse(Collections.emptyMap())); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); CompactAction action = - new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogOptions); + new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); if (params.has("partition")) { List> partitions = getPartitions(params); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java index 96f840c509a1..18f2c290d7f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java @@ -31,10 +31,12 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import static org.apache.paimon.flink.action.Action.getTablePath; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; /** Delete from table action for Flink. */ public class DeleteAction extends TableActionBase { @@ -43,8 +45,13 @@ public class DeleteAction extends TableActionBase { private final String filter; - public DeleteAction(String warehouse, String databaseName, String tableName, String filter) { - super(warehouse, databaseName, tableName); + public DeleteAction( + String warehouse, + String databaseName, + String tableName, + String filter, + Map catalogConfig) { + super(warehouse, databaseName, tableName, catalogConfig); changeIgnoreMergeEngine(); this.filter = filter; } @@ -70,7 +77,10 @@ public static Optional create(String[] args) { return Optional.empty(); } - DeleteAction action = new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + + DeleteAction action = + new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter, catalogConfig); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java index 8b132cbde7b7..37c94dcde888 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java @@ -35,6 +35,7 @@ import static org.apache.paimon.flink.action.Action.getPartitions; import static org.apache.paimon.flink.action.Action.getTablePath; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; /** Table drop partition action for Flink. */ public class DropPartitionAction extends TableActionBase { @@ -48,8 +49,9 @@ public class DropPartitionAction extends TableActionBase { String warehouse, String databaseName, String tableName, - List> partitions) { - super(warehouse, databaseName, tableName); + List> partitions, + Map catalogConfig) { + super(warehouse, databaseName, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( String.format( @@ -95,8 +97,11 @@ public static Optional create(String[] args) { return Optional.empty(); } + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + return Optional.of( - new DropPartitionAction(tablePath.f0, tablePath.f1, tablePath.f2, partitions)); + new DropPartitionAction( + tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig)); } private static void printHelp() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 0c9167afce64..a254ff71bd2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -48,6 +48,7 @@ import java.util.stream.Stream; import static org.apache.paimon.flink.action.Action.getTablePath; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; import static org.apache.paimon.flink.action.Action.parseKeyValues; /** @@ -122,7 +123,15 @@ public class MergeIntoAction extends TableActionBase { @Nullable private String notMatchedInsertValues; MergeIntoAction(String warehouse, String database, String tableName) { - super(warehouse, database, tableName); + this(warehouse, database, tableName, Collections.emptyMap()); + } + + MergeIntoAction( + String warehouse, + String database, + String tableName, + Map catalogConfig) { + super(warehouse, database, tableName, catalogConfig); if (!(table instanceof FileStoreTable)) { throw new UnsupportedOperationException( @@ -228,7 +237,10 @@ public static Optional create(String[] args) { return Optional.empty(); } - MergeIntoAction action = new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + + MergeIntoAction action = + new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); if (params.has("target-as")) { action.withTargetAlias(params.get("target-as")); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java index 9da130fbc82f..cd3b66ed5352 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java @@ -25,9 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.Optional; import static org.apache.paimon.flink.action.Action.getTablePath; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; /** Rollback to specific snapshot action for Flink. */ public class RollbackToAction extends TableActionBase { @@ -37,8 +39,12 @@ public class RollbackToAction extends TableActionBase { private final long snapshotId; public RollbackToAction( - String warehouse, String databaseName, String tableName, long snapshotId) { - super(warehouse, databaseName, tableName); + String warehouse, + String databaseName, + String tableName, + long snapshotId, + Map catalogConfig) { + super(warehouse, databaseName, tableName, catalogConfig); this.snapshotId = snapshotId; } @@ -63,9 +69,15 @@ public static Optional create(String[] args) { throw new IllegalArgumentException("Please specific snapshot."); } + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + RollbackToAction action = new RollbackToAction( - tablePath.f0, tablePath.f1, tablePath.f2, Long.parseLong(snapshot)); + tablePath.f0, + tablePath.f1, + tablePath.f2, + Long.parseLong(snapshot), + catalogConfig); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 1d7e30144cdd..98c0e819a6d8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -25,7 +25,6 @@ import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.flink.utils.TableEnvironmentUtils; import org.apache.paimon.operation.Lock; -import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataType; @@ -59,13 +58,12 @@ public abstract class TableActionBase extends ActionBase { protected Table table; - TableActionBase(String warehouse, String databaseName, String tableName) { - this(warehouse, databaseName, tableName, new Options()); - } - TableActionBase( - String warehouse, String databaseName, String tableName, Options catalogOptions) { - super(warehouse, catalogOptions); + String warehouse, + String databaseName, + String tableName, + Map catalogConfig) { + super(warehouse, catalogConfig); env = StreamExecutionEnvironment.getExecutionEnvironment(); batchTEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 7c63725053f6..d16270774cfe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -41,12 +41,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -229,13 +229,14 @@ public static Optional create(String[] args) { computedColumnArgs = new ArrayList<>(params.getMultiParameter("computed-column")); } - Map kafkaConfig = getConfigMap(params, "kafka-conf"); - Map catalogConfig = getConfigMap(params, "catalog-conf"); - Map paimonConfig = getConfigMap(params, "paimon-conf"); - if (kafkaConfig == null || paimonConfig == null) { + if (!params.has("kafka-conf")) { return Optional.empty(); } + Map kafkaConfig = optionalConfigMap(params, "kafka-conf"); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + Map paimonConfig = optionalConfigMap(params, "paimon-conf"); + return Optional.of( new KafkaSyncTableAction( kafkaConfig, @@ -249,23 +250,6 @@ public static Optional create(String[] args) { paimonConfig)); } - private static Map getConfigMap(MultipleParameterTool params, String key) { - Map map = new HashMap<>(); - - for (String param : params.getMultiParameter(key)) { - String[] kv = param.split("="); - if (kv.length == 2) { - map.put(kv[0], kv[1]); - continue; - } - - System.err.println( - "Invalid " + key + " " + param + ".\nRun kafka-sync-table --help for help."); - return null; - } - return map; - } - private static void printHelp() { System.out.println( "Action \"kafka-sync-table\" creates a streaming job " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 6d7150516eba..b057070cc9db 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -54,7 +54,7 @@ import java.util.function.Supplier; import java.util.regex.Pattern; -import static org.apache.paimon.flink.action.Action.getConfigMap; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -331,22 +331,25 @@ public static Optional create(String[] args) { String includingTables = params.get("including-tables"); String excludingTables = params.get("excluding-tables"); - Optional> mySqlConfigOption = getConfigMap(params, "mysql-conf"); - Optional> catalogConfigOption = getConfigMap(params, "catalog-conf"); - Optional> tableConfigOption = getConfigMap(params, "table-conf"); - return mySqlConfigOption.map( - mySqlConfig -> - new MySqlSyncDatabaseAction( - mySqlConfig, - warehouse, - database, - ignoreIncompatible, - tablePrefix, - tableSuffix, - includingTables, - excludingTables, - catalogConfigOption.orElse(Collections.emptyMap()), - tableConfigOption.orElse(Collections.emptyMap()))); + if (!params.has("mysql-conf")) { + return Optional.empty(); + } + + Map mySqlConfig = optionalConfigMap(params, "mysql-conf"); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + Map tableConfig = optionalConfigMap(params, "table-conf"); + return Optional.of( + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + ignoreIncompatible, + tablePrefix, + tableSuffix, + includingTables, + excludingTables, + catalogConfig, + tableConfig)); } private static void printHelp() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index e4fe5fc3f36c..17466a947816 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -51,7 +51,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.paimon.flink.action.Action.getConfigMap; +import static org.apache.paimon.flink.action.Action.optionalConfigMap; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -290,24 +290,25 @@ public static Optional create(String[] args) { computedColumnArgs = new ArrayList<>(params.getMultiParameter("computed-column")); } - Optional> mySqlConfig = getConfigMap(params, "mysql-conf"); - Optional> catalogConfig = getConfigMap(params, "catalog-conf"); - Optional> tableConfig = getConfigMap(params, "table-conf"); - if (!mySqlConfig.isPresent()) { + if (!params.has("mysql-conf")) { return Optional.empty(); } + Map mySqlConfig = optionalConfigMap(params, "mysql-conf"); + Map catalogConfig = optionalConfigMap(params, "catalog-conf"); + Map tableConfig = optionalConfigMap(params, "table-conf"); + return Optional.of( new MySqlSyncTableAction( - mySqlConfig.get(), + mySqlConfig, tablePath.f0, tablePath.f1, tablePath.f2, partitionKeys, primaryKeys, computedColumnArgs, - catalogConfig.orElse(Collections.emptyMap()), - tableConfig.orElse(Collections.emptyMap()))); + catalogConfig, + tableConfig)); } private static void printHelp() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java index b614bf5031bd..ab20863e85d5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java @@ -71,7 +71,8 @@ public void testDeleteAction(boolean hasPk, List initialRecords, List throws Exception { prepareTable(hasPk); - DeleteAction action = new DeleteAction(warehouse, database, tableName, "k = 1"); + DeleteAction action = + new DeleteAction(warehouse, database, tableName, "k = 1", Collections.emptyMap()); BlockingIterator iterator = testStreamingRead(buildSimpleQuery(tableName), initialRecords); @@ -108,7 +109,8 @@ public void testWorkWithPartialUpdateTable() throws Exception { } }); - DeleteAction action = new DeleteAction(warehouse, database, tableName, "k < 3"); + DeleteAction action = + new DeleteAction(warehouse, database, tableName, "k < 3", Collections.emptyMap()); insertInto( tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java index 5ed895264800..c5c4c941b933 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java @@ -57,7 +57,8 @@ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Except warehouse, database, tableName, - Collections.singletonList(Collections.singletonMap("partKey0", "0"))) + Collections.singletonList(Collections.singletonMap("partKey0", "0")), + Collections.emptyMap()) .run(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); @@ -104,7 +105,11 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce partitions1.put("partKey1", "0"); new DropPartitionAction( - warehouse, database, tableName, Arrays.asList(partitions0, partitions1)) + warehouse, + database, + tableName, + Arrays.asList(partitions0, partitions1), + Collections.emptyMap()) .run(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index 3358cbe55263..6413dce3e2f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -66,7 +66,8 @@ public void test() throws Exception { writeData(rowData(2L, BinaryString.fromString("World"))); writeData(rowData(2L, BinaryString.fromString("Flink"))); - RollbackToAction action = new RollbackToAction(warehouse, database, tableName, 2); + RollbackToAction action = + new RollbackToAction(warehouse, database, tableName, 2, Collections.emptyMap()); action.run(); testBatchRead(