Skip to content

Commit

Permalink
[cdc] The newly added table cannot obtain the metadata column in the …
Browse files Browse the repository at this point in the history
…database cdc action. (#2784)
  • Loading branch information
zhuangchong authored Jan 25, 2024
1 parent 4cbd24d commit c408d50
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/postgres_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name --metadata_column database_name --metadata_column schema_name --metadata_column op_ts. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#available-metadata">document</a> for a complete list of available metadata.</td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#available-metadata">document</a> for a complete list of available metadata.</td>
</tr>
<tr>
<td><h5>--postgres_conf</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {

@Override
protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
NewTableSchemaBuilder schemaBuilder =
new NewTableSchemaBuilder(tableConfig, caseSensitive, metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,35 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;

/** Build schema for new table found in database synchronization. */
public class NewTableSchemaBuilder implements Serializable {

private final Map<String, String> tableConfig;
private final boolean caseSensitive;
private final CdcMetadataConverter[] metadataConverters;

public NewTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSensitive) {
public NewTableSchemaBuilder(
Map<String, String> tableConfig,
boolean caseSensitive,
CdcMetadataConverter[] metadataConverters) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
this.metadataConverters = metadataConverters;
}

public Optional<Schema> build(RichCdcMultiplexRecord record) {
Expand All @@ -47,12 +55,27 @@ public Optional<Schema> build(RichCdcMultiplexRecord record) {

String tableName = record.tableName();
tableName = tableName == null ? "UNKNOWN" : tableName;
LinkedHashMap<String, DataType> fieldTypes =
mapKeyCaseConvert(
record.fieldTypes(), caseSensitive, columnDuplicateErrMsg(tableName));

for (Map.Entry<String, DataType> entry : fieldTypes.entrySet()) {
builder.column(entry.getKey(), entry.getValue());
// fields
Set<String> existedFields = new HashSet<>();
Function<String, String> columnDuplicateErrMsg = columnDuplicateErrMsg(tableName);

for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
String fieldName =
columnCaseConvertAndDuplicateCheck(
entry.getKey(), existedFields, caseSensitive, columnDuplicateErrMsg);

builder.column(fieldName, entry.getValue());
}

for (CdcMetadataConverter metadataConverter : metadataConverters) {
String metadataColumnName =
columnCaseConvertAndDuplicateCheck(
metadataConverter.columnName(),
existedFields,
caseSensitive,
columnDuplicateErrMsg);
builder.column(metadataColumnName, metadataConverter.dataType());
}

builder.primaryKey(listCaseConvert(record.primaryKeys(), caseSensitive));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,20 @@ public void testMetadataColumns() throws Exception {
table,
rowType,
Collections.singletonList("k"));

// test newly created table
if (mode == COMBINED) {
statement.execute("USE " + "metadata");
statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Hi')");
waitingTables("t3");
table = getFileStoreTable("t3");
waitForResult(
Collections.singletonList("+I[1, Hi, t3, metadata]"),
table,
rowType,
Collections.singletonList("k"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public void testInvalidPrimaryKey() {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Specified primary key 'pk' does not exist in source tables or computed columns."));
"Specified primary key 'pk' does not exist in source tables or computed columns [pt, _id, v1]."));
}

@Test
Expand Down

0 comments on commit c408d50

Please sign in to comment.