Skip to content

Commit

Permalink
[flink] All flink actions should parse catalog config (#1328)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jun 7, 2023
1 parent e3f789d commit c3102b4
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 98 deletions.
14 changes: 8 additions & 6 deletions docs/content/how-to/writing-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ Run the following command to submit a drop-partition job for the table.
drop-partition \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name>
--partition <partition_spec>
[--partition <partition_spec> ...]
--table <table-name> \
[--partition <partition_spec> [--partition <partition_spec> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]

partition_spec:
key1=value1,key2=value2...
Expand Down Expand Up @@ -307,8 +307,9 @@ Run the following command to submit a 'delete' job for the table.
delete \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name>
--where <filter_spec>
--table <table-name> \
--where <filter_spec> \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
filter_spec is equal to the 'WHERE' clause in SQL DELETE statement. Examples:
age >= 18 AND age <= 60
Expand Down Expand Up @@ -418,7 +419,8 @@ Run the following command to submit a 'merge-into' job for the table.
--not-matched-insert-values <insert-values> \
--not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
--not-matched-by-source-upsert-set <not-matched-upsert-changes> \
--not-matched-by-source-delete-condition <not-matched-by-source-condition>
--not-matched-by-source-delete-condition <not-matched-by-source-condition> \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
You can pass SQL statements with '--source-sql <sql> [, --source-sql <sql> ...]' to configure the environment and create the source table at runtime.
Expand Down
3 changes: 2 additions & 1 deletion docs/content/maintenance/manage-snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ Run the following command:
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--snapshot <snapshot-id>
--snapshot <snapshot-id> \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ protected List<String> ddl() {
public void testDeleteAction() throws Exception {
batchSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello'), (3, 'World')");

DeleteAction action = new DeleteAction(path, "default", "T", "k = 1");
DeleteAction action =
new DeleteAction(path, "default", "T", "k = 1", Collections.emptyMap());

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql("SELECT * FROM T").collect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ protected List<String> ddl() {
public void testDeleteAction() throws Exception {
batchSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello'), (3, 'World')");

DeleteAction action = new DeleteAction(path, "default", "T", "k = 1");
DeleteAction action =
new DeleteAction(path, "default", "T", "k = 1", Collections.emptyMap());

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql("SELECT * FROM T").collect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -170,9 +171,9 @@ public static void printHelp() {
}
}

static Optional<Map<String, String>> getConfigMap(MultipleParameterTool params, String key) {
static Map<String, String> optionalConfigMap(MultipleParameterTool params, String key) {
if (!params.has(key)) {
return Optional.empty();
return Collections.emptyMap();
}

Map<String, String> map = new HashMap<>();
Expand All @@ -183,9 +184,10 @@ static Optional<Map<String, String>> getConfigMap(MultipleParameterTool params,
continue;
}

System.err.println("Invalid key " + key + ". Please use format 'key=value'");
return Optional.empty();
throw new IllegalArgumentException(
String.format(
"Invalid argument '%s %s'. Please use format 'key=value'", key, param));
}
return Optional.of(map);
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ public abstract class ActionBase implements Action {

protected final String catalogName = "paimon-" + UUID.randomUUID();

public ActionBase(String warehouse, Map<String, String> catalogOptions) {
this(warehouse, Options.fromMap(catalogOptions));
}

public ActionBase(String warehouse, Options catalogOptions) {
public ActionBase(String warehouse, Map<String, String> catalogConfig) {
Options catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand All @@ -41,9 +40,9 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.flink.action.Action.getConfigMap;
import static org.apache.paimon.flink.action.Action.getPartitions;
import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;

/** Table compact action for Flink. */
public class CompactAction extends TableActionBase {
Expand All @@ -54,12 +53,15 @@ public class CompactAction extends TableActionBase {
private final CompactorSinkBuilder sinkBuilder;

public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, new Options());
this(warehouse, database, tableName, Collections.emptyMap());
}

public CompactAction(
String warehouse, String database, String tableName, Options catalogOptions) {
super(warehouse, database, tableName, catalogOptions);
String warehouse,
String database,
String tableName,
Map<String, String> catalogConfig) {
super(warehouse, database, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
Expand Down Expand Up @@ -111,12 +113,10 @@ public static Optional<Action> create(String[] args) {
return Optional.empty();
}

Optional<Map<String, String>> catalogConfigOption = getConfigMap(params, "catalog-conf");
Options catalogOptions =
Options.fromMap(catalogConfigOption.orElse(Collections.emptyMap()));
Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");

CompactAction action =
new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogOptions);
new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);

if (params.has("partition")) {
List<Map<String, String>> partitions = getPartitions(params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;

/** Delete from table action for Flink. */
public class DeleteAction extends TableActionBase {
Expand All @@ -43,8 +45,13 @@ public class DeleteAction extends TableActionBase {

private final String filter;

public DeleteAction(String warehouse, String databaseName, String tableName, String filter) {
super(warehouse, databaseName, tableName);
/**
 * Creates a delete action (equivalent to SQL {@code DELETE FROM <table> WHERE <filter>})
 * against a Paimon table, resolving the catalog with user-supplied options.
 *
 * @param warehouse warehouse root path of the Paimon catalog
 * @param databaseName name of the database containing the target table
 * @param tableName name of the table to delete rows from
 * @param filter SQL-like predicate selecting the rows to delete, e.g. {@code "k = 1"}
 * @param catalogConfig additional catalog options (the values passed via
 *     {@code --catalog-conf key=value}; may be empty, must not be null)
 */
public DeleteAction(
String warehouse,
String databaseName,
String tableName,
String filter,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
// NOTE(review): presumably relaxes the merge-engine restriction so rows can be
// deleted regardless of the table's merge engine — confirm against TableActionBase.
changeIgnoreMergeEngine();
this.filter = filter;
}
Expand All @@ -70,7 +77,10 @@ public static Optional<Action> create(String[] args) {
return Optional.empty();
}

DeleteAction action = new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter);
Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");

DeleteAction action =
new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter, catalogConfig);

return Optional.of(action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import static org.apache.paimon.flink.action.Action.getPartitions;
import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;

/** Table drop partition action for Flink. */
public class DropPartitionAction extends TableActionBase {
Expand All @@ -48,8 +49,9 @@ public class DropPartitionAction extends TableActionBase {
String warehouse,
String databaseName,
String tableName,
List<Map<String, String>> partitions) {
super(warehouse, databaseName, tableName);
List<Map<String, String>> partitions,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
Expand Down Expand Up @@ -95,8 +97,11 @@ public static Optional<Action> create(String[] args) {
return Optional.empty();
}

Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");

return Optional.of(
new DropPartitionAction(tablePath.f0, tablePath.f1, tablePath.f2, partitions));
new DropPartitionAction(
tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig));
}

private static void printHelp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.stream.Stream;

import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;
import static org.apache.paimon.flink.action.Action.parseKeyValues;

/**
Expand Down Expand Up @@ -122,7 +123,15 @@ public class MergeIntoAction extends TableActionBase {
@Nullable private String notMatchedInsertValues;

MergeIntoAction(String warehouse, String database, String tableName) {
super(warehouse, database, tableName);
this(warehouse, database, tableName, Collections.emptyMap());
}

MergeIntoAction(
String warehouse,
String database,
String tableName,
Map<String, String> catalogConfig) {
super(warehouse, database, tableName, catalogConfig);

if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -228,7 +237,10 @@ public static Optional<Action> create(String[] args) {
return Optional.empty();
}

MergeIntoAction action = new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2);
Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");

MergeIntoAction action =
new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);

if (params.has("target-as")) {
action.withTargetAlias(params.get("target-as"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.flink.action.Action.getTablePath;
import static org.apache.paimon.flink.action.Action.optionalConfigMap;

/** Rollback to specific snapshot action for Flink. */
public class RollbackToAction extends TableActionBase {
Expand All @@ -37,8 +39,12 @@ public class RollbackToAction extends TableActionBase {
private final long snapshotId;

public RollbackToAction(
String warehouse, String databaseName, String tableName, long snapshotId) {
super(warehouse, databaseName, tableName);
String warehouse,
String databaseName,
String tableName,
long snapshotId,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
this.snapshotId = snapshotId;
}

Expand All @@ -63,9 +69,15 @@ public static Optional<Action> create(String[] args) {
throw new IllegalArgumentException("Please specific snapshot.");
}

Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");

RollbackToAction action =
new RollbackToAction(
tablePath.f0, tablePath.f1, tablePath.f2, Long.parseLong(snapshot));
tablePath.f0,
tablePath.f1,
tablePath.f2,
Long.parseLong(snapshot),
catalogConfig);

return Optional.of(action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.utils.TableEnvironmentUtils;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -59,13 +58,12 @@ public abstract class TableActionBase extends ActionBase {

protected Table table;

TableActionBase(String warehouse, String databaseName, String tableName) {
this(warehouse, databaseName, tableName, new Options());
}

TableActionBase(
String warehouse, String databaseName, String tableName, Options catalogOptions) {
super(warehouse, catalogOptions);
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
env = StreamExecutionEnvironment.getExecutionEnvironment();
batchTEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

Expand Down
Loading

0 comments on commit c3102b4

Please sign in to comment.