From 04a7cd7a1a92adba428437ca3d600f14806ce705 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 26 Mar 2024 12:13:52 +0800 Subject: [PATCH 1/5] Fix cdc existing table options unavailable to change. --- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 5 +++++ .../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 1 + 2 files changed, 6 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 684408f2438e..c4a16d90536f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -181,8 +181,13 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable Map withoutBucket = new HashMap<>(tableConfig); withoutBucket.remove(CoreOptions.BUCKET.key()); + Map oldOptions = table.options(); List optionChanges = withoutBucket.entrySet().stream() + .filter( + entry -> + !Objects.equals( + entry.getValue(), oldOptions.get(entry.getKey()))) .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 2ed3b6f466c5..7208cfb8e18d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1055,6 +1055,7 @@ public void testOptionsChange() throws Exception { Map tableConfig = new HashMap<>(); tableConfig.put("bucket", "1"); tableConfig.put("sink.parallelism", "1"); + tableConfig.put("sequence.field", "_timestamp"); MySqlSyncTableAction action1 = syncTableActionBuilder(mySqlConfig) From 0cbecee21a0086a8b6d6b9b07bdc732f7659e396 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 26 Mar 2024 14:40:23 +0800 Subject: [PATCH 2/5] update --- .../flink/action/cdc/SynchronizationActionBase.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index c4a16d90536f..340c3cb44953 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -181,13 +181,21 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable Map withoutBucket = new HashMap<>(tableConfig); withoutBucket.remove(CoreOptions.BUCKET.key()); + // check and copy the table options + FileStoreTable copiedTable = table.copy(withoutBucket); + + // alter the table options Map oldOptions = table.options(); List optionChanges = withoutBucket.entrySet().stream() + .filter( + entry -> + !CoreOptions.getImmutableOptionKeys() + .contains(entry.getKey())) .filter( entry -> !Objects.equals( - entry.getValue(), oldOptions.get(entry.getKey()))) + oldOptions.get(entry.getKey()), entry.getValue())) .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); @@ -199,7 +207,7 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable throw new RuntimeException("This is unexpected.", e); } - return table.copy(withoutBucket); + return copiedTable; } @Override From 03c3e519f3776a4a87987ad814c29dbd6868168c Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 26 Mar 2024 16:44:16 +0800 Subject: [PATCH 3/5] add test --- .../action/cdc/SynchronizationActionBase.java | 30 ++++++------ .../cdc/mysql/MySqlSyncTableActionITCase.java | 46 ++++++++++++++++++- .../test/resources/mysql/sync_table_setup.sql | 9 +++- 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 340c3cb44953..bfe0f659651a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -178,24 +178,22 @@ protected abstract void buildSink( protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { // doesn't support altering bucket here - Map withoutBucket = new HashMap<>(tableConfig); - withoutBucket.remove(CoreOptions.BUCKET.key()); + Map dynamicOptions = new HashMap<>(tableConfig); + dynamicOptions.remove(CoreOptions.BUCKET.key()); - // check and copy the table options - FileStoreTable copiedTable = table.copy(withoutBucket); - - // alter the table options Map oldOptions = table.options(); + // remove immutable options and options with equal values + dynamicOptions + .entrySet() + .removeIf( + entry -> + CoreOptions.getImmutableOptionKeys().contains(entry.getKey()) + || Objects.equals( + oldOptions.get(entry.getKey()), entry.getValue())); + + // alter the table dynamic options List optionChanges = - withoutBucket.entrySet().stream() - .filter( - entry -> - !CoreOptions.getImmutableOptionKeys() - .contains(entry.getKey())) - .filter( - entry -> - !Objects.equals( - oldOptions.get(entry.getKey()), entry.getValue())) + dynamicOptions.entrySet().stream() .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); @@ -207,7 +205,7 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable throw new RuntimeException("This is unexpected.", e); } - return copiedTable; + return table.copy(dynamicOptions); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 7208cfb8e18d..f02ee8d85aeb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1055,7 +1055,6 @@ public void testOptionsChange() throws Exception { Map tableConfig = new HashMap<>(); tableConfig.put("bucket", "1"); tableConfig.put("sink.parallelism", "1"); - tableConfig.put("sequence.field", "_timestamp"); MySqlSyncTableAction action1 = syncTableActionBuilder(mySqlConfig) @@ -1098,6 +1097,51 @@ public void testOptionsChange() throws Exception { assertThat(table.options()).containsAllEntriesOf(tableConfig); } + @Test + public void testOptionsChangeInExistingTable() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + options.put("sequence.field", "_timestamp"); + + createFileStoreTable( + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0) + }, + new String[] {"pk", "_date", "_timestamp"}), + Collections.emptyList(), + Collections.singletonList("pk"), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_exist_options_change"); + Map tableConfig = new HashMap<>(); + // update immutable options + tableConfig.put("sequence.field", "_date"); + // update existing options + tableConfig.put("sink.parallelism", "2"); + // add new options + tableConfig.put("snapshot.expire.limit", "1000"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(tableConfig) + .build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + Map expected = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "2"); + options.put("sequence.field", "_timestamp"); + options.put("snapshot.expire.limit", "1000"); + + assertThat(table.options()).containsAllEntriesOf(expected); + } + @Test @Timeout(60) public void testMetadataColumns() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 949f1c99dac5..b69661b60fa7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -309,6 +309,13 @@ CREATE TABLE test_options_change ( PRIMARY KEY (pk) ); +CREATE TABLE test_exist_options_change ( + pk INT, + _date DATE, + _timestamp TIMESTAMP, + PRIMARY KEY (pk) +); + -- ################################################################################ -- testSyncShard -- ################################################################################ @@ -405,4 +412,4 @@ USE invalid_alter_bucket; CREATE TABLE t ( k INT PRIMARY KEY -); \ No newline at end of file +); From 9f7120098e38e083c8268ffe435228bef194de2a Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Wed, 27 Mar 2024 11:05:16 +0800 Subject: [PATCH 4/5] fix --- .../flink/action/cdc/SynchronizationActionBase.java | 10 ++++------ .../action/cdc/mysql/MySqlSyncTableActionITCase.java | 10 ++++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index bfe0f659651a..48ed633b60bb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -40,10 +40,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK; @@ -181,13 +178,14 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable Map dynamicOptions = new HashMap<>(tableConfig); dynamicOptions.remove(CoreOptions.BUCKET.key()); - Map oldOptions = table.options(); // remove immutable options and options with equal values + Map oldOptions = table.options(); + Set immutableOptionKeys = CoreOptions.getImmutableOptionKeys(); dynamicOptions .entrySet() .removeIf( entry -> - CoreOptions.getImmutableOptionKeys().contains(entry.getKey()) + immutableOptionKeys.contains(entry.getKey()) || Objects.equals( oldOptions.get(entry.getKey()), entry.getValue())); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index f02ee8d85aeb..fb54763cb7c6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1133,13 +1133,11 @@ public void testOptionsChangeInExistingTable() throws Exception { runActionWithDefaultEnv(action); FileStoreTable table = getFileStoreTable(); - Map expected = new HashMap<>(); - options.put("bucket", "1"); - options.put("sink.parallelism", "2"); - options.put("sequence.field", "_timestamp"); - options.put("snapshot.expire.limit", "1000"); - assertThat(table.options()).containsAllEntriesOf(expected); + assertThat(table.options().get("bucket")).isEqualTo("1"); + assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp"); + assertThat(table.options().get("sink.parallelism")).isEqualTo("2"); + assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000"); } @Test From 32cfe26d08ba935b508951f600b860a705160292 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Wed, 27 Mar 2024 11:06:41 +0800 Subject: [PATCH 5/5] fix code style --- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 48ed633b60bb..5c04d5707e1c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -40,7 +40,11 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.time.Duration; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;