From f31cd18f2a679c0b002a22b7222c00a256f93173 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 19 Sep 2023 10:11:14 +0800 Subject: [PATCH] [flink] Support compact procedure (#2013) --- .github/workflows/e2e-tests-1.16-jdk11.yml | 4 +- .../paimon/catalog/AbstractCatalog.java | 6 +- .../paimon/catalog/FileSystemCatalog.java | 2 +- paimon-flink/paimon-flink-1.18/pom.xml | 85 +++++++ .../cdc/kafka/KafkaSyncDatabaseAction.java | 5 - .../cdc/kafka/KafkaSyncTableAction.java | 5 - .../mongodb/MongoDBSyncDatabaseAction.java | 5 - .../cdc/mongodb/MongoDBSyncTableAction.java | 5 - .../cdc/mysql/MySqlSyncDatabaseAction.java | 5 - .../cdc/mysql/MySqlSyncTableAction.java | 5 - .../flink/action/cdc/CdcActionITCaseBase.java | 21 +- .../KafkaCanalSyncDatabaseActionITCase.java | 3 +- .../KafkaCanalSyncTableActionITCase.java | 3 +- .../MongoDBSyncDatabaseActionITCase.java | 3 +- .../mongodb/MongoDBSyncTableActionITCase.java | 3 +- .../mysql/MySqlSyncDatabaseActionITCase.java | 3 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 3 +- .../org/apache/paimon/flink/FlinkCatalog.java | 2 +- .../paimon/flink/action/ActionBase.java | 11 +- .../flink/action/CompactActionFactory.java | 23 +- .../flink/action/SortCompactAction.java | 14 +- .../paimon/flink/action/TableActionBase.java | 2 - .../flink/procedure/CompactProcedure.java | 118 ++++++++++ .../paimon/flink/procedure/ProcedureBase.java | 59 +++++ .../paimon/flink/procedure/ProcedureUtil.java | 19 +- .../paimon/flink/action/ActionITCaseBase.java | 65 +++++- .../flink/action/CompactActionITCase.java | 210 ++++++++---------- .../flink/action/CompactActionITCaseBase.java | 6 +- .../action/CompactDatabaseActionITCase.java | 47 ++-- .../flink/action/DeleteActionITCase.java | 4 +- .../action/DropPartitionActionITCase.java | 5 +- .../flink/action/RollbackToActionITCase.java | 2 - ...Case.java => SortCompactActionITCase.java} | 68 +++--- paimon-flink/pom.xml | 1 + .../org/apache/paimon/hive/HiveCatalog.java | 2 +- 35 files changed, 542 insertions(+), 282 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.18/pom.xml create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/{OrderRewriteActionITCase.java => SortCompactActionITCase.java} (86%) diff --git a/.github/workflows/e2e-tests-1.16-jdk11.yml b/.github/workflows/e2e-tests-1.16-jdk11.yml index 9d83302bd518..1ca1efdab291 100644 --- a/.github/workflows/e2e-tests-1.16-jdk11.yml +++ b/.github/workflows/e2e-tests-1.16-jdk11.yml @@ -51,7 +51,7 @@ jobs: . 
.github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -DskipTests - mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + mvn -T 1C -B clean install -DskipTests -Pflink-1.16 + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16 env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 6d212e908a81..29cd157470b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -157,7 +157,11 @@ Map> allTablePaths() { } } - protected abstract String warehouse(); + public abstract String warehouse(); + + public Map options() { + return catalogOptions; + } protected abstract TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e8468cc1b222..a9dfb6e290eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -231,7 +231,7 @@ private static String database(Path path) { public void close() throws Exception {} @Override - protected String warehouse() { + public String warehouse() { return warehouse.toString(); } } diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml new file mode 100644 index 000000000000..2df59e0ea8f7 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + + org.apache.paimon + paimon-flink + 0.6-SNAPSHOT + + + jar + + paimon-flink-1.18 + Paimon : Flink : 1.18 + + + 1.18-SNAPSHOT + + + + + org.apache.paimon + paimon-flink-common + ${project.version} + + + + org.apache.paimon + paimon-flink-cdc + ${project.version} + + + * + * + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + org.apache.paimon:paimon-flink-common + org.apache.paimon:paimon-flink-cdc + + + + + + + + + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 155b0b5ea1bc..3ad479a68a37 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -202,11 +202,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index d8669eba38bf..0b362d4fb3e7 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -203,11 +203,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index f6043824c3c9..ea4d7a850a34 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -184,11 +184,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index ed868a0ece3d..1c6b78ae3aa9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -201,11 +201,6 @@ public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 8fc1ad16e9a4..29c8261f86e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -372,11 +372,6 @@ public List excludedTables() { return excludedTables; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - @VisibleForTesting public Map tableConfig() { return tableConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index 3fca52255227..acbec8eba401 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -280,11 +280,6 @@ 
public Map tableConfig() { return tableConfig; } - @VisibleForTesting - public Map catalogConfig() { - return catalogConfig; - } - // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index b4f6dcf48064..f4e1a4577d43 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action.cdc; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.table.FileStoreTable; @@ -28,7 +27,11 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +53,19 @@ public class CdcActionITCaseBase extends ActionITCaseBase { private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class); - protected FileStoreTable getFileStoreTable(String tableName) throws Exception { - Identifier identifier = Identifier.create(database, tableName); - return (FileStoreTable) catalog.getTable(identifier); + protected StreamExecutionEnvironment env; + + @BeforeEach + public void setEnv() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @AfterEach + public void closeEnv() throws Exception { + env.close(); } protected void waitingTables(String... 
tables) throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java index 30a50afe370e..79c01d474e7e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java @@ -566,8 +566,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index 6b50274e9fad..d21dad37e071 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -953,8 +953,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java index 0d93200bdf45..2a06b4fc41e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java @@ -126,8 +126,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index 75642b89b910..bd4d78f04dcb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -158,8 +158,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index d41bfe6e7e80..8990a73b7914 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -1190,8 +1190,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 4b3c6ef13c58..d0e725c1e4cb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -851,8 +851,7 @@ public void testCatalogAndTableConfig() { .withTableConfig(Collections.singletonMap("table-key", "table-value")) .build(); - assertThat(action.catalogConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value")); + assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 406969ea53e1..d150689358a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -891,7 +891,7 @@ public List listProcedures(String dbName) */ public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { - return ProcedureUtil.getProcedure(procedurePath.getObjectName()) + return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName()) .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 6a4477f4777f..9c2687bd5b14 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; @@ -43,9 +44,7 @@ /** Abstract base of {@link Action} for table. */ public abstract class ActionBase implements Action { - private final Options catalogOptions; - - protected final Map catalogConfig; + protected final Options catalogOptions; protected final Catalog catalog; protected final FlinkCatalog flinkCatalog; protected final String catalogName = "paimon-" + UUID.randomUUID(); @@ -53,7 +52,6 @@ public abstract class ActionBase implements Action { protected final StreamTableEnvironment batchTEnv; public ActionBase(String warehouse, Map catalogConfig) { - this.catalogConfig = catalogConfig; catalogOptions = Options.fromMap(catalogConfig); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); @@ -110,4 +108,9 @@ protected boolean compatibleCheck(List actualTypes, List exp return true; } + + @VisibleForTesting + public Map catalogConfig() { + return catalogOptions.toMap(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java index b74bd8b2f714..cb53c5db70aa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java @@ -21,7 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.MultipleParameterTool; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -44,24 +43,10 @@ public Optional create(MultipleParameterTool params) { CompactAction action; if (params.has("order-strategy")) { - SortCompactAction sortCompactAction = - new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); - - String strategy = params.get("order-strategy"); - sortCompactAction.withOrderStrategy(strategy); - - if (params.has("order-by")) { - String sqlOrderBy = params.get("order-by"); - if (sqlOrderBy == null) { - throw new IllegalArgumentException("Please specify \"order-by\"."); - } - sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(","))); - } else { - throw new IllegalArgumentException( - "Please specify order columns in parameter --order-by."); - } - - action = sortCompactAction; + action = + new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) + .withOrderStrategy(params.get("order-strategy")) + .withOrderColumns(getRequiredValue(params, "order-by").split(",")); } else { action = new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 16557a5089af..522aace0007d 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -38,10 +38,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -126,11 +128,17 @@ public void build(StreamExecutionEnvironment env) { flinkSinkBuilder.build(); } - public void withOrderStrategy(String sortStrategy) { + public SortCompactAction withOrderStrategy(String sortStrategy) { this.sortStrategy = sortStrategy; + return this; } - public void withOrderColumns(List orderColumns) { - this.orderColumns = orderColumns; + public SortCompactAction withOrderColumns(String... orderColumns) { + return withOrderColumns(Arrays.asList(orderColumns)); + } + + public SortCompactAction withOrderColumns(List orderColumns) { + this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList()); + return this; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 3b8147a3efdc..02b8e1943eca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -56,8 +56,6 @@ public abstract class TableActionBase extends ActionBase { try { table = catalog.getTable(identifier); } catch (Catalog.TableNotExistException e) { - LOG.error("Table doesn't exist in given path.", e); - System.err.println("Table doesn't exist in given path."); throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java new file mode 100644 index 000000000000..cb8aa316272b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.CompactAction; +import org.apache.paimon.flink.action.SortCompactAction; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues; + +/** + * Compact procedure. Usage: + * + *
+ * <pre><code>
+ *  -- compact a table (tableId should be 'database_name.table_name')
+ *  CALL compact('tableId')
+ *
+ *  -- compact a table with sorting
+ *  CALL compact('tableId', 'order-strategy', 'order-by-columns')
+ *
+ *  -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
+ *  -- NOTE: if you don't need sorting but want to specify partitions, use '' as a placeholder
+ *  CALL compact('tableId', '', '', partition1, partition2, ...)
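+ *
+ *  -- NOTE: a usage sketch, not a separate signature: calls return asynchronously by default;
+ *  -- set Flink's 'table.dml-sync' option first to block until the compact job finishes
+ *  SET 'table.dml-sync' = 'true';
+ *  CALL compact('tableId')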
+ * </code></pre>
+ */
+public class CompactProcedure extends ProcedureBase {
+
+    public CompactProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+        return call(procedureContext, tableId, "", "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String orderStrategy,
+            String orderByColumns)
+            throws Exception {
+        return call(procedureContext, tableId, orderStrategy, orderByColumns, new String[0]);
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String orderStrategy,
+            String orderByColumns,
+            String... partitionStrings)
+            throws Exception {
+        String warehouse = ((AbstractCatalog) catalog).warehouse();
+        Map<String, String> catalogOptions = ((AbstractCatalog) catalog).options();
+        Identifier identifier = Identifier.fromString(tableId);
+        CompactAction action;
+        String jobName;
+        if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) {
+            action =
+                    new CompactAction(
+                            warehouse,
+                            identifier.getDatabaseName(),
+                            identifier.getObjectName(),
+                            catalogOptions);
+            jobName = "Compact Job";
+        } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
+            action =
+                    new SortCompactAction(
+                                    warehouse,
+                                    identifier.getDatabaseName(),
+                                    identifier.getObjectName(),
+                                    catalogOptions)
+                            .withOrderStrategy(orderStrategy)
+                            .withOrderColumns(orderByColumns.split(","));
+            jobName = "Sort Compact Job";
+        } else {
+            throw new IllegalArgumentException(
+                    "You must specify both 'order strategy' and 'order by columns'.");
+        }
+
+        if (partitionStrings.length != 0) {
+            List<Map<String, String>> partitions = new ArrayList<>();
+            for (String partition : partitionStrings) {
+                partitions.add(parseCommaSeparatedKeyValues(partition));
+            }
+            action.withPartitions(partitions);
+        }
+
+        StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+        action.build(env);
+
+        return execute(env, jobName);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
new file mode 100644
index 000000000000..c44fe2701bbe
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.procedures.Procedure;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+
+/** Base implementation for Flink {@link Procedure}. */
+public class ProcedureBase implements Procedure {
+
+    protected final Catalog catalog;
+
+    ProcedureBase(Catalog catalog) {
+        this.catalog = catalog;
+    }
+
+    protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
+            throws Exception {
+        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+        String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
+        JobClient jobClient = env.executeAsync(name);
+        String jobId = jobClient.getJobID().toString();
+        if (conf.get(TABLE_DML_SYNC)) {
+            try {
+                jobClient.getJobExecutionResult().get();
+            } catch (Exception e) {
+                throw new TableException(
+                        String.format("Failed to wait for job '%s' to finish", jobId), e);
+            }
+            return new String[] {"Success"};
+        } else {
+            return new String[] {"JobID=" + jobId};
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 8e9a024352d1..e90450afcd9b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -18,13 +18,14 @@ package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.CompactActionFactory;
+
 import org.apache.flink.table.procedures.Procedure;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /** Utility methods for {@link Procedure}.
*/ @@ -33,13 +34,21 @@ public class ProcedureUtil { private ProcedureUtil() {} private static final List SYSTEM_PROCEDURES = new ArrayList<>(); - private static final Map SYSTEM_PROCEDURES_MAP = new HashMap<>(); + + static { + SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER); + } public static List listProcedures() { return Collections.unmodifiableList(SYSTEM_PROCEDURES); } - public static Optional getProcedure(String procedureName) { - return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName)); + public static Optional getProcedure(Catalog catalog, String procedureName) { + switch (procedureName) { + case CompactActionFactory.IDENTIFIER: + return Optional.of(new CompactProcedure(catalog)); + default: + return Optional.empty(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index f3a1c5881b6c..0e71d076f654 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -35,14 +35,21 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -55,10 +62,8 @@ public abstract class ActionITCaseBase extends AbstractTestBase { protected String database; protected String tableName; protected String commitUser; - protected SnapshotManager snapshotManager; protected StreamTableWrite write; protected StreamTableCommit commit; - protected StreamExecutionEnvironment env; protected Catalog catalog; private long incrementalIdentifier; @@ -69,11 +74,6 @@ public void before() throws IOException { tableName = "test_table_" + UUID.randomUUID(); commitUser = UUID.randomUUID().toString(); incrementalIdentifier = 0; - env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(2); - env.enableCheckpointing(1000); - env.setRestartStrategy(RestartStrategies.noRestart()); catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); } @@ -81,11 +81,12 @@ public void before() throws IOException { public void after() throws Exception { if (write != null) { write.close(); + write = null; } if (commit != null) { commit.close(); + commit = null; } - env.close(); catalog.close(); } @@ -114,6 +115,11 @@ protected FileStoreTable createFileStoreTable( return (FileStoreTable) catalog.getTable(identifier); } + protected FileStoreTable getFileStoreTable(String 
tableName) throws Exception { + Identifier identifier = Identifier.create(database, tableName); + return (FileStoreTable) catalog.getTable(identifier); + } + protected GenericRow rowData(Object... values) { return GenericRow.of(values); } @@ -135,4 +141,45 @@ protected List getResult(TableRead read, List splits, RowType row return result; } } + + protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setParallelism(2); + + if (isStreaming) { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setCheckpointInterval(500); + } else { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } + + return env; + } + + protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) { + StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + + TableEnvironment tEnv; + if (isStreaming) { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + tEnv.getConfig() + .set( + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, + Duration.ofMillis(500)); + } else { + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + } + + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync); + + tEnv.executeSql( + String.format( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", + warehouse)); + tEnv.useCatalog("PAIMON"); + + tEnv.executeSql(procedureStatement); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index d2e7e5c17226..a05b7c2f64da 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -21,8 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; -import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.DataSplit; @@ -32,10 +30,8 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -62,20 +58,11 @@ public class CompactActionITCase extends CompactActionITCaseBase { @Test @Timeout(60) public void testBatchCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.WRITE_ONLY.key(), "true"); - FileStoreTable table = - createFileStoreTable( - ROW_TYPE, + prepareTable( Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), - options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - 
table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); writeData( rowData(1, 100, 15, BinaryString.fromString("20221208")), @@ -87,21 +74,15 @@ public void testBatchCompact() throws Exception { rowData(2, 100, 16, BinaryString.fromString("20221208")), rowData(2, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName) - .withPartitions(getSpecifiedPartitions()) - .build(env); - env.execute(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(false); + } else { + callProcedure(false); + } - snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(3); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); + checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT); List splits = table.newSnapshotReader().read().dataSplits(); assertThat(splits.size()).isEqualTo(3); @@ -118,28 +99,18 @@ public void testBatchCompact() throws Exception { @Test public void testStreamingCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); - options.put( - FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), - "1s"); - options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); - options.put(CoreOptions.WRITE_ONLY.key(), "true"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); + tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true"); // test that dedicated compact job will expire snapshots - options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3"); - options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3"); + tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3"); + tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, - Arrays.asList("dt", "hh"), - Arrays.asList("dt", "hh", "k"), - options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable( + Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), tableOptions); // base records writeData( @@ -147,24 +118,18 @@ public void testStreamingCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(1); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 1, 
Snapshot.CommitKind.APPEND); // no full compaction has happened, so plan should be empty StreamTableScan scan = table.newReadBuilder().newStreamScan(); TableScan.Plan plan = scan.plan(); assertThat(plan.splits()).isEmpty(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName) - .withPartitions(getSpecifiedPartitions()) - .build(env); - JobClient client = env.executeAsync(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(true); + } else { + callProcedure(true); + } // first full compaction validateResult( @@ -193,6 +158,7 @@ public void testStreamingCompact() throws Exception { 60_000); // assert dedicated compact job will expire snapshots + SnapshotManager snapshotManager = table.snapshotManager(); CommonTestUtils.waitUtil( () -> snapshotManager.latestSnapshotId() - 2 @@ -200,27 +166,18 @@ public void testStreamingCompact() throws Exception { Duration.ofSeconds(60_000), Duration.ofSeconds(100), String.format("Cannot validate snapshot expiration in %s milliseconds.", 60_000)); - - client.cancel(); } @Test public void testUnawareBucketStreamingCompact() throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); - // test that dedicated compact job will expire snapshots - options.put(CoreOptions.BUCKET.key(), "-1"); - options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); - options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions); // base records writeData( @@ -233,21 +190,16 @@ public void testUnawareBucketStreamingCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); - - FileStoreScan storeScan = table.store().newScan(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName).build(env); - JobClient client = env.executeAsync(); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); + + if 
(ThreadLocalRandom.current().nextBoolean()) { + runAction(true); + } else { + callProcedure(true); + } // first compaction, snapshot will be 3 - checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6); + checkFileAndRowSize(table, 3L, 30_000L, 1, 6); writeData( rowData(1, 101, 15, BinaryString.fromString("20221208")), @@ -255,27 +207,18 @@ public void testUnawareBucketStreamingCompact() throws Exception { rowData(1, 101, 15, BinaryString.fromString("20221209"))); // second compaction, snapshot will be 5 - checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9); - - client.cancel().get(); + checkFileAndRowSize(table, 5L, 30_000L, 1, 9); } @Test public void testUnawareBucketBatchCompact() throws Exception { - Map options = new HashMap<>(); - // test that dedicated compact job will expire snapshots - options.put(CoreOptions.BUCKET.key(), "-1"); - options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); - options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); FileStoreTable table = - createFileStoreTable( - ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options); - snapshotManager = table.snapshotManager(); - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); + prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions); // base records writeData( @@ -288,19 +231,60 @@ public void testUnawareBucketBatchCompact() throws Exception { rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209"))); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(2); - assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); - FileStoreScan storeScan = table.store().newScan(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - new CompactAction(warehouse, database, tableName).build(env); - env.execute(); + if (ThreadLocalRandom.current().nextBoolean()) { + runAction(false); + } else { + callProcedure(false); + } // first compaction, snapshot will be 3. 
- checkFileAndRowSize(storeScan, 3L, 0L, 1, 6); + checkFileAndRowSize(table, 3L, 0L, 1, 6); + } + + private FileStoreTable prepareTable( + List partitionKeys, List primaryKeys, Map tableOptions) + throws Exception { + FileStoreTable table = + createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, tableOptions); + + StreamWriteBuilder streamWriteBuilder = + table.newStreamWriteBuilder().withCommitUser(commitUser); + write = streamWriteBuilder.newWrite(); + commit = streamWriteBuilder.newCommit(); + + return table; + } + + private void checkLatestSnapshot( + FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) { + SnapshotManager snapshotManager = table.snapshotManager(); + Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); + assertThat(snapshot.id()).isEqualTo(snapshotId); + assertThat(snapshot.commitKind()).isEqualTo(commitKind); + } + + private void runAction(boolean isStreaming) throws Exception { + StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + + new CompactAction(warehouse, database, tableName) + .withPartitions(getSpecifiedPartitions()) + .build(env); + if (isStreaming) { + env.executeAsync(); + } else { + env.execute(); + } + } + + private void callProcedure(boolean isStreaming) { + callProcedure( + String.format( + "CALL compact('%s.%s', '', '', '%s', '%s')", + database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15"), + isStreaming, + !isStreaming); } private List> getSpecifiedPartitions() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java index 0f2ada0a959e..4c646444cb72 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java @@ -25,6 +25,7 @@ import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import java.util.ArrayList; import java.util.List; @@ -35,6 +36,7 @@ /** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} . 
*/ public class CompactActionITCaseBase extends ActionITCaseBase { + protected void validateResult( FileStoreTable table, RowType rowType, @@ -63,8 +65,10 @@ protected void validateResult( } protected void checkFileAndRowSize( - FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) + FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) throws Exception { + SnapshotManager snapshotManager = table.snapshotManager(); + FileStoreScan scan = table.store().newScan(); long start = System.currentTimeMillis(); while (!Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 2c112f2909b2..ce2e72641426 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; @@ -37,6 +36,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.core.execution.JobClient; @@ -75,17 +75,15 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase { private FileStoreTable createTable( String databaseName, String tableName, - RowType rowType, List partitionKeys, List primaryKeys, Map options) throws Exception { - Identifier identifier = Identifier.create(databaseName, tableName); catalog.createDatabase(databaseName, true); catalog.createTable( identifier, - new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""), + new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, options, ""), false); return (FileStoreTable) catalog.getTable(identifier); } @@ -107,12 +105,11 @@ public void testBatchCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -156,7 +153,7 @@ public void testBatchCompact(String mode) throws Exception { env.execute(); for (FileStoreTable table : tables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(3); @@ -196,12 +193,11 @@ public void testStreamingCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); tables.add(table); - snapshotManager = table.snapshotManager(); + 
SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -259,7 +255,7 @@ public void testStreamingCompact(String mode) throws Exception { "+I[1, 100, 16, 20221208]"), 60_000); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -306,12 +302,11 @@ public void testStreamingCompact(String mode) throws Exception { createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); newtables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -346,7 +341,7 @@ public void testStreamingCompact(String mode) throws Exception { "+I[1, 100, 16, 20221208]"), 60_000); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -454,7 +449,6 @@ private void includingAndExcludingTablesImpl( createTable( dbName, tableName, - ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), options); @@ -464,7 +458,7 @@ private void includingAndExcludingTablesImpl( noCompactionTables.add(table); } - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -508,7 +502,7 @@ private void includingAndExcludingTablesImpl( env.execute(); for (FileStoreTable table : compactionTables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); @@ -523,7 +517,7 @@ private void includingAndExcludingTablesImpl( } for (FileStoreTable table : noCompactionTables) { - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); Snapshot snapshot = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId()); @@ -553,12 +547,11 @@ public void testUnawareBucketStreamingCompact() throws Exception { createTable( database, tableName, - ROW_TYPE, - Arrays.asList("k"), + Collections.singletonList("k"), Collections.emptyList(), options); tables.add(table); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); @@ -593,16 +586,13 @@ public void testUnawareBucketStreamingCompact() throws Exception { JobClient client = env.executeAsync(); for (FileStoreTable table : tables) { - FileStoreScan storeScan = table.store().newScan(); - - snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); commit = streamWriteBuilder.newCommit(); // first compaction, snapshot will be 3 - 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index ab20863e85d5..b250c80e2d8a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.BeforeEach;
@@ -79,6 +80,7 @@ public void testDeleteAction(boolean hasPk, List initialRecords, List
         action.run();
 
+        SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
 
         assertThat(snapshot.id()).isEqualTo(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
@@ -156,7 +158,7 @@ private void prepareTable(boolean hasPk) throws Exception {
                         Collections.emptyList(),
                         hasPk ? Collections.singletonList("k") : Collections.emptyList(),
                         options);
-        snapshotManager = table.snapshotManager();
+        SnapshotManager snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
                 table.newStreamWriteBuilder().withCommitUser(commitUser);
         write = streamWriteBuilder.newWrite();
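Two details here: the imports gain SnapshotManager, and the manager is now created from getFileStoreTable(tableName) only after action.run(), so the assertion inspects exactly the table state that the delete action committed. getFileStoreTable is a helper on the test base class (whose changes are not part of this excerpt); its likely shape, shown purely as a hypothetical reconstruction:

    // Hypothetical sketch of the base-class helper used above; the real one
    // lives in ActionITCaseBase and may differ.
    protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
        return (FileStoreTable) catalog.getTable(Identifier.create(database, tableName));
    }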
Collections.singletonList("k") : Collections.emptyList(), options); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java index c5c4c941b933..e477adfed553 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -61,6 +62,7 @@ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Except Collections.emptyMap()) .run(); + SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(5); assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); @@ -112,6 +114,7 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce Collections.emptyMap()) .run(); + SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); assertThat(snapshot.id()).isEqualTo(5); assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); @@ -153,7 +156,7 @@ private FileStoreTable prepareTable(boolean hasPk) throws Exception { ? 
Arrays.asList("partKey0", "partKey1", "dt") : Collections.emptyList(), new HashMap<>()); - snapshotManager = table.snapshotManager(); + SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = streamWriteBuilder.newWrite(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index 38555d47d53b..97c8637a5ba3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -56,7 +56,6 @@ public void rollbackToSnapshotTest() throws Exception { Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap()); - snapshotManager = table.snapshotManager(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); commit = writeBuilder.newCommit(); @@ -83,7 +82,6 @@ public void rollbackToTagTest() throws Exception { Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap()); - snapshotManager = table.snapshotManager(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); commit = writeBuilder.newCommit(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java similarity index 86% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java index a4e2424f0cb3..97bfea0a1baf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java @@ -18,19 +18,13 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -44,7 +38,6 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; import java.util.Arrays; @@ -54,13 +47,10 @@ import java.util.concurrent.atomic.AtomicInteger; /** Order Rewrite Action tests for {@link SortCompactAction}. 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
similarity index 86%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
index a4e2424f0cb3..97bfea0a1baf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
@@ -18,19 +18,13 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
@@ -44,7 +38,6 @@
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,13 +47,10 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Order Rewrite Action tests for {@link SortCompactAction}. */
-public class OrderRewriteActionITCase extends ActionITCaseBase {
+public class SortCompactActionITCase extends ActionITCaseBase {
 
     private static final Random random = new Random();
 
-    private Catalog catalog;
-    @TempDir private java.nio.file.Path path;
-
     private void prepareData(int size, int loop) throws Exception {
         createTable();
         List<CommitMessage> commitMessages = new ArrayList<>();
@@ -217,45 +207,43 @@ public void testCompareZorderAndOrder() throws Exception {
     }
 
     private void zorder(List<String> columns) throws Exception {
-        SortCompactAction sortCompactAction =
-                new SortCompactAction(
-                        new Path(path.toUri()).toUri().toString(),
-                        "my_db",
-                        "Orders1",
-                        Collections.emptyMap());
-        sortCompactAction.withOrderStrategy("zorder");
-        sortCompactAction.withOrderColumns(columns);
-        sortCompactAction.run();
+        if (random.nextBoolean()) {
+            new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+                    .withOrderStrategy("zorder")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("zorder", columns);
+        }
     }
 
     private void order(List<String> columns) throws Exception {
-        SortCompactAction sortCompactAction =
-                new SortCompactAction(
-                        new Path(path.toUri()).toUri().toString(),
-                        "my_db",
-                        "Orders1",
-                        Collections.emptyMap());
-        sortCompactAction.withOrderStrategy("order");
-        sortCompactAction.withOrderColumns(columns);
-        sortCompactAction.run();
+        if (random.nextBoolean()) {
+            new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+                    .withOrderStrategy("order")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("order", columns);
+        }
     }
 
-    public Catalog getCatalog() {
-        if (catalog == null) {
-            Options options = new Options();
-            options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString());
-            catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
-        }
-        return catalog;
+    private void callProcedure(String orderStrategy, List<String> orderByColumns) {
+        callProcedure(
+                String.format(
+                        "CALL compact('%s.%s', '%s', '%s')",
+                        database, tableName, orderStrategy, String.join(",", orderByColumns)),
+                false,
+                true);
     }
 
     public void createTable() throws Exception {
-        getCatalog().createDatabase("my_db", true);
-        getCatalog().createTable(identifier(), schema(), true);
+        catalog.createDatabase(database, true);
+        catalog.createTable(identifier(), schema(), true);
     }
 
     public Identifier identifier() {
-        return Identifier.create("my_db", "Orders1");
+        return Identifier.create(database, tableName);
     }
 
     private void commit(List<CommitMessage> messages) throws Exception {
@@ -299,7 +287,7 @@ private List<CommitMessage> writeData(int size) throws Exception {
     }
 
     public Table getTable() throws Exception {
-        return getCatalog().getTable(identifier());
+        return catalog.getTable(identifier());
     }
 
     private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
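The rename from OrderRewriteActionITCase to SortCompactActionITCase tracks the point of the patch: sort compaction is now reachable both through the action API and through the new compact procedure, and the test flips a coin between the two paths so both stay covered. Assuming a TableEnvironment tEnv whose current catalog is the Paimon catalog, and with illustrative database, table, and column names, the two invocations below should be equivalent:

    // Path 1: the action API, as constructed in the test above.
    new SortCompactAction(warehouse, "my_db", "orders", Collections.emptyMap())
            .withOrderStrategy("zorder")
            .withOrderColumns(Arrays.asList("col1", "col2"))
            .run();

    // Path 2: the SQL procedure, matching the CALL format built by callProcedure.
    tEnv.executeSql("CALL compact('my_db.orders', 'zorder', 'col1,col2')");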
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index e92bb526b199..8daa819ee26c 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
         <module>paimon-flink-1.15</module>
         <module>paimon-flink-1.16</module>
         <module>paimon-flink-1.17</module>
+        <module>paimon-flink-1.18</module>
         <module>paimon-flink-action</module>
         <module>paimon-flink-cdc</module>
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index e075d7e2130c..8893960eaf57 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -441,7 +441,7 @@ public void close() throws Exception {
     }
 
     @Override
-    protected String warehouse() {
+    public String warehouse() {
         return warehouse;
     }
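The HiveCatalog hunk is the same visibility widening applied to the other catalog implementations in this patch: warehouse() goes from protected to public so that callers outside the catalog hierarchy, such as the new compact procedure, can learn which warehouse a catalog points at and rebuild an equivalent action against it. A hedged sketch of the kind of call this unlocks, reusing the constructor exercised by the tests above (names illustrative, not taken from the procedure's actual implementation):

    // Illustrative only: steering a sort compaction from catalog state.
    String warehouse = hiveCatalog.warehouse(); // public as of this patch
    new SortCompactAction(warehouse, "my_db", "orders", Collections.emptyMap())
            .withOrderStrategy("order")
            .withOrderColumns(Collections.singletonList("col1"))
            .run();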