From d43ea0a3a73d61e6e3048ea7b95d66f9cda1a8fc Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 11 Dec 2023 16:56:07 +0800 Subject: [PATCH 1/6] Unified usage method for the parameter 'metadata columns' in the mysql database cdc and mysql table cdc. --- docs/content/cdc-ingestion/mysql-cdc.md | 4 ++-- .../shortcodes/generated/mysql_sync_database.html | 4 ++-- .../shortcodes/generated/mysql_sync_table.html | 4 ++-- .../paimon/flink/action/cdc/CdcActionCommonUtils.java | 2 +- .../flink/action/cdc/SyncDatabaseActionBase.java | 10 +++++----- .../paimon/flink/action/cdc/SyncTableActionBase.java | 4 ++++ .../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 11 ++++++----- .../action/cdc/mysql/MySqlSyncTableActionFactory.java | 10 +++++----- 8 files changed, 27 insertions(+), 22 deletions(-) diff --git a/docs/content/cdc-ingestion/mysql-cdc.md b/docs/content/cdc-ingestion/mysql-cdc.md index 2bfc38c3597a..1b2cfa281754 100644 --- a/docs/content/cdc-ingestion/mysql-cdc.md +++ b/docs/content/cdc-ingestion/mysql-cdc.md @@ -51,7 +51,7 @@ To use this feature through `flink run`, run the following shell command. [--primary_keys ] \ [--type_mapping ] \ [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \ - [--metadata_column ] \ + [--metadata_columns ] \ [--mysql_conf [--mysql_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ [--table_conf [--table_conf ...]] @@ -135,7 +135,7 @@ To use this feature through `flink run`, run the following shell command. [--including_tables ] \ [--excluding_tables ] \ [--mode ] \ - [--metadata_column ] \ + [--metadata_columns ] \ [--type_mapping ] \ [--mysql_conf [--mysql_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 8bb0e3bc2cd0..592762b8fb97 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -62,8 +62,8 @@ It is used to specify synchronization mode.
Possible values:
  • "divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.
  • "combined": start a single combined sink for all tables, the new table will be automatically synchronized.
-
--metadata_column
- --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its document for a complete list of available metadata. +
--metadata_columns
+ --metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata.
--type_mapping
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html index bae03410977c..d26427e66495 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_table.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html @@ -66,8 +66,8 @@ The definitions of computed columns. The argument field is from MySQL table field name. See here for a complete list of configurations. -
--metadata_column
- --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its document for a complete list of available metadata. +
--metadata_columns
+ --metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata.
--mysql_conf
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 97d4c9a10d53..6a8032506e84 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -59,7 +59,7 @@ public class CdcActionCommonUtils { public static final String PARTITION_KEYS = "partition_keys"; public static final String PRIMARY_KEYS = "primary_keys"; public static final String COMPUTED_COLUMN = "computed_column"; - public static final String METADATA_COLUMN = "metadata_column"; + public static final String METADATA_COLUMNS = "metadata_columns"; public static void assertSchemaCompatible( TableSchema paimonSchema, List sourceTableFields) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index d31c6cc12235..922751c37dae 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -37,11 +37,7 @@ import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.regex.Pattern; import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED; @@ -119,6 +115,10 @@ public SyncDatabaseActionBase withTypeMapping(TypeMapping typeMapping) { return this; } + public SyncDatabaseActionBase withMetadataColumns(String... metadataColumns) { + return withMetadataColumns(Arrays.asList(metadataColumns)); + } + public SyncDatabaseActionBase withMetadataColumns(List metadataColumns) { this.metadataConverters = metadataColumns.stream() diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index 82510f15a35d..857c01e56c47 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -120,6 +120,10 @@ public SyncTableActionBase withTypeMapping(TypeMapping typeMapping) { return this; } + public SyncTableActionBase withMetadataColumns(String... metadataColumns) { + return withMetadataColumns(Arrays.asList(metadataColumns)); + } + public SyncTableActionBase withMetadataColumns(List metadataColumns) { this.metadataConverters = metadataColumns.stream() diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index 30c4895bc900..defbf33d80a7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -29,7 +29,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMNS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; @@ -69,8 +69,9 @@ public Optional create(MultipleParameterToolAdapter params) { .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) .withMode(MultiTablesSinkMode.fromString(params.get(MODE))); - if (params.has(METADATA_COLUMN)) { - action.withMetadataColumns(Arrays.asList(params.get(METADATA_COLUMN).split(","))); + + if (params.has(METADATA_COLUMNS)) { + action.withMetadataColumns(params.get(METADATA_COLUMNS).split(",")); } if (params.has(TYPE_MAPPING)) { @@ -101,7 +102,7 @@ public void printHelp() { + "[--including_tables ] " + "[--excluding_tables ] " + "[--mode ] " - + "[--metadata_column ] " + + "[--metadata_columns ] " + "[--type_mapping ] " + "[--mysql_conf [--mysql_conf ...]] " + "[--catalog_conf [--catalog_conf ...]] " @@ -147,7 +148,7 @@ public void printHelp() { System.out.println(); System.out.println( - "--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + "--metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); System.out.println(); System.out.println( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java index 7cd115c7f32c..a0b48c4b5ec8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java @@ -29,7 +29,7 @@ import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMNS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; @@ -73,8 +73,8 @@ public Optional create(MultipleParameterToolAdapter params) { new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN))); } - if (params.has(METADATA_COLUMN)) { - action.withMetadataColumns(new ArrayList<>(params.getMultiParameter(METADATA_COLUMN))); + if (params.has(METADATA_COLUMNS)) { + action.withMetadataColumns(params.get(METADATA_COLUMNS).split(",")); } if (params.has(TYPE_MAPPING)) { @@ -100,7 +100,7 @@ public void printHelp() { + "[--primary_keys ] " + "[--type_mapping ] " + "[--computed_column <'column_name=expr_name(args[, ...])'> [--computed_column ...]] " - + "[--metadata_column ] " + + "[--metadata_columns ] " + "[--mysql_conf [--mysql_conf ...]] " + "[--catalog_conf [--catalog_conf ...]] " + "[--table_conf [--table_conf ...]]"); @@ -126,7 +126,7 @@ public void printHelp() { System.out.println(); System.out.println( - "--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + "--metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); System.out.println(); System.out.println("MySQL CDC source conf syntax:"); From f4d66ab1d48de65c34681b58953e802fd2174f2a Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 11 Dec 2023 16:58:32 +0800 Subject: [PATCH 2/6] fix code style --- .../paimon/flink/action/cdc/SyncDatabaseActionBase.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 922751c37dae..a6c9869d709b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -37,7 +37,12 @@ import javax.annotation.Nullable; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.regex.Pattern; import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED; From 4f21a4384240e45499399eba9c43c7d03257bc71 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 11 Dec 2023 17:11:08 +0800 Subject: [PATCH 3/6] fix code style --- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 6 +++++- .../action/cdc/mysql/MySqlSyncDatabaseActionFactory.java | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 782585149e4d..e3198ddcbe6d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -32,7 +32,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** Base {@link Action} for table/database synchronizing job. */ public abstract class SynchronizationActionBase extends ActionBase { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index defbf33d80a7..575cef16c178 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -24,7 +24,6 @@ import org.apache.paimon.flink.action.MultipleParameterToolAdapter; import org.apache.paimon.flink.action.cdc.TypeMapping; -import java.util.Arrays; import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; From 410122d390194c33ae737b3864a05be18cd0bd75 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 11 Dec 2023 18:18:58 +0800 Subject: [PATCH 4/6] update --- docs/content/cdc-ingestion/mysql-cdc.md | 4 ++-- .../shortcodes/generated/mysql_sync_database.html | 4 ++-- .../layouts/shortcodes/generated/mysql_sync_table.html | 4 ++-- .../paimon/flink/action/cdc/CdcActionCommonUtils.java | 2 +- .../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 10 +++++----- .../action/cdc/mysql/MySqlSyncTableActionFactory.java | 10 +++++----- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/content/cdc-ingestion/mysql-cdc.md b/docs/content/cdc-ingestion/mysql-cdc.md index 1b2cfa281754..2bfc38c3597a 100644 --- a/docs/content/cdc-ingestion/mysql-cdc.md +++ b/docs/content/cdc-ingestion/mysql-cdc.md @@ -51,7 +51,7 @@ To use this feature through `flink run`, run the following shell command. [--primary_keys ] \ [--type_mapping ] \ [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \ - [--metadata_columns ] \ + [--metadata_column ] \ [--mysql_conf [--mysql_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ [--table_conf [--table_conf ...]] @@ -135,7 +135,7 @@ To use this feature through `flink run`, run the following shell command. [--including_tables ] \ [--excluding_tables ] \ [--mode ] \ - [--metadata_columns ] \ + [--metadata_column ] \ [--type_mapping ] \ [--mysql_conf [--mysql_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 592762b8fb97..7eeca1bd9d39 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -62,8 +62,8 @@ It is used to specify synchronization mode.
Possible values:
  • "divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.
  • "combined": start a single combined sink for all tables, the new table will be automatically synchronized.
-
--metadata_columns
- --metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata. +
--metadata_column
+ --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata.
--type_mapping
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html index d26427e66495..f64b84a11ef4 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_table.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html @@ -66,8 +66,8 @@ The definitions of computed columns. The argument field is from MySQL table field name. See here for a complete list of configurations. -
--metadata_columns
- --metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata. +
--metadata_column
+ --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata.
--mysql_conf
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index d94d32bea196..d54b1ea36394 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -61,7 +61,7 @@ public class CdcActionCommonUtils { public static final String PARTITION_KEYS = "partition_keys"; public static final String PRIMARY_KEYS = "primary_keys"; public static final String COMPUTED_COLUMN = "computed_column"; - public static final String METADATA_COLUMNS = "metadata_columns"; + public static final String METADATA_COLUMN = "metadata_column"; public static void assertSchemaCompatible( TableSchema paimonSchema, List sourceTableFields) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index 575cef16c178..beba733a4216 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -28,7 +28,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMNS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; @@ -69,8 +69,8 @@ public Optional create(MultipleParameterToolAdapter params) { .excludingTables(params.get(EXCLUDING_TABLES)) .withMode(MultiTablesSinkMode.fromString(params.get(MODE))); - if (params.has(METADATA_COLUMNS)) { - action.withMetadataColumns(params.get(METADATA_COLUMNS).split(",")); + if (params.has(METADATA_COLUMN)) { + action.withMetadataColumns(params.get(METADATA_COLUMN).split(",")); } if (params.has(TYPE_MAPPING)) { @@ -101,7 +101,7 @@ public void printHelp() { + "[--including_tables ] " + "[--excluding_tables ] " + "[--mode ] " - + "[--metadata_columns ] " + + "[--metadata_column ] " + "[--type_mapping ] " + "[--mysql_conf [--mysql_conf ...]] " + "[--catalog_conf [--catalog_conf ...]] " @@ -147,7 +147,7 @@ public void printHelp() { System.out.println(); System.out.println( - "--metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + "--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); System.out.println(); System.out.println( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java index a0b48c4b5ec8..6e6df82c2b5f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java @@ -29,7 +29,7 @@ import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMNS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; @@ -73,8 +73,8 @@ public Optional create(MultipleParameterToolAdapter params) { new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN))); } - if (params.has(METADATA_COLUMNS)) { - action.withMetadataColumns(params.get(METADATA_COLUMNS).split(",")); + if (params.has(METADATA_COLUMN)) { + action.withMetadataColumns(params.get(METADATA_COLUMN).split(",")); } if (params.has(TYPE_MAPPING)) { @@ -100,7 +100,7 @@ public void printHelp() { + "[--primary_keys ] " + "[--type_mapping ] " + "[--computed_column <'column_name=expr_name(args[, ...])'> [--computed_column ...]] " - + "[--metadata_columns ] " + + "[--metadata_column ] " + "[--mysql_conf [--mysql_conf ...]] " + "[--catalog_conf [--catalog_conf ...]] " + "[--table_conf [--table_conf ...]]"); @@ -126,7 +126,7 @@ public void printHelp() { System.out.println(); System.out.println( - "--metadata_columns is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + "--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); System.out.println(); System.out.println("MySQL CDC source conf syntax:"); From 53376686df8cfa7313a853ca1845e3ee3e56c476 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 11 Dec 2023 18:47:17 +0800 Subject: [PATCH 5/6] fix --- .../paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index b9f91b3b4da0..7ac25c9b7516 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -128,7 +128,7 @@ public MySqlSyncTableAction build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); - args.addAll(listToMultiArgs("--metadata-column", metadataColumns)); + args.addAll(listToArgs("--metadata-column", metadataColumns)); return createAction(MySqlSyncTableAction.class, args); } From 91d4130644f8b1f7d69c96809901da7f3770d301 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 12 Dec 2023 10:44:55 +0800 Subject: [PATCH 6/6] add docs --- docs/layouts/shortcodes/generated/mysql_sync_database.html | 2 +- docs/layouts/shortcodes/generated/mysql_sync_table.html | 2 +- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 5 ----- .../action/cdc/mysql/MySqlSyncDatabaseActionFactory.java | 4 ++-- .../flink/action/cdc/mysql/MySqlSyncTableActionFactory.java | 2 +- .../paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java | 2 +- 6 files changed, 6 insertions(+), 11 deletions(-) diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 7eeca1bd9d39..cd005848e473 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -63,7 +63,7 @@
--metadata_column
- --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata. + --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,op_ts. See its document for a complete list of available metadata.
--type_mapping
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html index f64b84a11ef4..85ee1a5b395e 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_table.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html @@ -67,7 +67,7 @@
--metadata_column
- --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example "table_name,database_name,op_ts". See its document for a complete list of available metadata. + --metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name --metadata_column database_name --metadata_column op_ts. See its document for a complete list of available metadata.
--mysql_conf
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index e3198ddcbe6d..3cc77e6f5b99 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,10 +76,6 @@ public SynchronizationActionBase withTypeMapping(TypeMapping typeMapping) { return this; } - public SynchronizationActionBase withMetadataColumns(String... metadataColumns) { - return withMetadataColumns(Arrays.asList(metadataColumns)); - } - public SynchronizationActionBase withMetadataColumns(List metadataColumns) { this.metadataConverters = metadataColumns.stream() diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index beba733a4216..30c4895bc900 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.action.MultipleParameterToolAdapter; import org.apache.paimon.flink.action.cdc.TypeMapping; +import java.util.Arrays; import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; @@ -68,9 +69,8 @@ public Optional create(MultipleParameterToolAdapter params) { .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) .withMode(MultiTablesSinkMode.fromString(params.get(MODE))); - if (params.has(METADATA_COLUMN)) { - action.withMetadataColumns(params.get(METADATA_COLUMN).split(",")); + action.withMetadataColumns(Arrays.asList(params.get(METADATA_COLUMN).split(","))); } if (params.has(TYPE_MAPPING)) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java index 6e6df82c2b5f..7cd115c7f32c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java @@ -74,7 +74,7 @@ public Optional create(MultipleParameterToolAdapter params) { } if (params.has(METADATA_COLUMN)) { - action.withMetadataColumns(params.get(METADATA_COLUMN).split(",")); + action.withMetadataColumns(new ArrayList<>(params.getMultiParameter(METADATA_COLUMN))); } if (params.has(TYPE_MAPPING)) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index 7ac25c9b7516..b9f91b3b4da0 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -128,7 +128,7 @@ public MySqlSyncTableAction build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); - args.addAll(listToArgs("--metadata-column", metadataColumns)); + args.addAll(listToMultiArgs("--metadata-column", metadataColumns)); return createAction(MySqlSyncTableAction.class, args); }