diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index e21702bab5f3..eaf7e3a00e1f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -74,6 +74,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -110,7 +111,8 @@ public class CompactProcedure extends BaseProcedure {
                 ProcedureParameter.optional("order_strategy", StringType),
                 ProcedureParameter.optional("order_by", StringType),
                 ProcedureParameter.optional("where", StringType),
-                ProcedureParameter.optional("max_concurrent_jobs", IntegerType)
+                ProcedureParameter.optional("max_concurrent_jobs", IntegerType),
+                ProcedureParameter.optional("options", StringType),
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -144,6 +146,7 @@ public InternalRow[] call(InternalRow args) {
                         : Arrays.asList(args.getString(3).split(","));
         String where = blank(args, 4) ? null : args.getString(4);
         int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
+        String options = args.isNullAt(6) ? null : args.getString(6);
         if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
             throw new IllegalArgumentException(
                     "order_strategy \"none\" cannot work with order_by columns.");
@@ -174,6 +177,14 @@ public InternalRow[] call(InternalRow args) {
                             condition,
                             table.partitionKeys());
         }
+
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
+        if (!StringUtils.isBlank(options)) {
+            dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
+        }
+        table = table.copy(dynamicOptions);
+
         InternalRow internalRow =
                 newInternalRow(
                         execute(
@@ -203,7 +214,6 @@ private boolean execute(
             DataSourceV2Relation relation,
             @Nullable Expression condition,
             int maxConcurrentJobs) {
-        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
         BucketMode bucketMode = table.bucketMode();
         TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
         Predicate filter =
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 245076f7b38d..73d6fc442589 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -528,6 +528,39 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     Assertions.assertThat(where).isEqualTo(whereExpected)
   }
 
+  test("Paimon Procedure: compact unaware bucket append table with option") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, value STRING, pt STRING)
+                 |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    val table = loadTable("T")
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
+
+    spark.sql(
+      "CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+    spark.sql(
+      "CALL sys.compact(table => 'T', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    // compaction condition is no longer met, so no new snapshot is created
+    spark.sql(s"CALL sys.compact(table => 'T')")
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    checkAnswer(
+      spark.sql(s"SELECT * FROM T ORDER BY id"),
+      Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") ::
+        Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }