Skip to content

Commit

Permalink
[flink] make warehouse in Flink action optional
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 18, 2024
1 parent f3813f0 commit a2f1902
Show file tree
Hide file tree
Showing 103 changed files with 355 additions and 612 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void testDeleteAction() throws Exception {
batchSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello'), (3, 'World')");

DeleteAction action =
new DeleteAction(path, "default", "T", "k = 1", Collections.emptyMap());
new DeleteAction(
"default", "T", "k = 1", Collections.singletonMap("warehouse", path));

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql("SELECT * FROM T").collect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ public String[] call(
String partitionIdleTime,
String compactStrategy)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
CompactDatabaseAction action =
new CompactDatabaseAction(warehouse, catalogOptions)
new CompactDatabaseAction(catalogOptions)
.includingDatabases(nullable(includingDatabases))
.includingTables(nullable(includingTables))
.excludingTables(nullable(excludingTables))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ public String[] call(
String partitionIdleTime,
String compactStrategy)
throws Exception {

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
StringUtils.isNullOrWhitespaceOnly(tableOptions)
Expand All @@ -152,7 +150,6 @@ public String[] call(
if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) {
action =
new CompactAction(
warehouse,
identifier.getDatabaseName(),
identifier.getObjectName(),
catalogOptions,
Expand All @@ -171,7 +168,6 @@ public String[] call(
"sort compact do not support 'partition_idle_time'.");
action =
new SortCompactAction(
warehouse,
identifier.getDatabaseName(),
identifier.getObjectName(),
catalogOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,11 @@ public String[] call(
String notMatchedInsertCondition,
String notMatchedInsertValues,
String matchedDeleteCondition) {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Identifier identifier = Identifier.fromString(targetTableId);
MergeIntoAction action =
new MergeIntoAction(
warehouse,
identifier.getDatabaseName(),
identifier.getObjectName(),
catalogOptions);
identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions);
action.withTargetAlias(nullable(targetAlias));

if (!sourceSqls.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@
public abstract class MessageQueueSyncTableActionBase extends SyncTableActionBase {

public MessageQueueSyncTableActionBase(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> mqConfig,
SyncJobHandler.SourceType sourceType) {
super(warehouse, database, table, catalogConfig, mqConfig, sourceType);
super(database, table, catalogConfig, mqConfig, sourceType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,11 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();

public SyncDatabaseActionBase(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> cdcSourceConfig,
SyncJobHandler.SourceType sourceType) {
super(
warehouse,
database,
catalogConfig,
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,11 @@
public abstract class SyncDatabaseActionFactoryBase<T extends SyncDatabaseActionBase>
extends SynchronizationActionFactoryBase<T> {

protected String warehouse;
protected String database;

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
this.warehouse = getRequiredValue(params, WAREHOUSE);
this.database = getRequiredValue(params, DATABASE);
this.database = params.getRequired(DATABASE);
return super.create(params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase {
protected List<ComputedColumn> computedColumns = new ArrayList<>();

public SyncTableActionBase(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> cdcSourceConfig,
SyncJobHandler.SourceType sourceType) {
super(
warehouse,
database,
catalogConfig,
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -39,11 +37,13 @@
public abstract class SyncTableActionFactoryBase
extends SynchronizationActionFactoryBase<SyncTableActionBase> {

protected Tuple3<String, String, String> tablePath;
protected String database;
protected String table;

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
this.tablePath = getTablePath(params);
this.database = params.getRequired(DATABASE);
this.table = params.getRequired(TABLE);
return super.create(params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ public abstract class SynchronizationActionBase extends ActionBase {
protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};

public SynchronizationActionBase(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> cdcSourceConfig,
SyncJobHandler syncJobHandler) {
super(warehouse, catalogConfig);
super(catalogConfig);
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Base {@link ActionFactory} for table/database synchronizing job. */
public abstract class SynchronizationActionFactoryBase<T extends SynchronizationActionBase>
implements ActionFactory {
Expand All @@ -38,8 +40,11 @@ public abstract class SynchronizationActionFactoryBase<T extends Synchronization

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, cdcConfigIdentifier());
this.catalogConfig = optionalConfigMap(params, CATALOG_CONF);
checkArgument(
params.has(cdcConfigIdentifier()),
"Argument '%s' is required. Run '<action> --help' for help.",
cdcConfigIdentifier());
this.catalogConfig = catalogConfigMap(params);
this.cdcSourceConfig = optionalConfigMap(params, cdcConfigIdentifier());

T action = createAction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@
public class KafkaSyncDatabaseAction extends SyncDatabaseActionBase {

public KafkaSyncDatabaseAction(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> kafkaConfig) {
super(warehouse, database, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA);
String database, Map<String, String> catalogConfig, Map<String, String> kafkaConfig) {
super(database, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected String cdcConfigIdentifier() {

@Override
public KafkaSyncDatabaseAction createAction() {
return new KafkaSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
return new KafkaSyncDatabaseAction(database, catalogConfig, cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,10 @@
public class KafkaSyncTableAction extends MessageQueueSyncTableActionBase {

public KafkaSyncTableAction(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> kafkaConfig) {
super(
warehouse,
database,
table,
catalogConfig,
kafkaConfig,
SyncJobHandler.SourceType.KAFKA);
super(database, table, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ public String cdcConfigIdentifier() {
@Override
public KafkaSyncTableAction createAction() {
return new KafkaSyncTableAction(
this.tablePath.f0,
this.tablePath.f1,
this.tablePath.f2,
this.catalogConfig,
this.cdcSourceConfig);
this.database, this.table, this.catalogConfig, this.cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@
public class MongoDBSyncDatabaseAction extends SyncDatabaseActionBase {

public MongoDBSyncDatabaseAction(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> mongodbConfig) {
super(warehouse, database, catalogConfig, mongodbConfig, SyncJobHandler.SourceType.MONGODB);
String database, Map<String, String> catalogConfig, Map<String, String> mongodbConfig) {
super(database, catalogConfig, mongodbConfig, SyncJobHandler.SourceType.MONGODB);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected String cdcConfigIdentifier() {

@Override
public MongoDBSyncDatabaseAction createAction() {
return new MongoDBSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
return new MongoDBSyncDatabaseAction(database, catalogConfig, cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,11 @@
public class MongoDBSyncTableAction extends SyncTableActionBase {

public MongoDBSyncTableAction(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> mongodbConfig) {
super(
warehouse,
database,
table,
catalogConfig,
mongodbConfig,
SyncJobHandler.SourceType.MONGODB);
super(database, table, catalogConfig, mongodbConfig, SyncJobHandler.SourceType.MONGODB);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ public String cdcConfigIdentifier() {
@Override
public SyncTableActionBase createAction() {
return new MongoDBSyncTableAction(
this.tablePath.f0,
this.tablePath.f1,
this.tablePath.f2,
this.catalogConfig,
this.cdcSourceConfig);
database, table, this.catalogConfig, this.cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,8 @@ public class MySqlSyncDatabaseAction extends SyncDatabaseActionBase {
private final List<Identifier> excludedTables = new ArrayList<>();

public MySqlSyncDatabaseAction(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> mySqlConfig) {
super(warehouse, database, catalogConfig, mySqlConfig, SyncJobHandler.SourceType.MYSQL);
String database, Map<String, String> catalogConfig, Map<String, String> mySqlConfig) {
super(database, catalogConfig, mySqlConfig, SyncJobHandler.SourceType.MYSQL);
this.mode = DIVIDED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected String cdcConfigIdentifier() {

@Override
public MySqlSyncDatabaseAction createAction() {
return new MySqlSyncDatabaseAction(warehouse, database, catalogConfig, cdcSourceConfig);
return new MySqlSyncDatabaseAction(database, catalogConfig, cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,11 @@ public class MySqlSyncTableAction extends SyncTableActionBase {
private JdbcSchemasInfo mySqlSchemasInfo;

public MySqlSyncTableAction(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> mySqlConfig) {
super(
warehouse,
database,
table,
catalogConfig,
mySqlConfig,
SyncJobHandler.SourceType.MYSQL);
super(database, table, catalogConfig, mySqlConfig, SyncJobHandler.SourceType.MYSQL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ public String cdcConfigIdentifier() {

@Override
public MySqlSyncTableAction createAction() {
return new MySqlSyncTableAction(
this.tablePath.f0,
this.tablePath.f1,
this.tablePath.f2,
this.catalogConfig,
this.cdcSourceConfig);
return new MySqlSyncTableAction(database, table, this.catalogConfig, this.cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,11 @@ public class PostgresSyncTableAction extends SyncTableActionBase {
private JdbcSchemasInfo postgresSchemasInfo;

public PostgresSyncTableAction(
String warehouse,
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> postgresConfig) {
super(
warehouse,
database,
table,
catalogConfig,
postgresConfig,
SyncJobHandler.SourceType.POSTGRES);
super(database, table, catalogConfig, postgresConfig, SyncJobHandler.SourceType.POSTGRES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ public String cdcConfigIdentifier() {
@Override
public PostgresSyncTableAction createAction() {
return new PostgresSyncTableAction(
this.tablePath.f0,
this.tablePath.f1,
this.tablePath.f2,
this.catalogConfig,
this.cdcSourceConfig);
database, table, this.catalogConfig, this.cdcSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@
public class PulsarSyncDatabaseAction extends SyncDatabaseActionBase {

public PulsarSyncDatabaseAction(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> pulsarConfig) {
super(warehouse, database, catalogConfig, pulsarConfig, SyncJobHandler.SourceType.PULSAR);
String database, Map<String, String> catalogConfig, Map<String, String> pulsarConfig) {
super(database, catalogConfig, pulsarConfig, SyncJobHandler.SourceType.PULSAR);
}

@Override
Expand Down
Loading

0 comments on commit a2f1902

Please sign in to comment.