diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 684408f2438e..5c04d5707e1c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK; @@ -178,11 +179,23 @@ protected abstract void buildSink( protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { // doesn't support altering bucket here - Map withoutBucket = new HashMap<>(tableConfig); - withoutBucket.remove(CoreOptions.BUCKET.key()); - + Map dynamicOptions = new HashMap<>(tableConfig); + dynamicOptions.remove(CoreOptions.BUCKET.key()); + + // remove immutable options and options with equal values + Map oldOptions = table.options(); + Set immutableOptionKeys = CoreOptions.getImmutableOptionKeys(); + dynamicOptions + .entrySet() + .removeIf( + entry -> + immutableOptionKeys.contains(entry.getKey()) + || Objects.equals( + oldOptions.get(entry.getKey()), entry.getValue())); + + // alter the table dynamic options List optionChanges = - withoutBucket.entrySet().stream() + dynamicOptions.entrySet().stream() .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); @@ -194,7 +207,7 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable throw new RuntimeException("This is unexpected.", e); } - return table.copy(withoutBucket); + return table.copy(dynamicOptions); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 2ed3b6f466c5..fb54763cb7c6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1097,6 +1097,49 @@ public void testOptionsChange() throws Exception { assertThat(table.options()).containsAllEntriesOf(tableConfig); } + @Test + public void testOptionsChangeInExistingTable() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + options.put("sequence.field", "_timestamp"); + + createFileStoreTable( + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0) + }, + new String[] {"pk", "_date", "_timestamp"}), + Collections.emptyList(), + Collections.singletonList("pk"), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_exist_options_change"); + Map tableConfig = new HashMap<>(); + // update immutable options + tableConfig.put("sequence.field", "_date"); + // update existing options + tableConfig.put("sink.parallelism", "2"); + // add new options + tableConfig.put("snapshot.expire.limit", "1000"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(tableConfig) + .build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + assertThat(table.options().get("bucket")).isEqualTo("1"); + assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp"); + assertThat(table.options().get("sink.parallelism")).isEqualTo("2"); + assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000"); + } + @Test @Timeout(60) public void testMetadataColumns() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 949f1c99dac5..b69661b60fa7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -309,6 +309,13 @@ CREATE TABLE test_options_change ( PRIMARY KEY (pk) ); +CREATE TABLE test_exist_options_change ( + pk INT, + _date DATE, + _timestamp TIMESTAMP, + PRIMARY KEY (pk) +); + -- ################################################################################ -- testSyncShard -- ################################################################################ @@ -405,4 +412,4 @@ USE invalid_alter_bucket; CREATE TABLE t ( k INT PRIMARY KEY -); \ No newline at end of file +);