Skip to content

Commit

Permalink
[flink] Support minor compact strategy for dedicated compaction actio…
Browse files Browse the repository at this point in the history
…n. (apache#4589)
  • Loading branch information
LinMingQiang authored Nov 27, 2024
1 parent fe69313 commit 4bcf857
Show file tree
Hide file tree
Showing 23 changed files with 622 additions and 95 deletions.
15 changes: 11 additions & 4 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ All available procedures are listed below.
order_by => 'order_by',
options => 'options',
`where` => 'where',
partition_idle_time => 'partition_idle_time') <br/><br/>
partition_idle_time => 'partition_idle_time',
compact_strategy => 'compact_strategy') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact('table') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions') <br/><br/>
Expand All @@ -76,6 +77,7 @@ All available procedures are listed below.
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy') <br/><br/>
</td>
<td>
To compact a table. Arguments:
Expand All @@ -86,6 +88,7 @@ All available procedures are listed below.
<li>options(optional): additional dynamic options of the table.</li>
<li>where(optional): partition predicate(Can't be used together with "partitions"). Note: as where is a keyword,a pair of backticks need to add around like `where`.</li>
<li>partition_idle_time(optional): this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.</li>
<li>compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.</li>
</td>
<td>
-- use partition filter <br/>
Expand All @@ -104,15 +107,17 @@ All available procedures are listed below.
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions',
partition_idle_time => 'partitionIdleTime') <br/><br/>
partition_idle_time => 'partitionIdleTime',
compact_strategy => 'compact_strategy') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact_database() <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')<br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy')<br/><br/>
</td>
<td>
To compact databases. Arguments:
Expand All @@ -124,14 +129,16 @@ All available procedures are listed below.
<li>excludingTables: to specify tables that are not compacted. You can use regular expression.</li>
<li>tableOptions: additional dynamic options of the table.</li>
<li>partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted.</li>
<li>compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.</li>
</td>
<td>
CALL sys.compact_database(
including_databases => 'db1|db2',
mode => 'combined',
including_tables => 'table_.*',
excluding_tables => 'ignore',
table_options => 'sink.parallelism=4')
table_options => 'sink.parallelism=4',
compat_strategy => 'full')
</td>
</tr>
<tr>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/maintenance/dedicated-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Run the following command to submit a compaction job for the table.
--database <database-name> \
--table <table-name> \
[--partition <partition-name>] \
[--compact_strategy <minor / full>] \
[--table_conf <table_conf>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Expand All @@ -123,10 +124,14 @@ Example: compact table
--partition dt=20221126,hh=08 \
--partition dt=20221127,hh=09 \
--table_conf sink.parallelism=10 \
--compact_strategy minor \
--catalog_conf s3.endpoint=https://****.com \
--catalog_conf s3.access-key=***** \
--catalog_conf s3.secret-key=*****
```
* `--compact_strategy` Determines how to pick files to be merged, the default is determined by the runtime execution mode, streaming-mode use `minor` strategy and batch-mode use `full` strategy.
* `full` : Only supports batch mode. All files will be selected for merging.
* `minor` : Pick the set of files that need to be merged based on specified conditions.
You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all
current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes
Expand Down Expand Up @@ -190,6 +195,7 @@ CALL sys.compact_database(
[--including_tables <paimon-table-name|name-regular-expr>] \
[--excluding_tables <paimon-table-name|name-regular-expr>] \
[--mode <compact-mode>] \
[--compact_strategy <minor / full>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
```
Expand Down Expand Up @@ -346,6 +352,7 @@ CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '1 d')
--table <table-name> \
--partition_idle_time <partition-idle-time> \
[--partition <partition-name>] \
[--compact_strategy <minor / full>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]
```
Expand Down Expand Up @@ -406,6 +413,7 @@ CALL sys.compact_database(
[--including_tables <paimon-table-name|name-regular-expr>] \
[--excluding_tables <paimon-table-name|name-regular-expr>] \
[--mode <compact-mode>] \
[--compact_strategy <minor / full>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
```
Expand Down
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ This section introduce all available spark procedures about paimon.
<li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.</li>
<li>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'.</li>
<li>partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.</li>
<li>compact_strategy: this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.</li>
</td>
<td>
SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/><br/>
CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
CALL sys.compact(table => 'T', partition_idle_time => '60s')
CALL sys.compact(table => 'T', partition_idle_time => '60s')<br/><br/>
CALL sys.compact(table => 'T', compact_strategy => 'minor')<br/><br/>
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
Expand All @@ -51,6 +53,7 @@
*
* -- set table options ('k=v,...')
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
*
* </code></pre>
*/
public class CompactDatabaseProcedure extends ProcedureBase {
Expand Down Expand Up @@ -106,7 +109,8 @@ public String[] call(
includingTables,
excludingTables,
tableOptions,
"");
"",
null);
}

public String[] call(
Expand All @@ -116,7 +120,8 @@ public String[] call(
String includingTables,
String excludingTables,
String tableOptions,
String partitionIdleTime)
String partitionIdleTime,
String compactStrategy)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Expand All @@ -133,6 +138,10 @@ public String[] call(
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}

return execute(procedureContext, action, "Compact database job");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;

/**
* Stay compatible with 1.18 procedure which doesn't support named argument. Usage:
*
Expand All @@ -49,6 +52,9 @@
* -- compact specific partitions with sorting
* CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6')
*
* -- compact with specific compact strategy
* CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6', 'minor')
*
* </code></pre>
*/
public class CompactProcedure extends ProcedureBase {
Expand Down Expand Up @@ -118,7 +124,8 @@ public String[] call(
orderByColumns,
tableOptions,
whereSql,
"");
"",
null);
}

public String[] call(
Expand All @@ -129,7 +136,8 @@ public String[] call(
String orderByColumns,
String tableOptions,
String whereSql,
String partitionIdleTime)
String partitionIdleTime,
String compactStrategy)
throws Exception {

String warehouse = catalog.warehouse();
Expand All @@ -152,6 +160,10 @@ public String[] call(
if (!(StringUtils.isNullOrWhitespaceOnly(partitionIdleTime))) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
jobName = "Compact Job";
} else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ public void testCompactDatabaseAndTable() {
sql(
"CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1','pt=1')"))
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'zorder', 'k', '','','5s')"))
assertThatCode(
() ->
sql(
"CALL sys.compact('default.T', '' ,'zorder', 'k', '','','5s', '')"))
.message()
.contains("sort compact do not support 'partition_idle_time'.");

assertThatCode(() -> sql("CALL sys.compact('default.T', '', '' ,'', '', '', '', 'full')"))
.doesNotThrowAnyException();

assertThatCode(() -> sql("CALL sys.compact_database('default')"))
.doesNotThrowAnyException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public interface ActionFactory extends Factory {
String TIMESTAMPFORMATTER = "timestamp_formatter";
String EXPIRE_STRATEGY = "expire_strategy";
String TIMESTAMP_PATTERN = "timestamp_pattern";
// Supports `full` and `minor`.
String COMPACT_STRATEGY = "compact_strategy";
String MINOR = "minor";
String FULL = "full";

Optional<Action> create(MultipleParameterToolAdapter params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class CompactAction extends TableActionBase {

@Nullable private Duration partitionIdleTime = null;

private Boolean fullCompaction;

public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
}
Expand Down Expand Up @@ -100,6 +102,11 @@ public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime)
return this;
}

public CompactAction withFullCompaction(Boolean fullCompaction) {
this.fullCompaction = fullCompaction;
return this;
}

@Override
public void build() throws Exception {
ReadableConfig conf = env.getConfiguration();
Expand All @@ -124,6 +131,13 @@ public void build() throws Exception {
private void buildForTraditionalCompaction(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
if (fullCompaction == null) {
fullCompaction = !isStreaming;
} else {
Preconditions.checkArgument(
!(fullCompaction && isStreaming),
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
}
if (isStreaming) {
// for completely asynchronous compaction
HashMap<String, String> dynamicOptions =
Expand All @@ -138,8 +152,7 @@ private void buildForTraditionalCompaction(
}
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder =
new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);

sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
action.withPartitionIdleTime(
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
}
String compactStrategy = params.get(COMPACT_STRATEGY);
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
}

if (params.has(PARTITION)) {
Expand All @@ -88,6 +92,19 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
return Optional.of(action);
}

public static boolean checkCompactStrategy(String compactStrategy) {
if (compactStrategy != null) {
Preconditions.checkArgument(
compactStrategy.equalsIgnoreCase(MINOR)
|| compactStrategy.equalsIgnoreCase(FULL),
String.format(
"The compact strategy only supports 'full' or 'minor', but '%s' is configured.",
compactStrategy));
return true;
}
return false;
}

@Override
public void printHelp() {
System.out.println(
Expand All @@ -101,7 +118,8 @@ public void printHelp() {
+ "[--order_strategy <order_strategy>]"
+ "[--table_conf <key>=<value>]"
+ "[--order_by <order_columns>]"
+ "[--partition_idle_time <partition_idle_time>]");
+ "[--partition_idle_time <partition_idle_time>]"
+ "[--compact_strategy <compact_strategy>]");
System.out.println(
" compact --warehouse s3://path/to/warehouse --database <database_name> "
+ "--table <table_name> [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
Expand Down Expand Up @@ -132,6 +150,10 @@ public void printHelp() {
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+ "--partition_idle_time 10s");
System.out.println(
"--compact_strategy determines how to pick files to be merged, the default is determined by the runtime execution mode. "
+ "`full` : Only supports batch mode. All files will be selected for merging."
+ "`minor`: Pick the set of files that need to be merged based on specified conditions.");
System.out.println(
" compact --warehouse s3:///path/to/warehouse "
+ "--database test_db "
Expand Down
Loading

0 comments on commit 4bcf857

Please sign in to comment.