diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java new file mode 100644 index 000000000000..3d87970574a2 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.ActionFactory; +import org.apache.paimon.flink.action.MultipleParameterToolAdapter; + +import java.util.Optional; + +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; + +/** Base {@link ActionFactory} for synchronizing into database. */ +public abstract class SyncDatabaseActionFactoryBase + extends SynchronizationActionFactoryBase { + + protected String warehouse; + protected String database; + + @Override + public Optional create(MultipleParameterToolAdapter params) { + this.warehouse = getRequiredValue(params, WAREHOUSE); + this.database = getRequiredValue(params, DATABASE); + return super.create(params); + } + + @Override + protected void withParams(MultipleParameterToolAdapter params, T action) { + action.withTablePrefix(params.get(TABLE_PREFIX)) + .withTableSuffix(params.get(TABLE_SUFFIX)) + .includingTables(params.get(INCLUDING_TABLES)) + .excludingTables(params.get(EXCLUDING_TABLES)); + + if (params.has(TYPE_MAPPING)) { + String[] options = params.get(TYPE_MAPPING).split(","); + action.withTypeMapping(TypeMapping.parse(options)); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java index 17edc0124d2b..8f67ee16b51c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN; @@ -37,31 +36,18 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Base {@link ActionFactory} for synchronizing into one Paimon table. */ -public abstract class SyncTableActionFactoryBase implements ActionFactory { +public abstract class SyncTableActionFactoryBase + extends SynchronizationActionFactoryBase { protected Tuple3 tablePath; - protected Map catalogConfig; - protected Map cdcSourceConfig; - - public abstract String cdcConfigIdentifier(); - - public abstract SyncTableActionBase createAction(); @Override public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, cdcConfigIdentifier()); this.tablePath = getTablePath(params); - this.catalogConfig = optionalConfigMap(params, CATALOG_CONF); - this.cdcSourceConfig = optionalConfigMap(params, cdcConfigIdentifier()); - - SyncTableActionBase action = createAction(); - - action.withTableConfig(optionalConfigMap(params, TABLE_CONF)); - withParams(params, action); - - return Optional.of(action); + return super.create(params); } + @Override protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBase action) { if (params.has(PARTITION_KEYS)) { action.withPartitionKeys(params.get(PARTITION_KEYS).split(",")); @@ -82,8 +68,7 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa if (metadataColumns.size() == 1) { action.withMetadataColumns(Arrays.asList(metadataColumns.get(0).split(","))); } else { - action.withMetadataColumns( - new ArrayList<>(params.getMultiParameter(METADATA_COLUMN))); + action.withMetadataColumns(metadataColumns); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionFactoryBase.java new file mode 100644 index 000000000000..a6482ab27f1d --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionFactoryBase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.ActionFactory; +import org.apache.paimon.flink.action.MultipleParameterToolAdapter; + +import java.util.Map; +import java.util.Optional; + +/** Base {@link ActionFactory} for table/database synchronizing job. */ +public abstract class SynchronizationActionFactoryBase + implements ActionFactory { + + protected Map catalogConfig; + protected Map cdcSourceConfig; + + protected abstract String cdcConfigIdentifier(); + + public abstract T createAction(); + + @Override + public Optional create(MultipleParameterToolAdapter params) { + checkRequiredArgument(params, cdcConfigIdentifier()); + this.catalogConfig = optionalConfigMap(params, CATALOG_CONF); + this.cdcSourceConfig = optionalConfigMap(params, cdcConfigIdentifier()); + + T action = createAction(); + + action.withTableConfig(optionalConfigMap(params, TABLE_CONF)); + withParams(params, action); + + return Optional.of(action); + } + + protected abstract void withParams(MultipleParameterToolAdapter params, T action); +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java index 7f787196b208..dbf30b056ee3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java @@ -18,22 +18,13 @@ package org.apache.paimon.flink.action.cdc.kafka; -import org.apache.paimon.flink.action.Action; -import org.apache.paimon.flink.action.ActionFactory; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; -import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase; -import java.util.Optional; - -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Factory to create {@link KafkaSyncDatabaseAction}. */ -public class KafkaSyncDatabaseActionFactory implements ActionFactory { +public class KafkaSyncDatabaseActionFactory + extends SyncDatabaseActionFactoryBase { public static final String IDENTIFIER = "kafka_sync_database"; @@ -43,28 +34,13 @@ public String identifier() { } @Override - public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, KAFKA_CONF); - - KafkaSyncDatabaseAction action = - new KafkaSyncDatabaseAction( - getRequiredValue(params, WAREHOUSE), - getRequiredValue(params, DATABASE), - optionalConfigMap(params, CATALOG_CONF), - optionalConfigMap(params, KAFKA_CONF)); - - action.withTablePrefix(params.get(TABLE_PREFIX)) - .withTableSuffix(params.get(TABLE_SUFFIX)) - .includingTables(params.get(INCLUDING_TABLES)) - .excludingTables(params.get(EXCLUDING_TABLES)) - .withTableConfig(optionalConfigMap(params, TABLE_CONF)); - - if (params.has(TYPE_MAPPING)) { - String[] options = params.get(TYPE_MAPPING).split(","); - action.withTypeMapping(TypeMapping.parse(options)); - } - - return Optional.of(action); + protected String cdcConfigIdentifier() { + return KAFKA_CONF; + } + + @Override + public KafkaSyncDatabaseAction createAction() { + return new KafkaSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java index 654572c48784..0468e1c11150 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java @@ -18,20 +18,13 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.flink.action.Action; -import org.apache.paimon.flink.action.ActionFactory; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; +import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase; -import java.util.Optional; - -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; /** Factory to create {@link MongoDBSyncDatabaseAction}. */ -public class MongoDBSyncDatabaseActionFactory implements ActionFactory { +public class MongoDBSyncDatabaseActionFactory + extends SyncDatabaseActionFactoryBase { public static final String IDENTIFIER = "mongodb_sync_database"; @@ -40,23 +33,14 @@ public String identifier() { return IDENTIFIER; } - public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, MONGODB_CONF); - - MongoDBSyncDatabaseAction action = - new MongoDBSyncDatabaseAction( - getRequiredValue(params, WAREHOUSE), - getRequiredValue(params, DATABASE), - optionalConfigMap(params, CATALOG_CONF), - optionalConfigMap(params, MONGODB_CONF)); - - action.withTablePrefix(params.get(TABLE_PREFIX)) - .withTableSuffix(params.get(TABLE_SUFFIX)) - .includingTables(params.get(INCLUDING_TABLES)) - .excludingTables(params.get(EXCLUDING_TABLES)) - .withTableConfig(optionalConfigMap(params, TABLE_CONF)); + @Override + protected String cdcConfigIdentifier() { + return MONGODB_CONF; + } - return Optional.of(action); + @Override + public MongoDBSyncDatabaseAction createAction() { + return new MongoDBSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index 30c4895bc900..f84bf979ab96 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -18,25 +18,18 @@ package org.apache.paimon.flink.action.cdc.mysql; -import org.apache.paimon.flink.action.Action; -import org.apache.paimon.flink.action.ActionFactory; import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.flink.action.MultipleParameterToolAdapter; -import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase; import java.util.Arrays; -import java.util.Optional; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Factory to create {@link MySqlSyncDatabaseAction}. */ -public class MySqlSyncDatabaseActionFactory implements ActionFactory { +public class MySqlSyncDatabaseActionFactory + extends SyncDatabaseActionFactoryBase { public static final String IDENTIFIER = "mysql_sync_database"; @@ -50,35 +43,25 @@ public String identifier() { } @Override - public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, MYSQL_CONF); + protected String cdcConfigIdentifier() { + return MYSQL_CONF; + } - MySqlSyncDatabaseAction action = - new MySqlSyncDatabaseAction( - getRequiredValue(params, WAREHOUSE), - getRequiredValue(params, DATABASE), - optionalConfigMap(params, CATALOG_CONF), - optionalConfigMap(params, MYSQL_CONF)); + @Override + public MySqlSyncDatabaseAction createAction() { + return new MySqlSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig); + } - action.withTableConfig(optionalConfigMap(params, TABLE_CONF)); + @Override + protected void withParams(MultipleParameterToolAdapter params, MySqlSyncDatabaseAction action) { + super.withParams(params, action); action.ignoreIncompatible(Boolean.parseBoolean(params.get(IGNORE_INCOMPATIBLE))) .mergeShards( !params.has(MERGE_SHARDS) || Boolean.parseBoolean(params.get(MERGE_SHARDS))) - .withTablePrefix(params.get(TABLE_PREFIX)) - .withTableSuffix(params.get(TABLE_SUFFIX)) - .includingTables(params.get(INCLUDING_TABLES)) - .excludingTables(params.get(EXCLUDING_TABLES)) .withMode(MultiTablesSinkMode.fromString(params.get(MODE))); if (params.has(METADATA_COLUMN)) { action.withMetadataColumns(Arrays.asList(params.get(METADATA_COLUMN).split(","))); } - - if (params.has(TYPE_MAPPING)) { - String[] options = params.get(TYPE_MAPPING).split(","); - action.withTypeMapping(TypeMapping.parse(options)); - } - - return Optional.of(action); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java index 45301254a80d..adebef7275c8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java @@ -18,22 +18,13 @@ package org.apache.paimon.flink.action.cdc.pulsar; -import org.apache.paimon.flink.action.Action; -import org.apache.paimon.flink.action.ActionFactory; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; -import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase; -import java.util.Optional; - -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Factory to create {@link PulsarSyncDatabaseAction}. */ -public class PulsarSyncDatabaseActionFactory implements ActionFactory { +public class PulsarSyncDatabaseActionFactory + extends SyncDatabaseActionFactoryBase { public static final String IDENTIFIER = "pulsar_sync_database"; @@ -43,28 +34,13 @@ public String identifier() { } @Override - public Optional create(MultipleParameterToolAdapter params) { - checkRequiredArgument(params, PULSAR_CONF); - - PulsarSyncDatabaseAction action = - new PulsarSyncDatabaseAction( - getRequiredValue(params, WAREHOUSE), - getRequiredValue(params, DATABASE), - optionalConfigMap(params, CATALOG_CONF), - optionalConfigMap(params, PULSAR_CONF)); - - action.withTablePrefix(params.get(TABLE_PREFIX)) - .withTableSuffix(params.get(TABLE_SUFFIX)) - .includingTables(params.get(INCLUDING_TABLES)) - .excludingTables(params.get(EXCLUDING_TABLES)) - .withTableConfig(optionalConfigMap(params, TABLE_CONF)); - - if (params.has(TYPE_MAPPING)) { - String[] options = params.get(TYPE_MAPPING).split(","); - action.withTypeMapping(TypeMapping.parse(options)); - } - - return Optional.of(action); + protected String cdcConfigIdentifier() { + return PULSAR_CONF; + } + + @Override + public PulsarSyncDatabaseAction createAction() { + return new PulsarSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig); } @Override