Skip to content

Commit

Permalink
[cdc] Added database action factory base class (#2793)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Mar 12, 2024
1 parent 58113a7 commit 8d51ae5
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Base {@link ActionFactory} for synchronizing into database. */
public abstract class SyncDatabaseActionFactoryBase<T extends SyncDatabaseActionBase>
extends SynchronizationActionFactoryBase<T> {

protected String warehouse;
protected String database;

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
this.warehouse = getRequiredValue(params, WAREHOUSE);
this.database = getRequiredValue(params, DATABASE);
return super.create(params);
}

@Override
protected void withParams(MultipleParameterToolAdapter params, T action) {
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES));

if (params.has(TYPE_MAPPING)) {
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
Expand All @@ -37,31 +36,18 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Base {@link ActionFactory} for synchronizing into one Paimon table. */
public abstract class SyncTableActionFactoryBase implements ActionFactory {
public abstract class SyncTableActionFactoryBase
extends SynchronizationActionFactoryBase<SyncTableActionBase> {

protected Tuple3<String, String, String> tablePath;
protected Map<String, String> catalogConfig;
protected Map<String, String> cdcSourceConfig;

public abstract String cdcConfigIdentifier();

public abstract SyncTableActionBase createAction();

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, cdcConfigIdentifier());
this.tablePath = getTablePath(params);
this.catalogConfig = optionalConfigMap(params, CATALOG_CONF);
this.cdcSourceConfig = optionalConfigMap(params, cdcConfigIdentifier());

SyncTableActionBase action = createAction();

action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
withParams(params, action);

return Optional.of(action);
return super.create(params);
}

@Override
protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBase action) {
if (params.has(PARTITION_KEYS)) {
action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
Expand All @@ -82,8 +68,7 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa
if (metadataColumns.size() == 1) {
action.withMetadataColumns(Arrays.asList(metadataColumns.get(0).split(",")));
} else {
action.withMetadataColumns(
new ArrayList<>(params.getMultiParameter(METADATA_COLUMN)));
action.withMetadataColumns(metadataColumns);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;

import java.util.Map;
import java.util.Optional;

/** Base {@link ActionFactory} for table/database synchronizing job. */
public abstract class SynchronizationActionFactoryBase<T extends SynchronizationActionBase>
implements ActionFactory {

protected Map<String, String> catalogConfig;
protected Map<String, String> cdcSourceConfig;

protected abstract String cdcConfigIdentifier();

public abstract T createAction();

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, cdcConfigIdentifier());
this.catalogConfig = optionalConfigMap(params, CATALOG_CONF);
this.cdcSourceConfig = optionalConfigMap(params, cdcConfigIdentifier());

T action = createAction();

action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
withParams(params, action);

return Optional.of(action);
}

protected abstract void withParams(MultipleParameterToolAdapter params, T action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,13 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase;

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Factory to create {@link KafkaSyncDatabaseAction}. */
public class KafkaSyncDatabaseActionFactory implements ActionFactory {
public class KafkaSyncDatabaseActionFactory
extends SyncDatabaseActionFactoryBase<KafkaSyncDatabaseAction> {

public static final String IDENTIFIER = "kafka_sync_database";

Expand All @@ -43,28 +34,13 @@ public String identifier() {
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, KAFKA_CONF);

KafkaSyncDatabaseAction action =
new KafkaSyncDatabaseAction(
getRequiredValue(params, WAREHOUSE),
getRequiredValue(params, DATABASE),
optionalConfigMap(params, CATALOG_CONF),
optionalConfigMap(params, KAFKA_CONF));

action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.withTableConfig(optionalConfigMap(params, TABLE_CONF));

if (params.has(TYPE_MAPPING)) {
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}

return Optional.of(action);
protected String cdcConfigIdentifier() {
return KAFKA_CONF;
}

@Override
public KafkaSyncDatabaseAction createAction() {
return new KafkaSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@

package org.apache.paimon.flink.action.cdc.mongodb;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase;

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;

/** Factory to create {@link MongoDBSyncDatabaseAction}. */
public class MongoDBSyncDatabaseActionFactory implements ActionFactory {
public class MongoDBSyncDatabaseActionFactory
extends SyncDatabaseActionFactoryBase<MongoDBSyncDatabaseAction> {

public static final String IDENTIFIER = "mongodb_sync_database";

Expand All @@ -40,23 +33,14 @@ public String identifier() {
return IDENTIFIER;
}

public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, MONGODB_CONF);

MongoDBSyncDatabaseAction action =
new MongoDBSyncDatabaseAction(
getRequiredValue(params, WAREHOUSE),
getRequiredValue(params, DATABASE),
optionalConfigMap(params, CATALOG_CONF),
optionalConfigMap(params, MONGODB_CONF));

action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.withTableConfig(optionalConfigMap(params, TABLE_CONF));
@Override
protected String cdcConfigIdentifier() {
return MONGODB_CONF;
}

return Optional.of(action);
@Override
public MongoDBSyncDatabaseAction createAction() {
return new MongoDBSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionFactoryBase;

import java.util.Arrays;
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Factory to create {@link MySqlSyncDatabaseAction}. */
public class MySqlSyncDatabaseActionFactory implements ActionFactory {
public class MySqlSyncDatabaseActionFactory
extends SyncDatabaseActionFactoryBase<MySqlSyncDatabaseAction> {

public static final String IDENTIFIER = "mysql_sync_database";

Expand All @@ -50,35 +43,25 @@ public String identifier() {
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, MYSQL_CONF);
protected String cdcConfigIdentifier() {
return MYSQL_CONF;
}

MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
getRequiredValue(params, WAREHOUSE),
getRequiredValue(params, DATABASE),
optionalConfigMap(params, CATALOG_CONF),
optionalConfigMap(params, MYSQL_CONF));
@Override
public MySqlSyncDatabaseAction createAction() {
return new MySqlSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
}

action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
@Override
protected void withParams(MultipleParameterToolAdapter params, MySqlSyncDatabaseAction action) {
super.withParams(params, action);
action.ignoreIncompatible(Boolean.parseBoolean(params.get(IGNORE_INCOMPATIBLE)))
.mergeShards(
!params.has(MERGE_SHARDS) || Boolean.parseBoolean(params.get(MERGE_SHARDS)))
.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.withMode(MultiTablesSinkMode.fromString(params.get(MODE)));
if (params.has(METADATA_COLUMN)) {
action.withMetadataColumns(Arrays.asList(params.get(METADATA_COLUMN).split(",")));
}

if (params.has(TYPE_MAPPING)) {
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}

return Optional.of(action);
}

@Override
Expand Down
Loading

0 comments on commit 8d51ae5

Please sign in to comment.