Commit 8735d54
[flink] fix & doc
LinMingQiang committed Nov 27, 2024
1 parent 8c3aa74 commit 8735d54
Showing 8 changed files with 79 additions and 55 deletions.
15 changes: 11 additions & 4 deletions docs/content/flink/procedures.md
@@ -67,7 +67,8 @@ All available procedures are listed below.
order_by => 'order_by',
options => 'options',
`where` => 'where',
partition_idle_time => 'partition_idle_time') <br/><br/>
partition_idle_time => 'partition_idle_time',
compact_strategy => 'compact_strategy') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact('table') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions') <br/><br/>
@@ -76,6 +77,7 @@ All available procedures are listed below.
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy') <br/><br/>
</td>
<td>
To compact a table. Arguments:
@@ -86,6 +88,7 @@ All available procedures are listed below.
<li>options(optional): additional dynamic options of the table.</li>
<li>where(optional): partition predicate (cannot be used together with "partitions"). Note: since where is a keyword, it must be wrapped in a pair of backticks: `where`.</li>
<li>partition_idle_time(optional): performs a full compaction on partitions that have not received any new data for 'partition_idle_time'; only these partitions are compacted. This argument cannot be used together with order compact.</li>
<li>compact_strategy(optional): determines how files are picked for merging; the default depends on the runtime execution mode. The 'full' strategy selects all files for merging and is only supported in batch mode. The 'minor' strategy picks only the set of files that meet the merge conditions.</li>
</td>
<td>
-- use partition filter <br/>
@@ -104,15 +107,17 @@ All available procedures are listed below.
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions',
partition_idle_time => 'partitionIdleTime') <br/><br/>
partition_idle_time => 'partitionIdleTime',
compact_strategy => 'compact_strategy') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact_database() <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')<br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy')<br/><br/>
</td>
<td>
To compact databases. Arguments:
@@ -124,14 +129,16 @@ All available procedures are listed below.
<li>excludingTables: to specify tables that are not compacted. You can use regular expressions.</li>
<li>tableOptions: additional dynamic options of the table.</li>
<li>partition_idle_time: performs a full compaction on partitions that have not received any new data for 'partition_idle_time'; only these partitions are compacted.</li>
<li>compact_strategy(optional): determines how files are picked for merging; the default depends on the runtime execution mode. The 'full' strategy selects all files for merging and is only supported in batch mode. The 'minor' strategy picks only the set of files that meet the merge conditions.</li>
</td>
<td>
CALL sys.compact_database(
including_databases => 'db1|db2',
mode => 'combined',
including_tables => 'table_.*',
excluding_tables => 'ignore',
table_options => 'sink.parallelism=4')
table_options => 'sink.parallelism=4',
compact_strategy => 'full')
</td>
</tr>
<tr>
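To see the new argument in context, here is a minimal sketch (the catalog registration, warehouse path, and table name 'default.T' are assumptions, not part of this commit) that issues the documented CALL from a Flink TableEnvironment:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCompactStrategyExample {
    public static void main(String[] args) {
        // Batch mode, since the 'full' strategy is only supported in batch mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Register a Paimon catalog; the warehouse path is a placeholder.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type'='paimon', 'warehouse'='file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");
        // 'minor' merges only the files that meet the merge conditions;
        // swap in 'full' (batch mode only) to rewrite all files.
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor')");
    }
}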
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
@@ -47,12 +47,14 @@ This section introduces all available Spark procedures for Paimon.
<li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Leave empty for 'none'.</li>
<li>order_columns: the columns to sort by. Leave empty if 'order_strategy' is 'none'.</li>
<li>partition_idle_time: performs a full compaction on partitions that have not received any new data for 'partition_idle_time'; only these partitions are compacted. This argument cannot be used together with order compact.</li>
<li>compact_strategy: determines how files are picked for merging; the default depends on the runtime execution mode. The 'full' strategy selects all files for merging and is only supported in batch mode. The 'minor' strategy picks only the set of files that meet the merge conditions.</li>
</td>
<td>
SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/><br/>
CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
CALL sys.compact(table => 'T', partition_idle_time => '60s')
CALL sys.compact(table => 'T', partition_idle_time => '60s')<br/><br/>
CALL sys.compact(table => 'T', compact_strategy => 'minor')<br/><br/>
</td>
</tr>
<tr>
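The same call from the Spark side, as a sketch (the session configuration for the Paimon catalog is assumed to be in place, and 'T' is a placeholder table name):

import org.apache.spark.sql.SparkSession;

public class SparkCompactStrategyExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("paimon-compact-strategy")
                .getOrCreate();
        // Minor compaction: only files that meet the merge conditions are picked.
        spark.sql("CALL sys.compact(table => 'T', compact_strategy => 'minor')");
        spark.stop();
    }
}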
CompactDatabaseProcedure.java (Flink, positional-argument variant)
@@ -26,6 +26,8 @@

import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
@@ -51,6 +53,7 @@
*
* -- set table options ('k=v,...')
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
*
* </code></pre>
*/
public class CompactDatabaseProcedure extends ProcedureBase {
@@ -106,7 +109,8 @@ public String[] call(
includingTables,
excludingTables,
tableOptions,
"");
"",
null);
}

public String[] call(
@@ -116,7 +120,8 @@ public String[] call(
String includingTables,
String excludingTables,
String tableOptions,
String partitionIdleTime)
String partitionIdleTime,
String compactStrategy)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
@@ -133,6 +138,10 @@ public String[] call(
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}

return execute(procedureContext, action, "Compact database job");
}

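checkCompactStrategy is imported from CompactActionFactory but not shown in this diff. A hypothetical sketch of its contract, inferred only from the call sites above (the actual Paimon implementation may differ):

// Inferred sketch, not the real CompactActionFactory source: returns true when
// a strategy was supplied, so the caller can translate it into
// withFullCompaction(...); rejects anything other than 'full'/'minor'.
public static boolean checkCompactStrategy(String compactStrategy) {
    if (compactStrategy == null) {
        return false; // no strategy given; keep the mode-dependent default
    }
    String s = compactStrategy.trim();
    if (!s.equalsIgnoreCase("full") && !s.equalsIgnoreCase("minor")) {
        throw new IllegalArgumentException(
                "compact_strategy only supports 'full' or 'minor', but '"
                        + compactStrategy + "' was configured.");
    }
    return true;
}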
CompactDatabaseProcedure.java (Flink, named-argument variant)
@@ -29,6 +29,8 @@

import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
@@ -82,6 +84,10 @@ public class CompactDatabaseProcedure extends ProcedureBase {
@ArgumentHint(
name = "partition_idle_time",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "compact_strategy",
type = @DataTypeHint("STRING"),
isOptional = true)
})
public String[] call(
Expand All @@ -91,7 +97,8 @@ public String[] call(
String includingTables,
String excludingTables,
String tableOptions,
String partitionIdleTime)
String partitionIdleTime,
String compactStrategy)
throws Exception {
partitionIdleTime = notnull(partitionIdleTime);
String warehouse = catalog.warehouse();
@@ -109,6 +116,10 @@ public String[] call(
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}

return execute(procedureContext, action, "Compact database job");
}

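The database-level procedure accepts the same strategy. A usage sketch (catalog setup, warehouse path, and the database/table filters are placeholders; the validity of 'minor' in streaming mode is my reading of the documentation above, since only 'full' is restricted to batch mode):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactDatabaseStrategyExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type'='paimon', 'warehouse'='file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");
        // Combined-mode database compaction with the new strategy argument.
        tEnv.executeSql(
                "CALL sys.compact_database("
                        + "including_databases => 'db1|db2', "
                        + "mode => 'combined', "
                        + "compact_strategy => 'minor')");
    }
}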
CompactProcedure.java (Flink)
@@ -50,10 +50,6 @@ public class CompactProcedure extends ProcedureBase {
name = "partitions",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "compact_strategy",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "order_strategy",
type = @DataTypeHint("STRING"),
@@ -64,18 +60,22 @@
@ArgumentHint(
name = "partition_idle_time",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "compact_strategy",
type = @DataTypeHint("STRING"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String compactStrategy,
String orderStrategy,
String orderByColumns,
String tableOptions,
String where,
String partitionIdleTime)
String partitionIdleTime,
String compactStrategy)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
CompactProcedure.java (Spark)
@@ -121,8 +121,8 @@ public class CompactProcedure extends BaseProcedure {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
});

private final String MINOR = "minor";
private final String FULL = "full";
private static final String MINOR = "minor";
private static final String FULL = "full";

protected CompactProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
PaimonSparkTestBase.scala
@@ -64,7 +64,6 @@ class PaimonSparkTestBase
"org.apache.spark.serializer.JavaSerializer"
}
super.sparkConf
.set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
.set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
.set("spark.sql.catalog.paimon.cache-enabled", "false")
CompactProcedureTestBase.scala
@@ -42,53 +42,49 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
// ----------------------- Minor Compact -----------------------

test("Paimon Procedure: compact aware bucket pk table with minor compact strategy") {
Seq(1, -1).foreach(
bucket => {
withTable("T") {
spark.sql(
s"""
|CREATE TABLE T (id INT, value STRING, pt STRING)
|TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true')
|PARTITIONED BY (pt)
|""".stripMargin)
withTable("T") {
spark.sql(s"""
|CREATE TABLE T (id INT, value STRING, pt STRING)
|TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='1', 'write-only'='true')
|PARTITIONED BY (pt)
|""".stripMargin)

val table = loadTable("T")
val table = loadTable("T")

spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")

Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)

spark.sql(
"CALL sys.compact(table => 'T', compact_strategy => 'full'," +
"options => 'num-sorted-run.compaction-trigger=3')")
spark.sql(
"CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
"options => 'num-sorted-run.compaction-trigger=3')")

// Due to the limitation of parameter 'num-sorted-run.compaction-trigger' = 3, so compact is not
// performed.
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
// Due to the limitation of parameter 'num-sorted-run.compaction-trigger' = 3, so compact is not
// performed.
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)

// Make par-p1 has 3 datafile and par-p2 has 2 datafile, so par-p2 will not be picked out to
// compact.
spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1')")
// Make par-p1 has 3 datafile and par-p2 has 2 datafile, so par-p2 will not be picked out to
// compact.
spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1')")

spark.sql(
"CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
"options => 'num-sorted-run.compaction-trigger=3')")
spark.sql(
"CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
"options => 'num-sorted-run.compaction-trigger=3')")

Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue

val splits = table.newSnapshotReader.read.dataSplits
splits.forEach(
split => {
Assertions
.assertThat(split.dataFiles.size)
.isEqualTo(if (split.partition().getInt(1) == 16) 2 else 1)
})
}
})
val splits = table.newSnapshotReader.read.dataSplits
splits.forEach(
split => {
Assertions
.assertThat(split.dataFiles.size)
.isEqualTo(if (split.partition().getString(0).toString == "p2") 2 else 1)
})
}
}

// ----------------------- Sort Compact -----------------------
