From 50554fa85befe551064f93235b9212c23928eded Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Thu, 14 Sep 2023 16:32:45 +0800 Subject: [PATCH 1/5] [flink] Support compact procedure --- .github/workflows/e2e-tests-1.16-jdk11.yml | 4 +- .../paimon/catalog/AbstractCatalog.java | 5 +- .../org/apache/paimon/catalog/Catalog.java | 7 + .../paimon/catalog/FileSystemCatalog.java | 2 +- paimon-flink/paimon-flink-1.18/pom.xml | 85 ++++++ .../cdc/kafka/KafkaSyncDatabaseAction.java | 2 +- .../cdc/kafka/KafkaSyncTableAction.java | 2 +- .../mongodb/MongoDBSyncDatabaseAction.java | 2 +- .../cdc/mongodb/MongoDBSyncTableAction.java | 2 +- .../cdc/mysql/MySqlSyncDatabaseAction.java | 2 +- .../cdc/mysql/MySqlSyncTableAction.java | 2 +- .../flink/action/cdc/CdcActionITCaseBase.java | 21 +- .../KafkaCanalSyncDatabaseActionITCase.java | 3 +- .../KafkaCanalSyncTableActionITCase.java | 3 +- .../MongoDBSyncDatabaseActionITCase.java | 3 +- .../mongodb/MongoDBSyncTableActionITCase.java | 3 +- .../mysql/MySqlSyncDatabaseActionITCase.java | 3 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 3 +- .../org/apache/paimon/flink/FlinkCatalog.java | 2 +- .../paimon/flink/action/ActionBase.java | 5 +- .../paimon/flink/action/TableActionBase.java | 2 - .../flink/procedure/CompactProcedure.java | 97 +++++++ .../paimon/flink/procedure/ProcedureUtil.java | 19 +- .../paimon/flink/action/ActionITCaseBase.java | 18 +- .../flink/action/CompactActionITCase.java | 241 ++++++++++-------- .../flink/action/CompactActionITCaseBase.java | 6 +- .../action/CompactDatabaseActionITCase.java | 47 ++-- .../flink/action/DeleteActionITCase.java | 4 +- .../action/DropPartitionActionITCase.java | 5 +- .../flink/action/RollbackToActionITCase.java | 2 - paimon-flink/pom.xml | 1 + .../org/apache/paimon/hive/HiveCatalog.java | 2 +- 32 files changed, 412 insertions(+), 193 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.18/pom.xml create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java diff --git a/.github/workflows/e2e-tests-1.16-jdk11.yml b/.github/workflows/e2e-tests-1.16-jdk11.yml index 9d83302bd518..1ca1efdab291 100644 --- a/.github/workflows/e2e-tests-1.16-jdk11.yml +++ b/.github/workflows/e2e-tests-1.16-jdk11.yml @@ -51,7 +51,7 @@ jobs: . 
.github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -DskipTests - mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + mvn -T 1C -B clean install -DskipTests -Pflink-1.16 + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16 env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 6d212e908a81..2e0b046a2158 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -157,7 +157,10 @@ Map> allTablePaths() { } } - protected abstract String warehouse(); + @Override + public Map options() { + return catalogOptions; + } protected abstract TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 7e8b03f13164..0bbf07f33d3a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -27,6 +27,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -212,6 +213,12 @@ default boolean caseSensitive() { return true; } + /** Return the warehouse path. */ + String warehouse(); + + /** Return the catalog options. */ + Map options(); + /** Exception for trying to drop on a database that is not empty. */ class DatabaseNotEmptyException extends Exception { private static final String MSG = "Database %s is not empty."; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e8468cc1b222..a9dfb6e290eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -231,7 +231,7 @@ private static String database(Path path) { public void close() throws Exception {} @Override - protected String warehouse() { + public String warehouse() { return warehouse.toString(); } } diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml new file mode 100644 index 000000000000..2df59e0ea8f7 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + + org.apache.paimon + paimon-flink + 0.6-SNAPSHOT + + + jar + + paimon-flink-1.18 + Paimon : Flink : 1.18 + + + 1.18-SNAPSHOT + + + + + org.apache.paimon + paimon-flink-common + ${project.version} + + + + org.apache.paimon + paimon-flink-cdc + ${project.version} + + + * + * + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + org.apache.paimon:paimon-flink-common + org.apache.paimon:paimon-flink-cdc + + + + + + + + + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 155b0b5ea1bc..2679aaa3722c 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -204,7 +204,7 @@ public Map tableConfig() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index d8669eba38bf..50dee75eba79 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -205,7 +205,7 @@ public Map tableConfig() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index f6043824c3c9..dd23eaf2ae98 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -186,7 +186,7 @@ public Map tableConfig() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index ed868a0ece3d..401495a4ce6d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -203,7 +203,7 @@ public Map tableConfig() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 8fc1ad16e9a4..16e324d80f06 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -374,7 +374,7 @@ public List excludedTables() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } @VisibleForTesting diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index 3fca52255227..b3e13c26ba61 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -282,7 +282,7 @@ public Map tableConfig() { @VisibleForTesting public Map catalogConfig() { - return catalogConfig; + return catalogOptions.toMap(); } // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index b4f6dcf48064..f4e1a4577d43 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action.cdc; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.table.FileStoreTable; @@ -28,7 +27,11 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +53,19 @@ public class CdcActionITCaseBase extends ActionITCaseBase { private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class); - protected FileStoreTable getFileStoreTable(String tableName) throws Exception { - Identifier identifier = Identifier.create(database, tableName); - return (FileStoreTable) catalog.getTable(identifier); + protected StreamExecutionEnvironment env; + + @BeforeEach + public void setEnv() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @AfterEach + public void closeEnv() throws Exception { + env.close(); } protected void waitingTables(String... 
tables) throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java index 30a50afe370e..79c01d474e7e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java @@ -566,8 +566,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index 6b50274e9fad..d21dad37e071 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -953,8 +953,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java index 0d93200bdf45..2a06b4fc41e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java @@ -126,8 +126,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index 75642b89b910..bd4d78f04dcb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -158,8 +158,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index d41bfe6e7e80..8990a73b7914 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -1190,8 +1190,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 4b3c6ef13c58..d0e725c1e4cb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -851,8 +851,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 406969ea53e1..d150689358a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -891,7 +891,7 @@ public List listProcedures(String dbName) */ public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { - return ProcedureUtil.getProcedure(procedurePath.getObjectName()) + return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName()) .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 6a4477f4777f..6ee59e765dec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -43,9 +43,7 @@ /** Abstract base of {@link Action} for table. */ public abstract class ActionBase implements Action { - private final Options catalogOptions; - - protected final Map catalogConfig; + protected final Options catalogOptions; protected final Catalog catalog; protected final FlinkCatalog flinkCatalog; protected final String catalogName = "paimon-" + UUID.randomUUID(); @@ -53,7 +51,6 @@ public abstract class ActionBase implements Action { protected final StreamTableEnvironment batchTEnv; public ActionBase(String warehouse, Map catalogConfig) { - this.catalogConfig = catalogConfig; catalogOptions = Options.fromMap(catalogConfig); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 3b8147a3efdc..02b8e1943eca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -56,8 +56,6 @@ public abstract class TableActionBase extends ActionBase { try { table = catalog.getTable(identifier); } catch (Catalog.TableNotExistException e) { - LOG.error("Table doesn't exist in given path.", e); - System.err.println("Table doesn't exist in given path."); throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java new file mode 100644 index 000000000000..3908b3e62b14 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.CompactAction; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues; + +/** + * Compact procedure. Usage: + * + *
+ * <pre><code>
+ *  -- compact a table (tableId should be 'database_name.table_name')
+ *  CALL compact(tableId)
+ *
+ *  -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
+ *  CALL compact(tableId, partition1, partition2, ...)
+ * </code></pre>
+ */ +public class CompactProcedure implements Procedure { + + private final String warehouse; + private final Map<String, String> catalogOptions; + + public CompactProcedure(String warehouse, Map<String, String> catalogOptions) { + this.warehouse = warehouse; + this.catalogOptions = catalogOptions; + } + + public String[] call(ProcedureContext procedureContext, String tableId) throws Exception { + return call(procedureContext, tableId, new String[0]); + } + + public String[] call( + ProcedureContext procedureContext, String tableId, String... partitionStrings) + throws Exception { + Identifier identifier = Identifier.fromString(tableId); + CompactAction action = + new CompactAction( + warehouse, + identifier.getDatabaseName(), + identifier.getObjectName(), + catalogOptions); + + if (partitionStrings.length != 0) { + List<Map<String, String>> partitions = new ArrayList<>(); + for (String partition : partitionStrings) { + partitions.add(parseCommaSeparatedKeyValues(partition)); + } + action.withPartitions(partitions); + } + + StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment(); + action.build(env); + + ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + String name = conf.getOptional(PipelineOptions.NAME).orElse("Compact job"); + if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING) { + JobClient jobClient = env.executeAsync(name); + return new String[] {"JobID=" + jobClient.getJobID()}; + } else { + env.execute(name); + return new String[] {"Success"}; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java index 8e9a024352d1..15a5a186a529 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java @@ -18,13 +18,14 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.action.CompactActionFactory; + import org.apache.flink.table.procedures.Procedure; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; /** Utility methods for {@link Procedure}. 
*/ @@ -33,13 +34,21 @@ public class ProcedureUtil { private ProcedureUtil() {} private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>(); - private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new HashMap<>(); + + static { + SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER); + } public static List<String> listProcedures() { return Collections.unmodifiableList(SYSTEM_PROCEDURES); } - public static Optional<Procedure> getProcedure(String procedureName) { - return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName)); + public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) { + switch (procedureName) { + case CompactActionFactory.IDENTIFIER: + return Optional.of(new CompactProcedure(catalog.warehouse(), catalog.options())); + default: + return Optional.empty(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index f3a1c5881b6c..e9ca1dcf4e95 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -35,10 +35,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -55,10 +52,8 @@ public abstract class ActionITCaseBase extends AbstractTestBase { protected String database; protected String tableName; protected String commitUser; - protected SnapshotManager snapshotManager; protected StreamTableWrite write; protected StreamTableCommit commit; - protected StreamExecutionEnvironment env; protected Catalog catalog; private long incrementalIdentifier; @@ -69,11 +64,6 @@ public void before() throws IOException { tableName = "test_table_" + UUID.randomUUID(); commitUser = UUID.randomUUID().toString(); incrementalIdentifier = 0; - env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(2); - env.enableCheckpointing(1000); - env.setRestartStrategy(RestartStrategies.noRestart()); catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); } @@ -81,11 +71,12 @@ public void after() throws Exception { if (write != null) { write.close(); + write = null; } if (commit != null) { commit.close(); + commit = null; } - env.close(); catalog.close(); } @@ -114,6 +105,11 @@ protected FileStoreTable createFileStoreTable( return (FileStoreTable) catalog.getTable(identifier); } + protected FileStoreTable getFileStoreTable(String tableName) throws Exception { + Identifier identifier = Identifier.create(database, tableName); + return (FileStoreTable) catalog.getTable(identifier); + } + protected GenericRow rowData(Object... 
values) { return GenericRow.of(values); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index d2e7e5c17226..f1883b7c4b2e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -21,8 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; -import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.DataSplit; @@ -32,11 +30,16 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.core.execution.JobClient; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -62,20 +65,11 @@ public class CompactActionITCase extends CompactActionITCaseBase { @Test @Timeout(60) public void testBatchCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.WRITE_ONLY.key(), "true"); - FileStoreTable table = - createFileStoreTable( - ROW_TYPE, + prepareTable( Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), - options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); writeData( rowData(1, 100, 15, BinaryString.fromString("20221208")), @@ -87,21 +81,15 @@ public void testBatchCompact() throws Exception { rowData(2, 100, 16, BinaryString.fromString("20221208")), rowData(2, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName) - .withPartitions(getSpecifiedPartitions()) - .build(env); - env.execute(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(false); + } else { + callProcedure(false); + } - snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(3); - 
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); + checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT); List splits = table.newSnapshotReader().read().dataSplits(); assertThat(splits.size()).isEqualTo(3); @@ -118,28 +106,18 @@ public void testBatchCompact() throws Exception { @Test public void testStreamingCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); - options.put( - FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), - "1s"); - options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); - options.put(CoreOptions.WRITE_ONLY.key(), "true"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); + tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true"); // test that dedicated compact job will expire snapshots - options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3"); - options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3"); + tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3"); + tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, - Arrays.asList("dt", "hh"), - Arrays.asList("dt", "hh", "k"), - options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable( + Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), tableOptions); // base records writeData( @@ -147,24 +125,18 @@ public void testStreamingCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(1); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 1, Snapshot.CommitKind.APPEND); // no full compaction has happened, so plan should be empty StreamTableScan scan = table.newReadBuilder().newStreamScan(); TableScan.Plan plan = scan.plan(); assertThat(plan.splits()).isEmpty(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName) - .withPartitions(getSpecifiedPartitions()) - .build(env); - JobClient client = env.executeAsync(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(true); + } else { + callProcedure(true); + } // first full compaction validateResult( @@ -193,6 +165,7 @@ public void testStreamingCompact() throws Exception { 60_000); // assert dedicated compact job will expire snapshots + SnapshotManager snapshotManager = table.snapshotManager(); CommonTestUtils.waitUtil( () -> snapshotManager.latestSnapshotId() - 2 @@ -200,27 +173,19 @@ public void testStreamingCompact() throws Exception { Duration.ofSeconds(60_000), Duration.ofSeconds(100), String.format("Cannot 
validate snapshot expiration in %s milliseconds.", 60_000)); - - client.cancel(); } @Test public void testUnawareBucketStreamingCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); // test that dedicated compact job will expire snapshots - options.put(CoreOptions.BUCKET.key(), "-1"); - options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); - options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions); // base records writeData( @@ -233,21 +198,16 @@ public void testUnawareBucketStreamingCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); - FileStoreScan storeScan = table.store().newScan(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName).build(env); - JobClient client = env.executeAsync(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(true); + } else { + callProcedure(true); + } // first compaction, snapshot will be 3 - checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6); + checkFileAndRowSize(table, 3L, 30_000L, 1, 6); writeData( rowData(1, 101, 15, BinaryString.fromString("20221208")), @@ -255,27 +215,18 @@ public void testUnawareBucketStreamingCompact() throws Exception { rowData(1, 101, 15, BinaryString.fromString("20221209"))); // second compaction, snapshot will be 5 - checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9); - - client.cancel().get(); + checkFileAndRowSize(table, 5L, 30_000L, 1, 9); } @Test public void testUnawareBucketBatchCompact() throws Exception { - Map options = new HashMap<>(); - // test that dedicated compact job will expire snapshots - options.put(CoreOptions.BUCKET.key(), "-1"); - options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); - options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), 
options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions); // base records writeData( @@ -288,19 +239,93 @@ public void testUnawareBucketBatchCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); + + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(false); + } else { + callProcedure(false); + } + + // first compaction, snapshot will be 3. + checkFileAndRowSize(table, 3L, 0L, 1, 6); + } + + private FileStoreTable prepareTable( + List partitionKeys, List primaryKeys, Map tableOptions) + throws Exception { + FileStoreTable table = + createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, tableOptions); + + StreamWriteBuilder streamWriteBuilder = + table.newStreamWriteBuilder().withCommitUser(commitUser); + write = streamWriteBuilder.newWrite(); + commit = streamWriteBuilder.newCommit(); + + return table; + } + + private void checkLatestSnapshot( + FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) { + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + assertThat(snapshot.id()).isEqualTo(snapshotId); + assertThat(snapshot.commitKind()).isEqualTo(commitKind); + } + + private void runAction(boolean isStreaming) throws Exception { + StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + + new CompactAction(warehouse, database, tableName) + .withPartitions(getSpecifiedPartitions()) + .build(env); + if (isStreaming) { + env.executeAsync(); + } else { + env.execute(); + } + } + + private void callProcedure(boolean isStreaming) { + StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + + TableEnvironment tEnv; + if (isStreaming) { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + tEnv.getConfig() + .set( + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, + Duration.ofMillis(500)); + } else { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + } + + tEnv.executeSql( + String.format( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", + warehouse)); + tEnv.useCatalog("PAIMON"); - FileStoreScan storeScan = table.store().newScan(); + tEnv.executeSql( + String.format( + "CALL compact('%s.%s', '%s', '%s')", + database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15")); + } + + private StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName).build(env); - env.execute(); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setParallelism(2); + + if (isStreaming) { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setCheckpointInterval(500); + } else { + 
env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } - // first compaction, snapshot will be 3. - checkFileAndRowSize(storeScan, 3L, 0L, 1, 6); + return env; } private List> getSpecifiedPartitions() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java index 0f2ada0a959e..4c646444cb72 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java @@ -25,6 +25,7 @@ import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import java.util.ArrayList; import java.util.List; @@ -35,6 +36,7 @@ /** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} . */ public class CompactActionITCaseBase extends ActionITCaseBase { + protected void validateResult( FileStoreTable table, RowType rowType, @@ -63,8 +65,10 @@ protected void validateResult( } protected void checkFileAndRowSize( - FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) + FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) throws Exception { + SnapshotManager snapshotManager = table.snapshotManager(); + FileStoreScan scan = table.store().newScan(); long start = System.currentTimeMillis(); while (!Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 2c112f2909b2..ce2e72641426 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; @@ -37,6 +36,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.core.execution.JobClient; @@ -75,17 +75,15 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase { private FileStoreTable createTable( String databaseName, String tableName, - RowType rowType, List partitionKeys, List primaryKeys, Map options) throws Exception { - Identifier identifier = Identifier.create(databaseName, tableName); catalog.createDatabase(databaseName, true); catalog.createTable( identifier, - new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""), + new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, options, ""), false); return (FileStoreTable) catalog.getTable(identifier); } @@ -107,12 +105,11 @@ public void 
testBatchCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -156,7 +153,7 @@ public void testBatchCompact(String mode) throws Exception { env.execute(); for (FileStoreTable table : tables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(3); @@ -196,12 +193,11 @@ public void testStreamingCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -259,7 +255,7 @@ public void testStreamingCompact(String mode) throws Exception { "+I[1, 100, 16, 20221208]"), 60_000); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -306,12 +302,11 @@ public void testStreamingCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); newtables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -346,7 +341,7 @@ public void testStreamingCompact(String mode) throws Exception { "+I[1, 100, 16, 20221208]"), 60_000); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -454,7 +449,6 @@ private void includingAndExcludingTablesImpl( createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); @@ -464,7 +458,7 @@ private void includingAndExcludingTablesImpl( noCompactionTables.add(table); } - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -508,7 +502,7 @@ private void includingAndExcludingTablesImpl( env.execute(); for (FileStoreTable table : compactionTables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); @@ -523,7 +517,7 @@ private void includingAndExcludingTablesImpl( } for (FileStoreTable table : noCompactionTables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = 
table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); @@ -553,12 +547,11 @@ public void testUnawareBucketStreamingCompact() throws Exception { createTable( database, tableName, - ROW_TYPE, - Arrays.asList("k"), + Collections.singletonList("k"), Collections.emptyList(), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -593,16 +586,13 @@ public void testUnawareBucketStreamingCompact() throws Exception { JobClient client = env.executeAsync(); for (FileStoreTable table : tables) { - FileStoreScan storeScan = table.store().newScan(); - - snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); commit = streamWriteBuilder.newCommit(); // first compaction, snapshot will be 3 - checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6); + checkFileAndRowSize(table, 3L, 30_000L, 1, 6); writeData( rowData(1, 101, 15, BinaryString.fromString("20221208")), @@ -610,7 +600,7 @@ public void testUnawareBucketStreamingCompact() throws Exception { rowData(1, 101, 15, BinaryString.fromString("20221209"))); // second compaction, snapshot will be 5 - checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9); + checkFileAndRowSize(table, 5L, 30_000L, 1, 9); } client.cancel().get(); @@ -630,12 +620,11 @@ public void testUnawareBucketBatchCompact() throws Exception { createTable( database, tableName, - ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -668,10 +657,8 @@ public void testUnawareBucketBatchCompact() throws Exception { env.execute(); for (FileStoreTable table : tables) { - FileStoreScan storeScan = table.store().newScan(); - snapshotManager = table.snapshotManager(); // first compaction, snapshot will be 3. 
- checkFileAndRowSize(storeScan, 3L, 0L, 1, 6); + checkFileAndRowSize(table, 3L, 0L, 1, 6); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java index ab20863e85d5..b250c80e2d8a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java @@ -28,6 +28,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; @@ -79,6 +80,7 @@ public void testDeleteAction(boolean hasPk, List initialRecords, List action.run(); + SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(2); assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); @@ -156,7 +158,7 @@ private void prepareTable(boolean hasPk) throws Exception { Collections.emptyList(), hasPk ? Collections.singletonList("k") : Collections.emptyList(), options); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java index c5c4c941b933..e477adfed553 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -61,6 +62,7 @@ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Except Collections.emptyMap()) .run(); + SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(5); assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); @@ -112,6 +114,7 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce Collections.emptyMap()) .run(); + SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(5); assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); @@ -153,7 +156,7 @@ private FileStoreTable prepareTable(boolean hasPk) throws Exception { ? 
Arrays.asList("partKey0", "partKey1", "dt") : Collections.emptyList(), new HashMap<>()); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index 38555d47d53b..97c8637a5ba3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -56,7 +56,6 @@ public void rollbackToSnapshotTest() throws Exception { Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap()); - snapshotManager = table.snapshotManager(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); commit = writeBuilder.newCommit(); @@ -83,7 +82,6 @@ public void rollbackToTagTest() throws Exception { Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap()); - snapshotManager = table.snapshotManager(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); commit = writeBuilder.newCommit(); diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml index e92bb526b199..8daa819ee26c 100644 --- a/paimon-flink/pom.xml +++ b/paimon-flink/pom.xml @@ -39,6 +39,7 @@ under the License. paimon-flink-1.15 paimon-flink-1.16 paimon-flink-1.17 + paimon-flink-1.18 paimon-flink-action paimon-flink-cdc diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index e075d7e2130c..8893960eaf57 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -441,7 +441,7 @@ public void close() throws Exception { } @Override - protected String warehouse() { + public String warehouse() { return warehouse; } From d2595055916742144b2e670d8e19e64bf931ca53 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Fri, 15 Sep 2023 17:53:10 +0800 Subject: [PATCH 2/5] sort compact --- .../flink/action/CompactActionFactory.java | 23 ++----- .../flink/action/SortCompactAction.java | 14 +++- .../flink/procedure/CompactProcedure.java | 58 +++++++++++++--- .../paimon/flink/action/ActionITCaseBase.java | 48 +++++++++++++ .../flink/action/CompactActionITCase.java | 50 ++------------ ...Case.java => SortCompactActionITCase.java} | 67 ++++++++----------- 6 files changed, 141 insertions(+), 119 deletions(-) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/{OrderRewriteActionITCase.java => SortCompactActionITCase.java} (86%) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java index b74bd8b2f714..cb53c5db70aa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java @@ -21,7 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.MultipleParameterTool; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -44,24 +43,10 @@ public Optional create(MultipleParameterTool params) { CompactAction action; if (params.has("order-strategy")) { - SortCompactAction sortCompactAction = - new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); - - String strategy = params.get("order-strategy"); - sortCompactAction.withOrderStrategy(strategy); - - if (params.has("order-by")) { - String sqlOrderBy = params.get("order-by"); - if (sqlOrderBy == null) { - throw new IllegalArgumentException("Please specify \"order-by\"."); - } - sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(","))); - } else { - throw new IllegalArgumentException( - "Please specify order columns in parameter --order-by."); - } - - action = sortCompactAction; + action = + new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) + .withOrderStrategy(params.get("order-strategy")) + .withOrderColumns(getRequiredValue(params, "order-by").split(",")); } else { action = new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 16557a5089af..522aace0007d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -38,10 +38,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -126,11 +128,17 @@ public void build(StreamExecutionEnvironment env) { flinkSinkBuilder.build(); } - public void withOrderStrategy(String sortStrategy) { + public SortCompactAction withOrderStrategy(String sortStrategy) { this.sortStrategy = sortStrategy; + return this; } - public void withOrderColumns(List orderColumns) { - this.orderColumns = orderColumns; + public SortCompactAction withOrderColumns(String... 
orderColumns) { + return withOrderColumns(Arrays.asList(orderColumns)); + } + + public SortCompactAction withOrderColumns(List orderColumns) { + this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList()); + return this; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 3908b3e62b14..ab20f09ffcc5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.CompactAction; +import org.apache.paimon.flink.action.SortCompactAction; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -42,10 +43,14 @@ * *
  * <pre><code>
  *  -- compact a table (tableId should be 'database_name.table_name')
- *  CALL compact(tableId)
+ *  CALL compact('tableId')
+ *
+ *  -- compact a table with sorting
+ *  CALL compact('tableId', 'order-strategy', 'order-by-columns')
  *
  *  -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
- *  CALL compact(tableId, partition1, partition2, ...)
+ *  -- NOTE: if you don't need sorting but want to specify partitions, use '' as a placeholder
+ *  CALL compact('tableId', '', '', partition1, partition2, ...)
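+ *
+ *  -- illustrative calls of the two forms above (the table 'my_db.T', the columns
+ *  -- f0, f1 and the partition values are hypothetical placeholders):
+ *  CALL compact('my_db.T', 'zorder', 'f0,f1')
+ *  CALL compact('my_db.T', '', '', 'dt=20230101', 'dt=20230102')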
  * </code></pre>
*/ public class CompactProcedure implements Procedure { @@ -59,19 +64,50 @@ public CompactProcedure(String warehouse, Map catalogOptions) { } public String[] call(ProcedureContext procedureContext, String tableId) throws Exception { - return call(procedureContext, tableId, new String[0]); + return call(procedureContext, tableId, "", ""); } public String[] call( - ProcedureContext procedureContext, String tableId, String... partitionStrings) + ProcedureContext procedureContext, + String tableId, + String orderStrategy, + String orderByColumns) + throws Exception { + return call(procedureContext, tableId, orderStrategy, orderByColumns, new String[0]); + } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String orderStrategy, + String orderByColumns, + String... partitionStrings) throws Exception { Identifier identifier = Identifier.fromString(tableId); - CompactAction action = - new CompactAction( - warehouse, - identifier.getDatabaseName(), - identifier.getObjectName(), - catalogOptions); + CompactAction action; + String jobName; + if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) { + action = + new CompactAction( + warehouse, + identifier.getDatabaseName(), + identifier.getObjectName(), + catalogOptions); + jobName = "Compact Job"; + } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) { + action = + new SortCompactAction( + warehouse, + identifier.getDatabaseName(), + identifier.getObjectName(), + catalogOptions) + .withOrderStrategy(orderStrategy) + .withOrderColumns(orderByColumns.split(",")); + jobName = "Sort Compact Job"; + } else { + throw new IllegalArgumentException( + "You must specify 'order strategy' and 'order by columns' both."); + } if (partitionStrings.length != 0) { List> partitions = new ArrayList<>(); @@ -85,7 +121,7 @@ public String[] call( action.build(env); ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); - String name = conf.getOptional(PipelineOptions.NAME).orElse("Compact job"); + String name = conf.getOptional(PipelineOptions.NAME).orElse(jobName); if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING) { JobClient jobClient = env.executeAsync(name); return new String[] {"JobID=" + jobClient.getJobID()}; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index e9ca1dcf4e95..bad590c2690f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -36,10 +36,19 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import 
java.util.List; import java.util.Map; @@ -131,4 +140,43 @@ protected List getResult(TableRead read, List splits, RowType row return result; } } + + protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setParallelism(2); + + if (isStreaming) { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setCheckpointInterval(500); + } else { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } + + return env; + } + + protected void callProcedure(String procedureStatement, boolean isStreaming) { + StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + + TableEnvironment tEnv; + if (isStreaming) { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + tEnv.getConfig() + .set( + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, + Duration.ofMillis(500)); + } else { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + } + + tEnv.executeSql( + String.format( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", + warehouse)); + tEnv.useCatalog("PAIMON"); + + tEnv.executeSql(procedureStatement); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index f1883b7c4b2e..e4a3348da083 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -32,14 +32,7 @@ import org.apache.paimon.utils.CommonTestUtils; import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -179,7 +172,6 @@ public void testStreamingCompact() throws Exception { public void testUnawareBucketStreamingCompact() throws Exception { Map tableOptions = new HashMap<>(); tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); - // test that dedicated compact job will expire snapshots tableOptions.put(CoreOptions.BUCKET.key(), "-1"); tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); @@ -287,45 +279,11 @@ private void runAction(boolean isStreaming) throws Exception { } private void callProcedure(boolean isStreaming) { - StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); - - TableEnvironment tEnv; - if (isStreaming) { - tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); - tEnv.getConfig() - .set( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, - Duration.ofMillis(500)); - } else { - tEnv = 
StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); - } - - tEnv.executeSql( - String.format( - "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", - warehouse)); - tEnv.useCatalog("PAIMON"); - - tEnv.executeSql( + callProcedure( String.format( - "CALL compact('%s.%s', '%s', '%s')", - database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15")); - } - - private StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(2); - - if (isStreaming) { - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - } else { - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - } - - return env; + "CALL compact('%s.%s', '', '', '%s', '%s')", + database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15"), + isStreaming); } private List> getSpecifiedPartitions() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java similarity index 86% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java index a4e2424f0cb3..e3246706442a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java @@ -18,19 +18,13 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -44,7 +38,6 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; import java.util.Arrays; @@ -54,13 +47,10 @@ import java.util.concurrent.atomic.AtomicInteger; /** Order Rewrite Action tests for {@link SortCompactAction}. 
*/ -public class OrderRewriteActionITCase extends ActionITCaseBase { +public class SortCompactActionITCase extends ActionITCaseBase { private static final Random random = new Random(); - private Catalog catalog; - @TempDir private java.nio.file.Path path; - private void prepareData(int size, int loop) throws Exception { createTable(); List commitMessages = new ArrayList<>(); @@ -217,45 +207,42 @@ public void testCompareZorderAndOrder() throws Exception { } private void zorder(List columns) throws Exception { - SortCompactAction sortCompactAction = - new SortCompactAction( - new Path(path.toUri()).toUri().toString(), - "my_db", - "Orders1", - Collections.emptyMap()); - sortCompactAction.withOrderStrategy("zorder"); - sortCompactAction.withOrderColumns(columns); - sortCompactAction.run(); + if (random.nextBoolean()) { + new SortCompactAction(warehouse, database, tableName, Collections.emptyMap()) + .withOrderStrategy("zorder") + .withOrderColumns(columns) + .run(); + } else { + callProcedure("zorder", columns); + } } private void order(List columns) throws Exception { - SortCompactAction sortCompactAction = - new SortCompactAction( - new Path(path.toUri()).toUri().toString(), - "my_db", - "Orders1", - Collections.emptyMap()); - sortCompactAction.withOrderStrategy("order"); - sortCompactAction.withOrderColumns(columns); - sortCompactAction.run(); + if (random.nextBoolean()) { + new SortCompactAction(warehouse, database, tableName, Collections.emptyMap()) + .withOrderStrategy("order") + .withOrderColumns(columns) + .run(); + } else { + callProcedure("order", columns); + } } - public Catalog getCatalog() { - if (catalog == null) { - Options options = new Options(); - options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString()); - catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); - } - return catalog; + private void callProcedure(String orderStrategy, List orderByColumns) { + callProcedure( + String.format( + "CALL compact('%s.%s', '%s', '%s')", + database, tableName, orderStrategy, String.join(",", orderByColumns)), + false); } public void createTable() throws Exception { - getCatalog().createDatabase("my_db", true); - getCatalog().createTable(identifier(), schema(), true); + catalog.createDatabase(database, true); + catalog.createTable(identifier(), schema(), true); } public Identifier identifier() { - return Identifier.create("my_db", "Orders1"); + return Identifier.create(database, tableName); } private void commit(List messages) throws Exception { @@ -299,7 +286,7 @@ private List writeData(int size) throws Exception { } public Table getTable() throws Exception { - return getCatalog().getTable(identifier()); + return catalog.getTable(identifier()); } private static List writeOnce(Table table, int p, int size) throws Exception { From ae416b91ccb5308366a784caee884e8d1d1a4e5f Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Mon, 18 Sep 2023 15:16:33 +0800 Subject: [PATCH 3/5] [fix] address comments --- .../org/apache/paimon/catalog/AbstractCatalog.java | 3 ++- .../main/java/org/apache/paimon/catalog/Catalog.java | 7 ------- .../action/cdc/kafka/KafkaSyncDatabaseAction.java | 5 ----- .../flink/action/cdc/kafka/KafkaSyncTableAction.java | 5 ----- .../action/cdc/mongodb/MongoDBSyncDatabaseAction.java | 5 ----- .../action/cdc/mongodb/MongoDBSyncTableAction.java | 5 ----- .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 5 ----- .../flink/action/cdc/mysql/MySqlSyncTableAction.java | 5 ----- .../org/apache/paimon/flink/action/ActionBase.java 
| 6 ++++++ .../paimon/flink/procedure/CompactProcedure.java | 11 +++++------ .../apache/paimon/flink/procedure/ProcedureUtil.java | 11 ++++++++++- 11 files changed, 23 insertions(+), 45 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 2e0b046a2158..29cd157470b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -157,7 +157,8 @@ Map> allTablePaths() { } } - @Override + public abstract String warehouse(); + public Map options() { return catalogOptions; } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 0bbf07f33d3a..7e8b03f13164 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -27,7 +27,6 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; /** @@ -213,12 +212,6 @@ default boolean caseSensitive() { return true; } - /** Return the warehouse path. */ - String warehouse(); - - /** Return the catalog options. */ - Map options(); - /** Exception for trying to drop on a database that is not empty. */ class DatabaseNotEmptyException extends Exception { private static final String MSG = "Database %s is not empty."; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 2679aaa3722c..3ad479a68a37 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -202,11 +202,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 50dee75eba79..0b362d4fb3e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -203,11 +203,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index dd23eaf2ae98..ea4d7a850a34 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -184,11 +184,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index 401495a4ce6d..1c6b78ae3aa9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -201,11 +201,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 16e324d80f06..29c8261f86e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -372,11 +372,6 @@ public List excludedTables() { return excludedTables; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - @VisibleForTesting public Map tableConfig() { return tableConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index b3e13c26ba61..acbec8eba401 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -280,11 +280,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogOptions.toMap(); - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 6ee59e765dec..9c2687bd5b14 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action; +import 
org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; @@ -107,4 +108,9 @@ protected boolean compatibleCheck(List actualTypes, List exp return true; } + + @VisibleForTesting + public Map catalogConfig() { + return catalogOptions.toMap(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index ab20f09ffcc5..cb8b9aabd178 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -23,8 +23,6 @@ import org.apache.paimon.flink.action.SortCompactAction; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; @@ -36,6 +34,7 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues; /** @@ -122,12 +121,12 @@ public String[] call( ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); String name = conf.getOptional(PipelineOptions.NAME).orElse(jobName); - if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING) { - JobClient jobClient = env.executeAsync(name); - return new String[] {"JobID=" + jobClient.getJobID()}; - } else { + if (conf.get(TABLE_DML_SYNC)) { env.execute(name); return new String[] {"Success"}; + } else { + JobClient jobClient = env.executeAsync(name); + return new String[] {"JobID=" + jobClient.getJobID()}; } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java index 15a5a186a529..daf3a458c13e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.action.CompactActionFactory; @@ -28,6 +29,8 @@ import java.util.List; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Utility methods for {@link Procedure}. 
*/ public class ProcedureUtil { @@ -44,9 +47,15 @@ public static List listProcedures() { } public static Optional getProcedure(Catalog catalog, String procedureName) { + checkArgument( + catalog instanceof AbstractCatalog, + "Currently, only Paimon built-in AbstractCatalog supports procedure."); + AbstractCatalog abstractCatalog = (AbstractCatalog) catalog; switch (procedureName) { case CompactActionFactory.IDENTIFIER: - return Optional.of(new CompactProcedure(catalog.warehouse(), catalog.options())); + return Optional.of( + new CompactProcedure( + abstractCatalog.warehouse(), abstractCatalog.options())); default: return Optional.empty(); } From aff327a118748e017280541fb42c4855d9e1da8d Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Mon, 18 Sep 2023 18:17:54 +0800 Subject: [PATCH 4/5] [fix] address comments --- .../flink/procedure/CompactProcedure.java | 18 +------ .../paimon/flink/procedure/ProcedureBase.java | 52 +++++++++++++++++++ .../paimon/flink/action/ActionITCaseBase.java | 5 +- .../flink/action/CompactActionITCase.java | 3 +- .../flink/action/SortCompactActionITCase.java | 3 +- 5 files changed, 62 insertions(+), 19 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index cb8b9aabd178..4ca38e8de5dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -21,20 +21,14 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.CompactAction; import org.apache.paimon.flink.action.SortCompactAction; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.procedure.ProcedureContext; -import org.apache.flink.table.procedures.Procedure; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues; /** @@ -52,7 +46,7 @@ * CALL compact('tableId', '', '', partition1, partition2, ...) 
* */ -public class CompactProcedure implements Procedure { +public class CompactProcedure extends ProcedureBase { private final String warehouse; private final Map catalogOptions; @@ -119,14 +113,6 @@ public String[] call( StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment(); action.build(env); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); - String name = conf.getOptional(PipelineOptions.NAME).orElse(jobName); - if (conf.get(TABLE_DML_SYNC)) { - env.execute(name); - return new String[] {"Success"}; - } else { - JobClient jobClient = env.executeAsync(name); - return new String[] {"JobID=" + jobClient.getJobID()}; - } + return execute(env, jobName); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java new file mode 100644 index 000000000000..061718bdf2da --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; + +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.procedures.Procedure; + +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; + +/** Base implementation for flink {@link Procedure}. 
*/ +public class ProcedureBase implements Procedure { + + protected String[] execute(StreamExecutionEnvironment env, String defaultJobName) + throws Exception { + ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName); + JobClient jobClient = env.executeAsync(name); + String jobId = jobClient.getJobID().toString(); + if (conf.get(TABLE_DML_SYNC)) { + try { + jobClient.getJobExecutionResult().get(); + } catch (Exception e) { + throw new TableException(String.format("Failed to wait job '%s' finish", jobId), e); + } + return new String[] {"Success"}; + } else { + return new String[] {"JobID=" + jobId}; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index bad590c2690f..0e71d076f654 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -44,6 +44,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -157,7 +158,7 @@ protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { return env; } - protected void callProcedure(String procedureStatement, boolean isStreaming) { + protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) { StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); TableEnvironment tEnv; @@ -171,6 +172,8 @@ protected void callProcedure(String procedureStatement, boolean isStreaming) { tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); } + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync); + tEnv.executeSql( String.format( "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index e4a3348da083..a05b7c2f64da 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -283,7 +283,8 @@ private void callProcedure(boolean isStreaming) { String.format( "CALL compact('%s.%s', '', '', '%s', '%s')", database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15"), - isStreaming); + isStreaming, + !isStreaming); } private List> getSpecifiedPartitions() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java index e3246706442a..97bfea0a1baf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java @@ -233,7 +233,8 @@ private void 
callProcedure(String orderStrategy, List orderByColumns) { String.format( "CALL compact('%s.%s', '%s', '%s')", database, tableName, orderStrategy, String.join(",", orderByColumns)), - false); + false, + true); } public void createTable() throws Exception { From 410681ad3ad5b4c3497cafe9f789059e592d9069 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Mon, 18 Sep 2023 19:54:07 +0800 Subject: [PATCH 5/5] [improve] --- .../paimon/flink/procedure/CompactProcedure.java | 12 ++++++------ .../apache/paimon/flink/procedure/ProcedureBase.java | 7 +++++++ .../apache/paimon/flink/procedure/ProcedureUtil.java | 11 +---------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 4ca38e8de5dc..cb8aa316272b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.CompactAction; import org.apache.paimon.flink.action.SortCompactAction; @@ -48,12 +50,8 @@ */ public class CompactProcedure extends ProcedureBase { - private final String warehouse; - private final Map catalogOptions; - - public CompactProcedure(String warehouse, Map catalogOptions) { - this.warehouse = warehouse; - this.catalogOptions = catalogOptions; + public CompactProcedure(Catalog catalog) { + super(catalog); } public String[] call(ProcedureContext procedureContext, String tableId) throws Exception { @@ -76,6 +74,8 @@ public String[] call( String orderByColumns, String... partitionStrings) throws Exception { + String warehouse = ((AbstractCatalog) catalog).warehouse(); + Map catalogOptions = ((AbstractCatalog) catalog).options(); Identifier identifier = Identifier.fromString(tableId); CompactAction action; String jobName; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index 061718bdf2da..c44fe2701bbe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.flink.configuration.PipelineOptions; @@ -32,6 +33,12 @@ /** Base implementation for flink {@link Procedure}. 
*/ public class ProcedureBase implements Procedure { + protected final Catalog catalog; + + ProcedureBase(Catalog catalog) { + this.catalog = catalog; + } + protected String[] execute(StreamExecutionEnvironment env, String defaultJobName) throws Exception { ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java index daf3a458c13e..e90450afcd9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.procedure; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.action.CompactActionFactory; @@ -29,8 +28,6 @@ import java.util.List; import java.util.Optional; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Utility methods for {@link Procedure}. */ public class ProcedureUtil { @@ -47,15 +44,9 @@ public static List listProcedures() { } public static Optional getProcedure(Catalog catalog, String procedureName) { - checkArgument( - catalog instanceof AbstractCatalog, - "Currently, only Paimon built-in AbstractCatalog supports procedure."); - AbstractCatalog abstractCatalog = (AbstractCatalog) catalog; switch (procedureName) { case CompactActionFactory.IDENTIFIER: - return Optional.of( - new CompactProcedure( - abstractCatalog.warehouse(), abstractCatalog.options())); + return Optional.of(new CompactProcedure(catalog)); default: return Optional.empty(); }
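
For reference, a minimal Flink SQL session exercising the new procedure might look like the sketch below; it mirrors ActionITCaseBase#callProcedure from the tests, and the catalog name, warehouse path, and table identifier are hypothetical placeholders. Per ProcedureBase, 'table.dml-sync' controls whether the CALL blocks until the job finishes (returning "Success") or returns immediately with "JobID=...":

    CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon');
    USE CATALOG paimon;
    SET 'table.dml-sync' = 'true';  -- wait for the compact job instead of returning a JobID
    CALL compact('my_db.my_table', '', '', 'dt=20230101');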