From 8735d54b11c06a54f9071861f41058b7b63fdde8 Mon Sep 17 00:00:00 2001
From: LinMingQiang <1356469429@qq.com>
Date: Wed, 27 Nov 2024 12:30:39 +0800
Subject: [PATCH] [flink] fix & doc

---
 docs/content/flink/procedures.md              | 15 ++--
 docs/content/spark/procedures.md              |  4 +-
 .../procedure/CompactDatabaseProcedure.java   | 13 +++-
 .../procedure/CompactDatabaseProcedure.java   | 13 +++-
 .../flink/procedure/CompactProcedure.java     | 12 ++--
 .../spark/procedure/CompactProcedure.java     |  4 +-
 .../paimon/spark/PaimonSparkTestBase.scala    |  1 -
 .../procedure/CompactProcedureTestBase.scala  | 72 +++++++++----------
 8 files changed, 79 insertions(+), 55 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 7e669a89d43b..59b02f82bf8c 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -67,7 +67,8 @@ All available procedures are listed below.
       order_by => 'order_by',
       options => 'options',
       `where` => 'where',
-      partition_idle_time => 'partition_idle_time')

+      partition_idle_time => 'partition_idle_time',
+      compact_strategy => 'compact_strategy')

-- Use indexed argument
CALL [catalog.]sys.compact('table')

CALL [catalog.]sys.compact('table', 'partitions')

@@ -76,6 +77,7 @@ All available procedures are listed below.
      CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time')

+ CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy')

      To compact a table. Arguments:
@@ -86,6 +88,7 @@ All available procedures are listed below.
  • options(optional): additional dynamic options of the table.
  • where(optional): partition predicate (cannot be used together with "partitions"). Note: since where is a keyword, it must be wrapped in a pair of backticks: `where`.
  • partition_idle_time(optional): triggers a full compaction for any partition that has not received new data for 'partition_idle_time'; only those partitions are compacted. This argument cannot be used together with order compact.
+ • compact_strategy(optional): determines how to pick files to be merged; the default depends on the runtime execution mode. 'full' selects all files for merging and is only supported in batch mode. 'minor' picks only the set of files that meet the merge conditions.
  • -- use partition filter
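    For illustration, the two strategies above could be exercised as follows — a minimal sketch, assuming a hypothetical table `default.my_table`; per the description above, 'full' requires batch execution mode:

        SET 'execution.runtime-mode' = 'batch';
        CALL sys.compact(`table` => 'default.my_table', compact_strategy => 'full');

        -- in a streaming job, only 'minor' is applicable
        CALL sys.compact(`table` => 'default.my_table', compact_strategy => 'minor');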
@@ -104,7 +107,8 @@ All available procedures are listed below.
       including_tables => 'includingTables',
       excluding_tables => 'excludingTables',
       table_options => 'tableOptions',
-      partition_idle_time => 'partitionIdleTime')

+      partition_idle_time => 'partitionIdleTime',
+      compact_strategy => 'compact_strategy')

    -- Use indexed argument
    CALL [catalog.]sys.compact_database()

    CALL [catalog.]sys.compact_database('includingDatabases')

@@ -112,7 +116,8 @@ All available procedures are listed below.
      CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')

    CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')

-     CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
+     CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')

    + CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy')

      To compact databases. Arguments:
@@ -124,6 +129,7 @@ All available procedures are listed below.
  • excludingTables: to specify tables that are not compacted. You can use regular expression.
  • tableOptions: additional dynamic options of the table.
  • partition_idle_time: triggers a full compaction for any partition that has not received new data for 'partition_idle_time'; only those partitions are compacted.
+ • compact_strategy(optional): determines how to pick files to be merged; the default depends on the runtime execution mode. 'full' selects all files for merging and is only supported in batch mode. 'minor' picks only the set of files that meet the merge conditions.
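    Complementing the 'full' example that follows, a 'minor' invocation for a long-running streaming compact job might look like this — a sketch; the including_databases argument name and 'db1' are assumptions not shown in this hunk:

        CALL sys.compact_database(
            including_databases => 'db1',
            mode => 'combined',
            compact_strategy => 'minor');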
  •    CALL sys.compact_database(
@@ -131,7 +137,8 @@ All available procedures are listed below.
          mode => 'combined',
          including_tables => 'table_.*',
          excluding_tables => 'ignore',
-         table_options => 'sink.parallelism=4')
+         table_options => 'sink.parallelism=4',
+         compact_strategy => 'full')

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 1f3f554106ec..88d46fabbb2b 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -47,12 +47,14 @@ This section introduces all available spark procedures about paimon.
  • order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.
  • order_columns: the columns to be sorted. Left empty if 'order_strategy' is 'none'.
  • partition_idle_time: triggers a full compaction for any partition that has not received new data for 'partition_idle_time'; only those partitions are compacted. This argument cannot be used with order compact.
+ • compact_strategy: determines how to pick files to be merged; the default depends on the runtime execution mode. 'full' selects all files for merging and is only supported in batch mode. 'minor' picks only the set of files that meet the merge conditions.
  • SET spark.sql.shuffle.partitions=10; --set the compact parallelism

    CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

-     CALL sys.compact(table => 'T', partition_idle_time => '60s')
+     CALL sys.compact(table => 'T', partition_idle_time => '60s')

    + CALL sys.compact(table => 'T', compact_strategy => 'minor')

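Tying the compact_strategy description to the options argument, a sketch that mirrors the test revised later in this patch — minor compaction only rewrites partitions that have accumulated at least 'num-sorted-run.compaction-trigger' sorted runs (table 'T' and the trigger value are taken from that test):

    CALL sys.compact(
        table => 'T',
        compact_strategy => 'minor',
        options => 'num-sorted-run.compaction-trigger=3');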
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 99f205bacb58..ac4340c11336 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -26,6 +26,8 @@
 
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
 import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 
 /**
@@ -51,6 +53,7 @@
  *
  * -- set table options ('k=v,...')
  * CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
+ *
  *
  */
 public class CompactDatabaseProcedure extends ProcedureBase {
@@ -106,7 +109,8 @@ public String[] call(
                 includingTables,
                 excludingTables,
                 tableOptions,
-                "");
+                "",
+                null);
     }
 
     public String[] call(
@@ -116,7 +120,8 @@ public String[] call(
             String includingTables,
             String excludingTables,
             String tableOptions,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
@@ -133,6 +138,10 @@ public String[] call(
             action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        if (checkCompactStrategy(compactStrategy)) {
+            action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+        }
+
         return execute(procedureContext, action, "Compact database job");
     }

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index dd71e974c7b1..80602b755aa5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -29,6 +29,8 @@
 
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
 import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 
 /**
@@ -82,6 +84,10 @@ public class CompactDatabaseProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "partition_idle_time",
                         type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "compact_strategy",
+                        type = @DataTypeHint("STRING"),
                         isOptional = true)
             })
     public String[] call(
@@ -91,7 +97,8 @@ public String[] call(
             String includingTables,
             String excludingTables,
             String tableOptions,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         partitionIdleTime = notnull(partitionIdleTime);
         String warehouse = catalog.warehouse();
@@ -109,6 +116,10 @@ public String[] call(
             action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        if (checkCompactStrategy(compactStrategy)) {
+            action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+        }
+
         return execute(procedureContext, action, "Compact database job");
     }
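The new guard in both procedure versions maps the strategy string onto the existing action API: 'full' (matched after trim(), case-insensitively) selects withFullCompaction(true), 'minor' selects withFullCompaction(false), and when no strategy is supplied the mode-dependent default stands. A sketch of equivalent calls (database name hypothetical):

    CALL sys.compact_database(including_databases => 'db1', compact_strategy => 'full');
    CALL sys.compact_database(including_databases => 'db1', compact_strategy => ' FULL ');  -- same effect: trimmed, case-insensitive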
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 70f66bcc2218..282f5af34043 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -50,10 +50,6 @@ public class CompactProcedure extends ProcedureBase {
                         name = "partitions",
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
-                @ArgumentHint(
-                        name = "compact_strategy",
-                        type = @DataTypeHint("STRING"),
-                        isOptional = true),
                 @ArgumentHint(
                         name = "order_strategy",
                         type = @DataTypeHint("STRING"),
@@ -64,18 +60,22 @@ public class CompactProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "partition_idle_time",
                         type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "compact_strategy",
+                        type = @DataTypeHint("STRING"),
                         isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
             String tableId,
             String partitions,
-            String compactStrategy,
             String orderStrategy,
             String orderByColumns,
             String tableOptions,
             String where,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 255ca0eda11e..4a43e39c31ba 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -121,8 +121,8 @@ public class CompactProcedure extends BaseProcedure {
                 new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
             });
 
-    private final String MINOR = "minor";
-    private final String FULL = "full";
+    private static final String MINOR = "minor";
+    private static final String FULL = "full";
 
     protected CompactProcedure(TableCatalog tableCatalog) {
         super(tableCatalog);

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 1f2adfef256d..605b2e6ca5f2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -64,7 +64,6 @@ class PaimonSparkTestBase
       "org.apache.spark.serializer.JavaSerializer"
     }
     super.sparkConf
-      .set("spark.driver.bindAddress", "127.0.0.1")
       .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
       .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
       .set("spark.sql.catalog.paimon.cache-enabled", "false")

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 858dd47465d5..31f78f61c20d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -42,53 +42,49 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
 
   // ----------------------- Minor Compact -----------------------
 
   test("Paimon Procedure: compact aware bucket pk table with minor compact strategy") {
-    Seq(1, -1).foreach(
-      bucket => {
-        withTable("T") {
-          spark.sql(
-            s"""
-               |CREATE TABLE T (id INT, value STRING, pt STRING)
-               |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true')
-               |PARTITIONED BY (pt)
-               |""".stripMargin)
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='1', 'write-only'='true')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
 
-          val table = loadTable("T")
+      val table = loadTable("T")
 
-          spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
-          spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
 
-          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
-          Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
+      Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
+      Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
 
-          spark.sql(
-            "CALL sys.compact(table => 'T', compact_strategy => 'full'," +
-              "options => 'num-sorted-run.compaction-trigger=3')")
+      spark.sql(
+        "CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
+          "options => 'num-sorted-run.compaction-trigger=3')")
 
-          // Due to the limitation of parameter 'num-sorted-run.compaction-trigger' = 3, so compact is not
-          // performed.
-          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
-          Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
+      // Due to the limitation of parameter 'num-sorted-run.compaction-trigger' = 3, so compact is not
+      // performed.
+      Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
+      Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
 
-          // Make par-p1 has 3 datafile and par-p2 has 2 datafile, so par-p2 will not be picked out to
-          // compact.
-          spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1')")
+      // Make par-p1 has 3 datafile and par-p2 has 2 datafile, so par-p2 will not be picked out to
+      // compact.
+ spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1')") - spark.sql( - "CALL sys.compact(table => 'T', compact_strategy => 'minor'," + - "options => 'num-sorted-run.compaction-trigger=3')") + spark.sql( + "CALL sys.compact(table => 'T', compact_strategy => 'minor'," + + "options => 'num-sorted-run.compaction-trigger=3')") - Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue - Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4) + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4) + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue - val splits = table.newSnapshotReader.read.dataSplits - splits.forEach( - split => { - Assertions - .assertThat(split.dataFiles.size) - .isEqualTo(if (split.partition().getInt(1) == 16) 2 else 1) - }) - } - }) + val splits = table.newSnapshotReader.read.dataSplits + splits.forEach( + split => { + Assertions + .assertThat(split.dataFiles.size) + .isEqualTo(if (split.partition().getString(0).toString == "p2") 2 else 1) + }) + } } // ----------------------- Sort Compact -----------------------