From 3a794685bc92b6352bb4275fac71cc6c48ecf4c2 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sun, 17 Dec 2023 13:56:30 +0800 Subject: [PATCH 1/4] [hotfix] fix typo --- .../paimon/flink/action/cdc/SyncDatabaseActionBase.java | 2 +- .../apache/paimon/flink/action/cdc/SyncTableActionBase.java | 2 +- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 74c2f6fc7eb8..01bfa7e6d716 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -110,7 +110,7 @@ protected void validateCaseSensitivity() { } @Override - protected FlatMapFunction recordParse() { + protected FlatMapFunction recordParser() { return syncJobHandler.provideRecordParser( caseSensitive, Collections.emptyList(), typeMapping, metadataConverters); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index a5aab32d08ef..de421ae54fea 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -154,7 +154,7 @@ protected void beforeBuildingSourceSink() throws Exception { } @Override - protected FlatMapFunction recordParse() { + protected FlatMapFunction recordParser() { return syncJobHandler.provideRecordParser( caseSensitive, computedColumns, typeMapping, metadataConverters); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 3cc77e6f5b99..d1e0bdb44278 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -106,7 +106,7 @@ public void build() throws Exception { beforeBuildingSourceSink(); DataStream input = - buildDataStreamSource(buildSource()).flatMap(recordParse()).name("Parse"); + buildDataStreamSource(buildSource()).flatMap(recordParser()).name("Parse"); EventParser.Factory parserFactory = buildEventParserFactory(); @@ -135,7 +135,7 @@ private DataStreamSource buildDataStreamSource(Object source) { throw new UnsupportedOperationException("Unrecognized source type"); } - protected abstract FlatMapFunction recordParse(); + protected abstract FlatMapFunction recordParser(); protected abstract EventParser.Factory buildEventParserFactory(); From 9633e5ecec6afa1657b709eceb8a23d1ee917a50 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sun, 17 Dec 2023 14:37:42 +0800 Subject: [PATCH 2/4] [hotfix] fix typo --- .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index d4c630777a6d..ef1753624605 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -141,11 +141,6 @@ private void buildCombinedCdcSink() { .name("Side Output") .setParallelism(input.getParallelism()); - // for newly-added tables, create a multiplexing operator that handles all their records - // and writes to multiple tables - DataStream newlyAddedTableStream = - SingleOutputStreamOperatorUtils.getSideOutput( - parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG); // handles schema change for newly added tables SingleOutputStreamOperatorUtils.getSideOutput( parsed, @@ -153,6 +148,12 @@ private void buildCombinedCdcSink() { .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader)) .name("Schema Evolution"); + // for newly-added tables, create a multiplexing operator that handles all their records + // and writes to multiple tables + DataStream newlyAddedTableStream = + SingleOutputStreamOperatorUtils.getSideOutput( + parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG); + DataStream partitioned = partition( newlyAddedTableStream, From 1d34030275321d8381653c6a7f46e13c1e1e9ff0 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sun, 17 Dec 2023 17:30:54 +0800 Subject: [PATCH 3/4] [hotfix] fix typo --- .../flink/sink/cdc/CdcMultiplexRecordChannelComputer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java index e626dc1ed5cc..e9793a669a4d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java @@ -76,7 +76,7 @@ private ChannelComputer computeChannelComputer(CdcMultiplexRecord rec FileStoreTable table; try { table = (FileStoreTable) catalog.getTable(id); - table.copy(dynamicOptions); + table = table.copy(dynamicOptions); } catch (Catalog.TableNotExistException e) { LOG.error("Failed to get table " + id.getFullName()); return null; From e8f436bfbb550eabfdb9356403dfe2d77de69cf4 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sun, 17 Dec 2023 17:58:38 +0800 Subject: [PATCH 4/4] [hotfix] fix typo --- .../paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 7330e63c5afa..bc905fdb0ab1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -181,7 +181,7 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException while (true) { try { table = (FileStoreTable) catalog.getTable(tableId); - table.copy(dynamicOptions); + table = table.copy(dynamicOptions); tables.put(tableId, table); break; } catch (Catalog.TableNotExistException e) {