Skip to content

Commit

Permalink
[flink] make warehouse in Flink action optional
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 17, 2024
1 parent 9c2f6d1 commit c14421c
Show file tree
Hide file tree
Showing 75 changed files with 307 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ public String[] call(
String partitionIdleTime,
String compactStrategy)
throws Exception {

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
StringUtils.isNullOrWhitespaceOnly(tableOptions)
Expand All @@ -152,7 +150,6 @@ public String[] call(
if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) {
action =
new CompactAction(
warehouse,
identifier.getDatabaseName(),
identifier.getObjectName(),
catalogOptions,
Expand All @@ -171,7 +168,6 @@ public String[] call(
"sort compact do not support 'partition_idle_time'.");
action =
new SortCompactAction(
warehouse,
identifier.getDatabaseName(),
identifier.getObjectName(),
catalogOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ public abstract class SynchronizationActionBase extends ActionBase {
protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};

public SynchronizationActionBase(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> cdcSourceConfig,
SyncJobHandler syncJobHandler) {
super(warehouse, catalogConfig);
super(catalogConfig);
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
Expand Down Expand Up @@ -53,9 +52,8 @@ public abstract class ActionBase implements Action {
protected StreamExecutionEnvironment env;
protected StreamTableEnvironment batchTEnv;

public ActionBase(String warehouse, Map<String, String> catalogConfig) {
public ActionBase(Map<String, String> catalogConfig) {
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);

// disable cache to avoid concurrent modification exception
if (!catalogOptions.contains(CACHE_ENABLED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.options.CatalogOptions;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,7 +48,7 @@ public interface ActionFactory extends Factory {
String WAREHOUSE = "warehouse";
String DATABASE = "database";
String TABLE = "table";
String PATH = "path";
@Deprecated String PATH = "path";
String CATALOG_CONF = "catalog_conf";
String TABLE_CONF = "table_conf";
String PARTITION = "partition";
Expand Down Expand Up @@ -88,6 +86,14 @@ static Optional<Action> createAction(String[] args) {
return Optional.empty();
}

if (params.has(PATH)) {
throw new UnsupportedOperationException(
String.format(
"Parameter '%s' is deprecated. Please use '--%s %s=<warehouse>' to specify warehouse if needed, "
+ "and use '%s' to specify database and '%s' to specify table.",
PATH, CATALOG_CONF, CatalogOptions.WAREHOUSE.key(), DATABASE, TABLE));
}

return actionFactory.create(params);
}

Expand All @@ -105,39 +111,6 @@ static void printDefaultHelp() {
System.out.println("For detailed options of each action, run <action> --help");
}

/**
 * Resolves the (warehouse, database, table) triple that identifies the target table.
 *
 * <p>The table must be identified in exactly one of two ways: by giving all of {@code
 * warehouse}, {@code database} and {@code table}, or by giving a single {@code path} from which
 * the three parts are derived through {@link CatalogUtils}. A parameter counts as given when
 * {@code params.get(...)} returns a non-null value.
 *
 * @param params the parsed action parameters
 * @return a tuple holding warehouse, database and table, in that order
 * @throws IllegalArgumentException if only some of warehouse/database/table are given, if
 *     neither form is given, or if both forms are given
 */
default Tuple3<String, String, String> getTablePath(MultipleParameterToolAdapter params) {
    String warehouse = params.get(WAREHOUSE);
    String database = params.get(DATABASE);
    String table = params.get(TABLE);
    String path = params.get(PATH);

    boolean anyNamePart = warehouse != null || database != null || table != null;
    boolean allNameParts = warehouse != null && database != null && table != null;
    if (anyNamePart && !allNameParts) {
        throw new IllegalArgumentException(
                "Warehouse, database and table must be specified all at once to specify a table.");
    }

    // Parse the path before the exclusivity check so that any failure raised while parsing
    // it takes precedence over the "either ... or" error below.
    Tuple3<String, String, String> fromPath =
            path == null
                    ? null
                    : Tuple3.of(
                            CatalogUtils.warehouse(path),
                            CatalogUtils.database(path),
                            CatalogUtils.table(path));
    boolean usesPath = fromPath != null;

    // Exactly one of the two forms must be present: reject "neither" and "both".
    if (allNameParts == usesPath) {
        throw new IllegalArgumentException(
                "Please specify either \"warehouse, database and table\" or \"path\".");
    }

    return usesPath ? fromPath : Tuple3.of(warehouse, database, table);
}

default List<Map<String, String>> getPartitions(MultipleParameterToolAdapter params) {
List<Map<String, String>> partitions = new ArrayList<>();
for (String partition : params.getMultiParameter(PARTITION)) {
Expand All @@ -160,16 +133,6 @@ default Map<String, String> optionalConfigMap(MultipleParameterToolAdapter param
return config;
}

/**
 * Verifies that the given argument is present in the action parameters.
 *
 * @param params the parsed action parameters
 * @param key the name of the argument that must be present
 */
default void checkRequiredArgument(MultipleParameterToolAdapter params, String key) {
    boolean present = params.has(key);
    // Failure handling is delegated to Preconditions.checkArgument; the key is passed as the
    // message argument for the '%s' placeholder.
    Preconditions.checkArgument(
            present, "Argument '%s' is required. Run '<action> --help' for help.", key);
}

/**
 * Returns the value of an argument after checking that it is present.
 *
 * @param params the parsed action parameters
 * @param key the name of the required argument
 * @return the value stored under {@code key}
 */
default String getRequiredValue(MultipleParameterToolAdapter params, String key) {
    // Validate first so a missing argument is reported by the presence check.
    checkRequiredArgument(params, key);
    String value = params.get(key);
    return value;
}

default Map<String, List<String>> optionalConfigMapList(
MultipleParameterToolAdapter params, String key) {
if (!params.has(key)) {
Expand All @@ -182,4 +145,13 @@ default Map<String, List<String>> optionalConfigMapList(
}
return config;
}

/**
 * Builds the catalog configuration from the action parameters.
 *
 * <p>Starts from the {@code catalog_conf} entries and, when a standalone {@code warehouse}
 * parameter is given, adds it under the {@code warehouse} key unless the catalog configuration
 * already contains that key. An explicit {@code catalog_conf} entry therefore wins.
 *
 * @param params the parsed action parameters
 * @return the catalog configuration map, possibly extended with the warehouse
 */
default Map<String, String> catalogConfigMap(MultipleParameterToolAdapter params) {
    Map<String, String> conf = optionalConfigMap(params, CATALOG_CONF);
    String warehouseArg = params.get(WAREHOUSE);

    // Nothing to add when no warehouse was passed or the config already defines one.
    if (warehouseArg == null || conf.containsKey(WAREHOUSE)) {
        return conf;
    }

    conf.put(WAREHOUSE, warehouseArg);
    return conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
import org.apache.paimon.flink.clone.SnapshotHintOperator;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.options.CatalogOptions;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -37,7 +36,6 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

/** The Latest Snapshot clone action for Flink. */
Expand All @@ -54,19 +52,14 @@ public class CloneAction extends ActionBase {
private final String targetTableName;

public CloneAction(
String warehouse,
String database,
String tableName,
Map<String, String> sourceCatalogConfig,
String targetWarehouse,
String targetDatabase,
String targetTableName,
Map<String, String> targetCatalogConfig,
String parallelismStr) {
super(warehouse, sourceCatalogConfig);

checkNotNull(warehouse, "warehouse must not be null.");
checkNotNull(targetWarehouse, "targetWarehouse must not be null.");
super(sourceCatalogConfig);

this.parallelism =
isNullOrWhitespaceOnly(parallelismStr)
Expand All @@ -77,15 +70,13 @@ public CloneAction(
if (!sourceCatalogConfig.isEmpty()) {
this.sourceCatalogConfig = sourceCatalogConfig;
}
this.sourceCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), warehouse);
this.database = database;
this.tableName = tableName;

this.targetCatalogConfig = new HashMap<>();
if (!targetCatalogConfig.isEmpty()) {
this.targetCatalogConfig = targetCatalogConfig;
}
this.targetCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), targetWarehouse);
this.targetDatabase = targetDatabase;
this.targetTableName = targetTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link CloneAction}. */
Expand All @@ -37,16 +38,22 @@ public String identifier() {

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
Map<String, String> catalogConfig = catalogConfigMap(params);

Map<String, String> targetCatalogConfig = optionalConfigMap(params, TARGET_CATALOG_CONF);
String targetWarehouse = params.get(TARGET_WAREHOUSE);
// Apply the standalone target warehouse to the TARGET catalog configuration under the
// regular 'warehouse' key, unless the target catalog configuration already defines one.
// Writing it into the source catalogConfig under the TARGET_WAREHOUSE key would leave the
// target catalog without a warehouse and add a stray option to the source catalog.
if (targetWarehouse != null && !targetCatalogConfig.containsKey(WAREHOUSE)) {
    targetCatalogConfig.put(WAREHOUSE, targetWarehouse);
}

CloneAction cloneAction =
new CloneAction(
params.get(WAREHOUSE),
params.get(DATABASE),
params.get(TABLE),
optionalConfigMap(params, CATALOG_CONF),
params.get(TARGET_WAREHOUSE),
catalogConfig,
params.get(TARGET_DATABASE),
params.get(TARGET_TABLE),
optionalConfigMap(params, TARGET_CATALOG_CONF),
targetCatalogConfig,
params.get(PARALLELISM));

return Optional.of(cloneAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -61,17 +60,12 @@ public class CompactAction extends TableActionBase {

private Boolean fullCompaction;

/**
 * Creates a compact action for the given table, delegating to the five-argument constructor
 * with empty maps for both the catalog configuration and the table configuration.
 *
 * @param warehouse the warehouse passed through to the full constructor
 * @param database the database name passed through to the full constructor
 * @param tableName the table name passed through to the full constructor
 */
public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
}

public CompactAction(
String warehouse,
String database,
String tableName,
Map<String, String> catalogConfig,
Map<String, String> tableConf) {
super(warehouse, database, tableName, catalogConfig);
super(database, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -46,32 +44,22 @@ public String identifier() {

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String database = params.getRequired(DATABASE);
String table = params.getRequired(TABLE);
Map<String, String> catalogConfig = catalogConfigMap(params);
Map<String, String> tableConfig = optionalConfigMap(params, TABLE_CONF);

CompactAction action;
if (params.has(ORDER_STRATEGY)) {
Preconditions.checkArgument(
!params.has(PARTITION_IDLE_TIME),
"sort compact do not support 'partition_idle_time'.");
action =
new SortCompactAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
catalogConfig,
optionalConfigMap(params, TABLE_CONF))
new SortCompactAction(database, table, catalogConfig, tableConfig)
.withOrderStrategy(params.get(ORDER_STRATEGY))
.withOrderColumns(getRequiredValue(params, ORDER_BY).split(","));
.withOrderColumns(params.getRequired(ORDER_BY).split(","));
} else {
action =
new CompactAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
catalogConfig,
optionalConfigMap(params, TABLE_CONF));
action = new CompactAction(database, table, catalogConfig, tableConfig);
if (params.has(PARTITION_IDLE_TIME)) {
action.withPartitionIdleTime(
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
Expand Down Expand Up @@ -123,7 +111,6 @@ public void printHelp() {
System.out.println(
" compact --warehouse s3://path/to/warehouse --database <database_name> "
+ "--table <table_name> [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
System.out.println(" compact --path <table_path> [--partition <partition_name>]");
System.out.println();

System.out.println("Partition name syntax:");
Expand All @@ -142,8 +129,6 @@ public void printHelp() {
System.out.println("Examples:");
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table");
System.out.println(
" compact --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08");
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+ "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class CompactDatabaseAction extends ActionBase {

private boolean isStreaming;

public CompactDatabaseAction(String warehouse, Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
public CompactDatabaseAction(Map<String, String> catalogConfig) {
super(catalogConfig);
}

public CompactDatabaseAction includingDatabases(@Nullable String includingDatabases) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ public String identifier() {

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
CompactDatabaseAction action =
new CompactDatabaseAction(
getRequiredValue(params, WAREHOUSE),
optionalConfigMap(params, CATALOG_CONF));
CompactDatabaseAction action = new CompactDatabaseAction(catalogConfigMap(params));

action.includingDatabases(params.get(INCLUDING_DATABASES))
.includingTables(params.get(INCLUDING_TABLES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ public class CreateBranchAction extends TableActionBase {
private final String tagName;

public CreateBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName,
String tagName) {
super(warehouse, databaseName, tableName, catalogConfig);
super(databaseName, tableName, catalogConfig);
this.branchName = branchName;
this.tagName = tagName;
}
Expand Down
Loading

0 comments on commit c14421c

Please sign in to comment.