diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 74c2f6fc7eb8..01bfa7e6d716 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -110,7 +110,7 @@ protected void validateCaseSensitivity() { } @Override - protected FlatMapFunction recordParse() { + protected FlatMapFunction recordParser() { return syncJobHandler.provideRecordParser( caseSensitive, Collections.emptyList(), typeMapping, metadataConverters); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index a5aab32d08ef..de421ae54fea 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -154,7 +154,7 @@ protected void beforeBuildingSourceSink() throws Exception { } @Override - protected FlatMapFunction recordParse() { + protected FlatMapFunction recordParser() { return syncJobHandler.provideRecordParser( caseSensitive, computedColumns, typeMapping, metadataConverters); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 3cc77e6f5b99..d1e0bdb44278 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -106,7 +106,7 @@ public void build() throws Exception { beforeBuildingSourceSink(); DataStream input = - buildDataStreamSource(buildSource()).flatMap(recordParse()).name("Parse"); + buildDataStreamSource(buildSource()).flatMap(recordParser()).name("Parse"); EventParser.Factory parserFactory = buildEventParserFactory(); @@ -135,7 +135,7 @@ private DataStreamSource buildDataStreamSource(Object source) { throw new UnsupportedOperationException("Unrecognized source type"); } - protected abstract FlatMapFunction recordParse(); + protected abstract FlatMapFunction recordParser(); protected abstract EventParser.Factory buildEventParserFactory(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java index e626dc1ed5cc..e9793a669a4d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java @@ -76,7 +76,7 @@ private ChannelComputer computeChannelComputer(CdcMultiplexRecord rec FileStoreTable table; try { table = (FileStoreTable) catalog.getTable(id); - table.copy(dynamicOptions); + table = table.copy(dynamicOptions); } catch (Catalog.TableNotExistException e) { LOG.error("Failed to get table " + id.getFullName()); return null; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 7330e63c5afa..bc905fdb0ab1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -181,7 +181,7 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException while (true) { try { table = (FileStoreTable) catalog.getTable(tableId); - table.copy(dynamicOptions); + table = table.copy(dynamicOptions); tables.put(tableId, table); break; } catch (Catalog.TableNotExistException e) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index d4c630777a6d..ef1753624605 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -141,11 +141,6 @@ private void buildCombinedCdcSink() { .name("Side Output") .setParallelism(input.getParallelism()); - // for newly-added tables, create a multiplexing operator that handles all their records - // and writes to multiple tables - DataStream newlyAddedTableStream = - SingleOutputStreamOperatorUtils.getSideOutput( - parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG); // handles schema change for newly added tables SingleOutputStreamOperatorUtils.getSideOutput( parsed, @@ -153,6 +148,12 @@ private void buildCombinedCdcSink() { .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader)) .name("Schema Evolution"); + // for newly-added tables, create a multiplexing operator that handles all their records + // and writes to multiple tables + DataStream newlyAddedTableStream = + SingleOutputStreamOperatorUtils.getSideOutput( + parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG); + DataStream partitioned = partition( newlyAddedTableStream,