Skip to content

Commit

Permalink
[flink][cdc] Add operator and transformation name for cdc pipeline (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Oct 17, 2023
1 parent 5d4dbbe commit 8764854
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public void build(StreamExecutionEnvironment env) throws Exception {
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.flatMap(recordParser))
.flatMap(recordParser)
.name("Parse"))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ public void processElement(T raw, Context context, Collector<Void> collector) th
try {
catalog.createTable(identifier, schema, true);
} catch (Exception e) {
LOG.error("create newly added paimon table error.", e);
LOG.error(
"Cannot create newly added Paimon table {}",
identifier.getFullName(),
e);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private void buildCombinedCdcSink() {
.process(
new CdcDynamicTableParsingProcessFunction<>(
database, catalogLoader, parserFactory))
.name("Side Output")
.setParallelism(input.getParallelism());

// for newly-added tables, create a multiplexing operator that handles all their records
Expand All @@ -149,7 +150,8 @@ private void buildCombinedCdcSink() {
SingleOutputStreamOperatorUtils.getSideOutput(
parsed,
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
.name("Schema Evolution");

DataStream<CdcMultiplexRecord> partitioned =
partition(
Expand Down

0 comments on commit 8764854

Please sign in to comment.