From ed79d13c02eb50227c6b787d3923c1c531decdc7 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Thu, 9 May 2024 20:26:04 +0800 Subject: [PATCH 01/11] [Flink]Fixed UpdatedDataFieldsProcessFunction.canConvert parameter of the schemaCompatible method was written in reverse position --- .../apache/paimon/flink/action/cdc/CdcActionCommonUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 20740d1a7f2b..c4bfe884f021 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -87,7 +87,7 @@ public static boolean schemaCompatible( return false; } DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type) + if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { LOG.info( "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", From a071a8f9233d226dc265f07f0c025894ec56dada Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Sat, 11 May 2024 14:59:10 +0800 Subject: [PATCH 02/11] [cdc]schemaCompatible failed after changing the length of the data type --- .../cdc/mysql/MySqlSyncTableActionITCase.java | 37 +++++++++++++++++++ .../test/resources/mysql/sync_table_setup.sql | 7 ++++ 2 files changed, 44 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 58ed2098e9da..03ccb7c1d33d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -1364,4 +1365,40 @@ public void testColumnCommentChangeInExistingTable() throws Exception { assertThat(actual.get("c1").description()).isEqualTo("c1 comment"); assertThat(actual.get("c2").description()).isEqualTo("c2 comment"); } + + @Test + @Timeout(60) + public void testColumnTypeLengthChangeInExistingTable() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull(), "pk comment") + .field("c1", DataTypes.DATE(), "c1 comment") + .field("c2", DataTypes.VARCHAR(10).notNull(), "c2 comment") + .build(); + + createFileStoreTable( + rowType, Collections.emptyList(), Collections.singletonList("pk"), options); + + // Alter column type length + try (Statement statement = getStatement()) { + statement.executeUpdate("USE " + DATABASE_NAME); + statement.executeUpdate( + "ALTER TABLE test_exist_column_length_change MODIFY COLUMN v2 varchar(20)"); + } + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_exist_column_length_change"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(getBasicTableConfig()) + .build(); + Assertions.assertDoesNotThrow(() -> runActionWithDefaultEnv(action)); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 5cf9cc1d9328..d30e32c389a9 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -323,6 +323,13 @@ CREATE TABLE test_exist_column_comment_change ( PRIMARY KEY (pk) ); +CREATE TABLE test_exist_column_length_change ( + pk INT, + c1 DATE, + c2 VARCHAR(10) not null, + PRIMARY KEY (pk) +); + -- ################################################################################ -- testSyncShard -- ################################################################################ From 9c988f6b46420e563f07609df4f1fcbad781b655 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Sat, 11 May 2024 15:33:22 +0800 Subject: [PATCH 03/11] [cdc]fixed --- .../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 4 ++-- .../src/test/resources/mysql/sync_table_setup.sql | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 03ccb7c1d33d..b000ac3b6e6d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1387,12 +1387,12 @@ public void testColumnTypeLengthChangeInExistingTable() throws Exception { try (Statement statement = getStatement()) { statement.executeUpdate("USE " + DATABASE_NAME); statement.executeUpdate( - "ALTER TABLE test_exist_column_length_change MODIFY COLUMN v2 varchar(20)"); + "ALTER TABLE test_exist_column_type_length_change MODIFY COLUMN c2 varchar(20)"); } Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", DATABASE_NAME); - mySqlConfig.put("table-name", "test_exist_column_length_change"); + mySqlConfig.put("table-name", "test_exist_column_type_length_change"); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index d30e32c389a9..66a0e46b3150 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -323,7 +323,7 @@ CREATE TABLE test_exist_column_comment_change ( PRIMARY KEY (pk) ); -CREATE TABLE test_exist_column_length_change ( +CREATE TABLE test_exist_column_type_length_change ( pk INT, c1 DATE, c2 VARCHAR(10) not null, From 3262643ca6d288f0e7fc02abbbabfb9123963f22 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 00:56:49 +0800 Subject: [PATCH 04/11] fix paimon-pg-cdc varchar/char always convert string type --- .../cdc/postgres/PostgresRecordParser.java | 53 +++++++++++++++---- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index 0299d57f7741..c8ff78e3ea2f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -25,11 +25,14 @@ import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent; import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.CharType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; @@ -70,6 +73,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; @@ -101,18 +105,21 @@ public class PostgresRecordParser private String currentTable; private String databaseName; private final CdcMetadataConverter[] metadataConverters; + private TableSchema paimonSchema; public PostgresRecordParser( Configuration postgresConfig, boolean caseSensitive, TypeMapping typeMapping, - CdcMetadataConverter[] metadataConverters) { + CdcMetadataConverter[] metadataConverters, + TableSchema schema) { this( postgresConfig, caseSensitive, Collections.emptyList(), typeMapping, - metadataConverters); + metadataConverters, + schema); } public PostgresRecordParser( @@ -120,7 +127,8 @@ public PostgresRecordParser( boolean caseSensitive, List computedColumns, TypeMapping typeMapping, - CdcMetadataConverter[] metadataConverters) { + CdcMetadataConverter[] metadataConverters, + TableSchema paimonSchema) { this.caseSensitive = caseSensitive; this.computedColumns = computedColumns; this.typeMapping = typeMapping; @@ -133,6 +141,7 @@ public PostgresRecordParser( stringifyServerTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(stringifyServerTimeZone); + this.paimonSchema = paimonSchema; } @Override @@ -146,7 +155,7 @@ public void flatMap(CdcSourceRecord rawEvent, Collector extractRecords().forEach(out::collect); } - private List extractFields(DebeziumEvent.Field schema) { + private List extractFields(DebeziumEvent.Field schema, JsonNode afterData) { Map afterFields = schema.afterFields(); Preconditions.checkArgument( !afterFields.isEmpty(), @@ -157,16 +166,22 @@ private List extractFields(DebeziumEvent.Field schema) { RowType.Builder rowType = RowType.builder(); Set existedFields = new HashSet<>(); Function columnDuplicateErrMsg = columnDuplicateErrMsg(currentTable); + + Map paimonFields = + paimonSchema.fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + afterFields.forEach( - (key, value) -> { + (key, afterField) -> { String columnName = columnCaseConvertAndDuplicateCheck( key, existedFields, caseSensitive, columnDuplicateErrMsg); - DataType dataType = extractFieldType(value); + DataType dataType = + extractFieldType(afterField, paimonFields.get(key), afterData); dataType = dataType.copy( - typeMapping.containsMode(TO_NULLABLE) || value.optional()); + typeMapping.containsMode(TO_NULLABLE) || afterField.optional()); rowType.field(columnName, dataType); }); @@ -177,7 +192,8 @@ private List extractFields(DebeziumEvent.Field schema) { * Extract fields from json records, see postgresql-data-types. */ - private DataType extractFieldType(DebeziumEvent.Field field) { + private DataType extractFieldType( + DebeziumEvent.Field field, DataField paimonField, JsonNode afterData) { switch (field.type()) { case "array": return DataTypes.ARRAY(DataTypes.STRING()); @@ -209,6 +225,25 @@ private DataType extractFieldType(DebeziumEvent.Field field) { case "boolean": return DataTypes.BOOLEAN(); case "string": + if (paimonField == null) { + return DataTypes.VARCHAR(afterData.get(field.field()).asText().length()); + } else if (paimonField.type() instanceof VarCharType) { + int oldLength = ((VarCharType) paimonField.type()).getLength(); + int newLength = afterData.get(field.field()).asText().length(); + if (oldLength < newLength) { + return DataTypes.VARCHAR(newLength); + } else { + return DataTypes.VARCHAR(oldLength); + } + } else if (paimonField.type() instanceof CharType) { + int oldLength = ((CharType) paimonField.type()).getLength(); + int newLength = afterData.get(field.field()).asText().length(); + if (oldLength < newLength) { + return DataTypes.CHAR(newLength); + } else { + return DataTypes.CHAR(oldLength); + } + } return DataTypes.STRING(); case "bytes": if (decimalLogicalName().equals(field.name())) { @@ -248,7 +283,7 @@ private List extractRecords() { Map after = extractRow(root.payload().after()); if (!after.isEmpty()) { after = mapKeyCaseConvert(after, caseSensitive, recordKeyDuplicateErrMsg(after)); - List fields = extractFields(root.schema()); + List fields = extractFields(root.schema(), root.payload().after()); records.add( new RichCdcMultiplexRecord( databaseName, From 0f32e5a10c0fe6e922aefa2d6650722cf30ae7d9 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 01:03:23 +0800 Subject: [PATCH 05/11] Supported schemaEvolution when restarting the paimon cdc job --- .../action/cdc/CdcActionCommonUtils.java | 6 ++- .../flink/action/cdc/SyncJobHandler.java | 14 +++++- .../flink/action/cdc/SyncTableActionBase.java | 8 +++- .../action/cdc/SynchronizationActionBase.java | 16 +++++-- .../cdc/mysql/MySqlSyncDatabaseAction.java | 2 +- .../UpdatedDataFieldsProcessFunctionBase.java | 2 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 32 +++++++------ .../PostgresSyncTableActionITCase.java | 46 ++++++++++++------- .../test/resources/mysql/sync_table_setup.sql | 12 +++-- 9 files changed, 91 insertions(+), 47 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index c4bfe884f021..91538c81f85d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -83,8 +83,10 @@ public static boolean schemaCompatible( for (DataField field : sourceTableFields) { int idx = paimonSchema.fieldNames().indexOf(field.name()); if (idx < 0) { - LOG.info("Cannot find field '{}' in Paimon table.", field.name()); - return false; + LOG.info( + "New fields '{}' found in source table, will be synchronized to Paimon table.", + field.name()); + return true; } DataType type = paimonSchema.fields().get(idx).type(); if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java index c674e560b11f..72844d8418ac 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java @@ -27,6 +27,7 @@ import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser; import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.TableSchema; import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; @@ -197,6 +198,16 @@ public FlatMapFunction provideRecordPar List computedColumns, TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) { + return this.provideRecordParser( + caseSensitive, computedColumns, typeMapping, metadataConverters, null); + } + + public FlatMapFunction provideRecordParser( + boolean caseSensitive, + List computedColumns, + TypeMapping typeMapping, + CdcMetadataConverter[] metadataConverters, + TableSchema paimonSchema) { switch (sourceType) { case MYSQL: return new MySqlRecordParser( @@ -211,7 +222,8 @@ public FlatMapFunction provideRecordPar caseSensitive, computedColumns, typeMapping, - metadataConverters); + metadataConverters, + paimonSchema); case KAFKA: case PULSAR: DataFormat dataFormat = provideDataFormat(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index f4e5bfe6abe0..f0b8617d4303 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -123,13 +123,13 @@ protected void beforeBuildingSourceSink() throws Exception { // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - fileStoreTable = alterTableOptions(identifier, fileStoreTable); try { Schema retrievedSchema = retrieveSchema(); computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields()); Schema paimonSchema = buildPaimonSchema(retrievedSchema); assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields()); + alterTableSchema(identifier, fileStoreTable, paimonSchema); } catch (SchemaRetrievalException e) { LOG.info( "Failed to retrieve schema from record data but there exists specified Paimon table. " @@ -157,7 +157,11 @@ protected void beforeBuildingSourceSink() throws Exception { @Override protected FlatMapFunction recordParse() { return syncJobHandler.provideRecordParser( - caseSensitive, computedColumns, typeMapping, metadataConverters); + caseSensitive, + computedColumns, + typeMapping, + metadataConverters, + fileStoreTable.schema()); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 944185347c35..45da47e1741a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -27,8 +27,11 @@ import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -79,7 +82,6 @@ public SynchronizationActionBase( this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; this.caseSensitive = catalog.caseSensitive(); - this.syncJobHandler.registerJdbcDriver(); } @@ -177,7 +179,8 @@ protected abstract void buildSink( DataStream input, EventParser.Factory parserFactory); - protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { + protected FileStoreTable alterTableSchema( + Identifier identifier, FileStoreTable table, Schema paimonSchema) { // doesn't support altering bucket here Map dynamicOptions = new HashMap<>(tableConfig); dynamicOptions.remove(CoreOptions.BUCKET.key()); @@ -194,13 +197,18 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable oldOptions.get(entry.getKey()), entry.getValue())); // alter the table dynamic options - List optionChanges = + List changes = dynamicOptions.entrySet().stream() .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); + List columnChanges = + UpdatedDataFieldsProcessFunction.extractSchemaChanges( + new SchemaManager(table.fileIO(), table.location()), paimonSchema.fields()); + + changes.addAll(columnChanges); try { - catalog.alterTable(identifier, optionChanges, false); + catalog.alterTable(identifier, changes, false); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 80a7c8b84674..0aec02815c50 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -159,7 +159,7 @@ protected void beforeBuildingSourceSink() throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), tableInfo, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - table = alterTableOptions(identifier, table); + table = alterTableSchema(identifier, table, fromMySql); tables.add(table); monitoredTables.addAll(tableInfo.identifiers()); } else { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index bc31a05e27e2..ab3ea349b2a5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -177,7 +177,7 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) { return ConvertAction.EXCEPTION; } - protected List extractSchemaChanges( + public static List extractSchemaChanges( SchemaManager schemaManager, List updatedDataFields) { RowType oldRowType = schemaManager.latest().get().logicalRowType(); Map oldFields = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index b000ac3b6e6d..fd49cf0b0626 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -33,7 +33,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -1368,37 +1367,42 @@ public void testColumnCommentChangeInExistingTable() throws Exception { @Test @Timeout(60) - public void testColumnTypeLengthChangeInExistingTable() throws Exception { + public void testColumnAlterInExistingTableWhenStartJob() throws Exception { Map options = new HashMap<>(); options.put("bucket", "1"); options.put("sink.parallelism", "1"); RowType rowType = RowType.builder() - .field("pk", DataTypes.INT().notNull(), "pk comment") - .field("c1", DataTypes.DATE(), "c1 comment") - .field("c2", DataTypes.VARCHAR(10).notNull(), "c2 comment") + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) .build(); createFileStoreTable( rowType, Collections.emptyList(), Collections.singletonList("pk"), options); - // Alter column type length - try (Statement statement = getStatement()) { - statement.executeUpdate("USE " + DATABASE_NAME); - statement.executeUpdate( - "ALTER TABLE test_exist_column_type_length_change MODIFY COLUMN c2 varchar(20)"); - } - Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", DATABASE_NAME); - mySqlConfig.put("table-name", "test_exist_column_type_length_change"); + mySqlConfig.put("table-name", "test_exist_column_alter"); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) .withPrimaryKeys("pk") .withTableConfig(getBasicTableConfig()) .build(); - Assertions.assertDoesNotThrow(() -> runActionWithDefaultEnv(action)); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30)); + assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index fbd441125cf6..c37bc4dd5ca6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -100,7 +100,9 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING() + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10) }, new String[] {"pt", "_id", "v1"}); List primaryKeys = Arrays.asList("pt", "_id"); @@ -118,7 +120,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { new DataType[] { DataTypes.INT().notNull(), DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.INT() }, new String[] {"pt", "_id", "v1", "v2"}); @@ -145,7 +147,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { new DataType[] { DataTypes.INT().notNull(), DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.BIGINT() }, new String[] {"pt", "_id", "v1", "v2"}); @@ -170,14 +172,14 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 BYTEA"); statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT"); statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v1 TYPE VARCHAR(20)"); - statement.executeUpdate( - "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8"); + String v1 = "very long string"; + statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = '" + v1 + "' WHERE _id = 8"); rowType = RowType.of( new DataType[] { DataTypes.INT().notNull(), DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(Math.max(v1.length(), 10)), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.BYTES(), @@ -209,7 +211,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { new DataType[] { DataTypes.INT().notNull(), DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(Math.max(v1.length(), 10)), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.BYTES(), @@ -263,9 +265,9 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti RowType.of( new DataType[] { DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.INT(), - DataTypes.STRING() + DataTypes.VARCHAR(10) }, new String[] {"_id", "v1", "v2", "v3"}); List primaryKeys = Collections.singletonList("_id"); @@ -280,20 +282,30 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti + "ADD COLUMN v6 DECIMAL(5, 3)," + "ADD COLUMN \"$% ^,& *(\" VARCHAR(10)," + "ALTER COLUMN v2 TYPE BIGINT"); + String v1 = "long_string_two"; + String v3 = "string_2"; + String v7 = "test_2"; + statement.executeUpdate( "INSERT INTO schema_evolution_multiple VALUES " - + "(2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')"); + + "(2, '" + + v1 + + "', 2000000000000, '" + + v3 + + "', 20, 20.5, 20.002, '" + + v7 + + "')"); rowType = RowType.of( new DataType[] { DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(v1.length()), DataTypes.BIGINT(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL(5, 3), - DataTypes.STRING() + DataTypes.VARCHAR(v7.length()) }, new String[] {"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("}); expected = @@ -362,8 +374,8 @@ private void testAllTypesImpl(Statement statement) throws Exception { DataTypes.TIMESTAMP(6), // _timestamp0 DataTypes.TIME(6), // _time DataTypes.TIME(6), // _time0 - DataTypes.STRING(), // _char - DataTypes.STRING(), // _varchar + DataTypes.CHAR(10), // _char + DataTypes.VARCHAR(20), // _varchar DataTypes.STRING(), // _text DataTypes.BYTES(), // _bin DataTypes.STRING(), // _json @@ -659,7 +671,7 @@ public void testSyncShards() throws Exception { RowType.of( new DataType[] { DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.STRING().notNull() }, new String[] {"pk", "_date", "pt"}); @@ -752,7 +764,7 @@ public void testMetadataColumns() throws Exception { RowType.of( new DataType[] { DataTypes.INT().notNull(), - DataTypes.STRING(), + DataTypes.VARCHAR(10), DataTypes.STRING().notNull(), DataTypes.STRING().notNull(), DataTypes.STRING().notNull() diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 66a0e46b3150..cd85f7dce5b4 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -323,13 +323,15 @@ CREATE TABLE test_exist_column_comment_change ( PRIMARY KEY (pk) ); -CREATE TABLE test_exist_column_type_length_change ( - pk INT, - c1 DATE, - c2 VARCHAR(10) not null, - PRIMARY KEY (pk) +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) ); + -- ################################################################################ -- testSyncShard -- ################################################################################ From 55627fd2592b51424a2999b4028c9a8fcc290460 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 01:33:32 +0800 Subject: [PATCH 06/11] optimized code --- .../cdc/postgres/PostgresRecordParser.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index c8ff78e3ea2f..f99a8781049f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -225,24 +225,15 @@ private DataType extractFieldType( case "boolean": return DataTypes.BOOLEAN(); case "string": + int newLength = afterData.get(field.field()).asText().length(); if (paimonField == null) { - return DataTypes.VARCHAR(afterData.get(field.field()).asText().length()); + return DataTypes.VARCHAR(newLength); } else if (paimonField.type() instanceof VarCharType) { int oldLength = ((VarCharType) paimonField.type()).getLength(); - int newLength = afterData.get(field.field()).asText().length(); - if (oldLength < newLength) { - return DataTypes.VARCHAR(newLength); - } else { - return DataTypes.VARCHAR(oldLength); - } + return DataTypes.VARCHAR(Math.max(oldLength, newLength)); } else if (paimonField.type() instanceof CharType) { int oldLength = ((CharType) paimonField.type()).getLength(); - int newLength = afterData.get(field.field()).asText().length(); - if (oldLength < newLength) { - return DataTypes.CHAR(newLength); - } else { - return DataTypes.CHAR(oldLength); - } + return DataTypes.CHAR(Math.max(oldLength, newLength)); } return DataTypes.STRING(); case "bytes": From 488e841a1e4e8dfdb6340eaa891b34ac3a2c37b0 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 14:11:15 +0800 Subject: [PATCH 07/11] resolve conflict --- .../action/cdc/mysql/MySqlSyncTableActionITCase.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index dae0ba6b4d4e..5b48018d3e22 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1388,7 +1388,11 @@ public void testColumnAlterInExistingTableWhenStartJob() throws Exception { .build(); createFileStoreTable( - rowType, Collections.emptyList(), Collections.singletonList("pk"), options); + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", DATABASE_NAME); @@ -1412,7 +1416,8 @@ public void testColumnAlterInExistingTableWhenStartJob() throws Exception { assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30)); assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); - + } + public void testWriteOnlyAndSchemaEvolution() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", "write_only_and_schema_evolution"); From fe5803bd9fa8544642f94f1c9a5031f36466606e Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 14:11:43 +0800 Subject: [PATCH 08/11] add pg-test --- .../PostgresSyncTableActionITCase.java | 47 +++++++++++++++++++ .../resources/postgres/sync_table_setup.sql | 9 ++++ 2 files changed, 56 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index 59b146aaaf94..a68026488e90 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -38,6 +39,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -797,6 +800,50 @@ public void testCatalogAndTableConfig() { .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } + @Test + @Timeout(60) + public void testColumnAlterInExistingTableWhenStartJob() throws Exception { + String tableName = "test_exist_column_alter"; + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map postgresConfig = getBasicPostgresConfig(); + postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME); + postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME); + postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName); + + PostgresSyncTableAction action = + syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30)); + assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); + } + private FileStoreTable getFileStoreTable() throws Exception { return getFileStoreTable(tableName); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql index 373eb3880f40..a451cb1c7b6a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql @@ -229,6 +229,15 @@ CREATE TABLE test_options_change ( ALTER TABLE test_options_change REPLICA IDENTITY FULL; +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) +); + + -- ################################################################################ -- testMetadataColumns -- ################################################################################ From 2addf772b196ce399d91fdf662b55dc12c7c63b1 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 18:57:25 +0800 Subject: [PATCH 09/11] fixed --- .../action/cdc/CdcActionCommonUtils.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 91538c81f85d..47b28f571f8c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -86,17 +86,17 @@ public static boolean schemaCompatible( LOG.info( "New fields '{}' found in source table, will be synchronized to Paimon table.", field.name()); - return true; - } - DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) - != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { - LOG.info( - "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", - field.name(), - field.type(), - type); - return false; + } else { + DataType type = paimonSchema.fields().get(idx).type(); + if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) + != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { + LOG.info( + "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", + field.name(), + field.type(), + type); + return false; + } } } return true; From 3d206327e83cc2376eaaff7d5d4978b12c672817 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Sat, 18 May 2024 18:59:29 +0800 Subject: [PATCH 10/11] update test cast,support ColumnComment change --- .../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 6 ++---- .../src/test/resources/mysql/sync_table_setup.sql | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 5b48018d3e22..ff138ec38349 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1355,8 +1355,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "test_exist_column_comment_change"); - // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field - // comments will not be changed. MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) .withPrimaryKeys("pk") @@ -1368,8 +1366,8 @@ public void testColumnCommentChangeInExistingTable() throws Exception { Map actual = table.schema().fields().stream() .collect(Collectors.toMap(DataField::name, Function.identity())); - assertThat(actual.get("pk").description()).isEqualTo("pk comment"); - assertThat(actual.get("c1").description()).isEqualTo("c1 comment"); + assertThat(actual.get("pk").description()).isEqualTo("pk new_comment"); + assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment"); assertThat(actual.get("c2").description()).isEqualTo("c2 comment"); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index df0c19f5ea78..87db716ca127 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -317,8 +317,8 @@ CREATE TABLE test_exist_options_change ( ); CREATE TABLE test_exist_column_comment_change ( - pk INT, - c1 DATE, + pk INT comment 'pk new_comment', + c1 DATE comment 'c1 new_comment', c2 VARCHAR(10) not null comment 'c2 comment', PRIMARY KEY (pk) ); From c936ab76781eeb2bcc29a39d51404ea796979689 Mon Sep 17 00:00:00 2001 From: sinat_28987027 Date: Tue, 21 May 2024 11:42:41 +0800 Subject: [PATCH 11/11] fixed --- .../org/apache/paimon/flink/action/cdc/SyncTableActionBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index f0b8617d4303..f77940bed390 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -129,7 +129,7 @@ protected void beforeBuildingSourceSink() throws Exception { buildComputedColumns(computedColumnArgs, retrievedSchema.fields()); Schema paimonSchema = buildPaimonSchema(retrievedSchema); assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields()); - alterTableSchema(identifier, fileStoreTable, paimonSchema); + fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema); } catch (SchemaRetrievalException e) { LOG.info( "Failed to retrieve schema from record data but there exists specified Paimon table. "