From c408d50f6432e8aef8d16e9e61a628700ab90a01 Mon Sep 17 00:00:00 2001
From: Kerwin <37063904+zhuangchong@users.noreply.github.com>
Date: Thu, 25 Jan 2024 09:51:43 +0800
Subject: [PATCH] [cdc] The newly added table cannot obtain the metadata column
in the database cdc action. (#2784)
---
.../generated/postgres_sync_table.html | 2 +-
.../action/cdc/SyncDatabaseActionBase.java | 3 +-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 39 +++++++++++++++----
.../mysql/MySqlSyncDatabaseActionITCase.java | 14 +++++++
.../PostgresSyncTableActionITCase.java | 2 +-
5 files changed, 49 insertions(+), 11 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/postgres_sync_table.html b/docs/layouts/shortcodes/generated/postgres_sync_table.html
index ecb085375cf0..407a91f24f16 100644
--- a/docs/layouts/shortcodes/generated/postgres_sync_table.html
+++ b/docs/layouts/shortcodes/generated/postgres_sync_table.html
@@ -60,7 +60,7 @@
--metadata_column |
- --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name --metadata_column database_name --metadata_column schema_name --metadata_column op_ts. See its document for a complete list of available metadata. |
+ --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its document for a complete list of available metadata. |
--postgres_conf |
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 74c2f6fc7eb8..ecfca9a1715e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -117,7 +117,8 @@ protected FlatMapFunction recordParse() {
@Override
protected EventParser.Factory buildEventParserFactory() {
- NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
+ NewTableSchemaBuilder schemaBuilder =
+ new NewTableSchemaBuilder(tableConfig, caseSensitive, metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index c7352eb269f1..18324abedc35 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -18,27 +18,35 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import java.io.Serializable;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
/** Build schema for new table found in database synchronization. */
public class NewTableSchemaBuilder implements Serializable {
private final Map tableConfig;
private final boolean caseSensitive;
+ private final CdcMetadataConverter[] metadataConverters;
- public NewTableSchemaBuilder(Map tableConfig, boolean caseSensitive) {
+ public NewTableSchemaBuilder(
+ Map tableConfig,
+ boolean caseSensitive,
+ CdcMetadataConverter[] metadataConverters) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
+ this.metadataConverters = metadataConverters;
}
public Optional build(RichCdcMultiplexRecord record) {
@@ -47,12 +55,27 @@ public Optional build(RichCdcMultiplexRecord record) {
String tableName = record.tableName();
tableName = tableName == null ? "UNKNOWN" : tableName;
- LinkedHashMap fieldTypes =
- mapKeyCaseConvert(
- record.fieldTypes(), caseSensitive, columnDuplicateErrMsg(tableName));
- for (Map.Entry entry : fieldTypes.entrySet()) {
- builder.column(entry.getKey(), entry.getValue());
+ // fields
+ Set existedFields = new HashSet<>();
+ Function columnDuplicateErrMsg = columnDuplicateErrMsg(tableName);
+
+ for (Map.Entry entry : record.fieldTypes().entrySet()) {
+ String fieldName =
+ columnCaseConvertAndDuplicateCheck(
+ entry.getKey(), existedFields, caseSensitive, columnDuplicateErrMsg);
+
+ builder.column(fieldName, entry.getValue());
+ }
+
+ for (CdcMetadataConverter metadataConverter : metadataConverters) {
+ String metadataColumnName =
+ columnCaseConvertAndDuplicateCheck(
+ metadataConverter.columnName(),
+ existedFields,
+ caseSensitive,
+ columnDuplicateErrMsg);
+ builder.column(metadataColumnName, metadataConverter.dataType());
}
builder.primaryKey(listCaseConvert(record.primaryKeys(), caseSensitive));
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 5074e759db8f..84169ffe02b7 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1314,6 +1314,20 @@ public void testMetadataColumns() throws Exception {
table,
rowType,
Collections.singletonList("k"));
+
+ // test newly created table
+ if (mode == COMBINED) {
+ statement.execute("USE " + "metadata");
+ statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+ statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Hi')");
+ waitingTables("t3");
+ table = getFileStoreTable("t3");
+ waitForResult(
+ Collections.singletonList("+I[1, Hi, t3, metadata]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+ }
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index a6a34c9bf4d5..cbd6c7bd09a3 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -508,7 +508,7 @@ public void testInvalidPrimaryKey() {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Specified primary key 'pk' does not exist in source tables or computed columns."));
+ "Specified primary key 'pk' does not exist in source tables or computed columns [pt, _id, v1]."));
}
@Test