Skip to content

Commit

Permalink
add options parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jul 4, 2024
1 parent 2acb511 commit cca0f95
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -110,7 +111,8 @@ public class CompactProcedure extends BaseProcedure {
ProcedureParameter.optional("order_strategy", StringType),
ProcedureParameter.optional("order_by", StringType),
ProcedureParameter.optional("where", StringType),
ProcedureParameter.optional("max_concurrent_jobs", IntegerType)
ProcedureParameter.optional("max_concurrent_jobs", IntegerType),
ProcedureParameter.optional("options", StringType),
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -144,6 +146,7 @@ public InternalRow[] call(InternalRow args) {
: Arrays.asList(args.getString(3).split(","));
String where = blank(args, 4) ? null : args.getString(4);
int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
String options = args.isNullAt(6) ? null : args.getString(6);
if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by columns.");
Expand Down Expand Up @@ -174,6 +177,14 @@ public InternalRow[] call(InternalRow args) {
condition,
table.partitionKeys());
}

Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
if (!StringUtils.isBlank(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
table = table.copy(dynamicOptions);

InternalRow internalRow =
newInternalRow(
execute(
Expand Down Expand Up @@ -203,7 +214,6 @@ private boolean execute(
DataSourceV2Relation relation,
@Nullable Expression condition,
int maxConcurrentJobs) {
table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
Predicate filter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,39 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
Assertions.assertThat(where).isEqualTo(whereExpected)
}

test("Paimon Procedure: compact unaware bucket append table with option") {
spark.sql(s"""
|CREATE TABLE T (id INT, value STRING, pt STRING)
|TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
|PARTITIONED BY (pt)
|""".stripMargin)

val table = loadTable("T")

spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")

spark.sql(
"CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)

spark.sql(
"CALL sys.compact(table => 'T', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)

// compact condition no longer met
spark.sql(s"CALL sys.compact(table => 'T')")
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)

checkAnswer(
spark.sql(s"SELECT * FROM T ORDER BY id"),
Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") ::
Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
}

def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
table.snapshotManager().latestSnapshot().commitKind()
}
Expand Down

0 comments on commit cca0f95

Please sign in to comment.