Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] Fix cdc existing table options unavailable to change. #3095

Merged
merged 6 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
Expand Down Expand Up @@ -178,11 +179,23 @@ protected abstract void buildSink(

protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
// doesn't support altering bucket here
Map<String, String> withoutBucket = new HashMap<>(tableConfig);
withoutBucket.remove(CoreOptions.BUCKET.key());

Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
dynamicOptions.remove(CoreOptions.BUCKET.key());

// remove immutable options and options with equal values
Map<String, String> oldOptions = table.options();
Set<String> immutableOptionKeys = CoreOptions.getImmutableOptionKeys();
dynamicOptions
.entrySet()
.removeIf(
entry ->
immutableOptionKeys.contains(entry.getKey())
|| Objects.equals(
oldOptions.get(entry.getKey()), entry.getValue()));

// alter the table dynamic options
List<SchemaChange> optionChanges =
withoutBucket.entrySet().stream()
dynamicOptions.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());

Expand All @@ -194,7 +207,7 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
throw new RuntimeException("This is unexpected.", e);
}

return table.copy(withoutBucket);
return table.copy(dynamicOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,49 @@ public void testOptionsChange() throws Exception {
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
public void testOptionsChangeInExistingTable() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("bucket", "1");
options.put("sink.parallelism", "1");
options.put("sequence.field", "_timestamp");

createFileStoreTable(
RowType.of(
new DataType[] {
DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0)
},
new String[] {"pk", "_date", "_timestamp"}),
Collections.emptyList(),
Collections.singletonList("pk"),
options);

Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_exist_options_change");
Map<String, String> tableConfig = new HashMap<>();
// update immutable options
tableConfig.put("sequence.field", "_date");
// update existing options
tableConfig.put("sink.parallelism", "2");
// add new options
tableConfig.put("snapshot.expire.limit", "1000");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withPrimaryKeys("pk")
.withTableConfig(tableConfig)
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable();

assertThat(table.options().get("bucket")).isEqualTo("1");
assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp");
assertThat(table.options().get("sink.parallelism")).isEqualTo("2");
assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000");
}

@Test
@Timeout(60)
public void testMetadataColumns() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ CREATE TABLE test_options_change (
PRIMARY KEY (pk)
);

CREATE TABLE test_exist_options_change (
pk INT,
_date DATE,
_timestamp TIMESTAMP,
PRIMARY KEY (pk)
);

-- ################################################################################
-- testSyncShard
-- ################################################################################
Expand Down Expand Up @@ -405,4 +412,4 @@ USE invalid_alter_bucket;

CREATE TABLE t (
k INT PRIMARY KEY
);
);
Loading