[flink] Support minor compact strategy for dedicated compaction action.
LinMingQiang committed Nov 26, 2024
1 parent 16a4058 commit 8eda6ce
Showing 13 changed files with 310 additions and 75 deletions.
8 changes: 8 additions & 0 deletions docs/content/maintenance/dedicated-compaction.md
@@ -107,6 +107,7 @@ Run the following command to submit a compaction job for the table.
 --database <database-name> \
 --table <table-name> \
 [--partition <partition-name>] \
+[--compact_strategy <minor / full>] \
 [--table_conf <table_conf>] \
 [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
 ```
@@ -123,10 +124,14 @@ Example: compact table
 --partition dt=20221126,hh=08 \
 --partition dt=20221127,hh=09 \
 --table_conf sink.parallelism=10 \
+--compact_strategy minor \
 --catalog_conf s3.endpoint=https://****.com \
 --catalog_conf s3.access-key=***** \
 --catalog_conf s3.secret-key=*****
 ```
+* `--compact_strategy` determines how files are picked for merging. By default the strategy follows the runtime execution mode: streaming mode uses the `minor` strategy and batch mode uses the `full` strategy.
+  * `full`: only supported in batch mode. All files will be picked up for merging.
+  * `minor`: picks the set of files that need to be merged based on specified conditions.
 You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all
 current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes
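The same choice can be made from Java when driving the action programmatically. Below is a minimal, hypothetical sketch against the `CompactAction` API changed in this commit; the warehouse path and table names are placeholders:

```java
// Hypothetical usage sketch: request the minor strategy explicitly,
// mirroring `--compact_strategy minor` on the command line.
CompactAction action =
        new CompactAction("s3://path/to/warehouse", "my_db", "my_table")
                .withFullCompaction(false); // false => minor; true => full (batch only)
action.build(); // throws Exception; leaving the strategy unset derives it from the runtime mode
```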
@@ -190,6 +195,7 @@ CALL sys.compact_database(
 [--including_tables <paimon-table-name|name-regular-expr>] \
 [--excluding_tables <paimon-table-name|name-regular-expr>] \
 [--mode <compact-mode>] \
+[--compact_strategy <minor / full>] \
 [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
 [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
 ```
@@ -346,6 +352,7 @@ CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '1 d')
 --table <table-name> \
 --partition_idle_time <partition-idle-time> \
 [--partition <partition-name>] \
+[--compact_strategy <minor / full>] \
 [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
 [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]
 ```
@@ -406,6 +413,7 @@ CALL sys.compact_database(
 [--including_tables <paimon-table-name|name-regular-expr>] \
 [--excluding_tables <paimon-table-name|name-regular-expr>] \
 [--mode <compact-mode>] \
+[--compact_strategy <minor / full>] \
 [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
 [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
 ```
@@ -58,6 +58,9 @@ public interface ActionFactory extends Factory {
     String TIMESTAMPFORMATTER = "timestamp_formatter";
     String EXPIRE_STRATEGY = "expire_strategy";
     String TIMESTAMP_PATTERN = "timestamp_pattern";
+    // Supports `full` and `minor`.
+    String COMPACT_STRATEGY = "compact_strategy";
+    String MINOR = "minor";
 
     Optional<Action> create(MultipleParameterToolAdapter params);
 
@@ -59,6 +59,8 @@ public class CompactAction extends TableActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
     public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
     }
@@ -100,6 +102,11 @@ public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime)
         return this;
     }
 
+    public CompactAction withFullCompaction(Boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     @Override
     public void build() throws Exception {
         ReadableConfig conf = env.getConfiguration();
@@ -124,6 +131,13 @@ public void build() throws Exception {
     private void buildForTraditionalCompaction(
             StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
             throws Exception {
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        } else {
+            Preconditions.checkArgument(
+                    !(fullCompaction && isStreaming),
+                    "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+        }
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -138,8 +152,7 @@ private void buildForTraditionalCompaction(
         }
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), table);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
         sourceBuilder.withPartitionPredicate(getPredicate());
         DataStreamSource<RowData> source =
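For readability, the defaulting-and-validation rule that `build()` applies above can be restated on its own. The helper below is an illustration of the same behavior, not part of the commit:

```java
// Assumed restatement of the strategy resolution in CompactAction#build():
// unset => follow the execution mode; explicit full + streaming => reject.
static boolean resolveFullCompaction(Boolean requested, boolean isStreaming) {
    if (requested == null) {
        return !isStreaming; // batch defaults to full, streaming to minor
    }
    if (requested && isStreaming) {
        throw new IllegalArgumentException(
                "full compact strategy is only supported in batch mode. "
                        + "Please add -Dexecution.runtime-mode=BATCH.");
    }
    return requested;
}
```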
@@ -76,6 +76,10 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
                 action.withPartitionIdleTime(
                         TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
             }
+            String compactStrategy = params.get(COMPACT_STRATEGY);
+            if (compactStrategy != null) {
+                action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
+            }
         }
 
         if (params.has(PARTITION)) {
@@ -101,7 +105,8 @@ public void printHelp() {
                         + "[--order_strategy <order_strategy>]"
                         + "[--table_conf <key>=<value>]"
                         + "[--order_by <order_columns>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <compact_strategy>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database <database_name> "
                         + "--table <table_name> [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
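Note that this factory treats any value other than `minor` (case-insensitive, after trimming) as a request for a full compaction, so a typo such as `--compact_strategy ful` silently selects the full strategy. A hypothetical helper making the mapping explicit:

```java
// Assumed restatement of the flag parsing above: only "minor" selects the
// minor strategy; any other non-null value falls through to full.
static Boolean parseCompactStrategy(String flag) {
    if (flag == null) {
        return null; // unset: defer to the execution-mode default
    }
    return !flag.trim().equalsIgnoreCase("minor"); // true => full compaction
}
```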
@@ -72,6 +72,10 @@ public class CompactDatabaseAction extends ActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
+    private boolean isStreaming;
+
     public CompactDatabaseAction(String warehouse, Map<String, String> catalogConfig) {
         super(warehouse, catalogConfig);
     }
@@ -110,6 +114,11 @@ public CompactDatabaseAction withPartitionIdleTime(@Nullable Duration partitionIdleTime)
         return this;
     }
 
+    public CompactDatabaseAction withFullCompaction(boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     private boolean shouldCompactionTable(String paimonFullTableName) {
         boolean shouldCompaction = includingPattern.matcher(paimonFullTableName).matches();
         if (excludingPattern != null) {
@@ -124,6 +133,12 @@ private boolean shouldCompactionTable(String paimonFullTableName) {
 
     @Override
     public void build() {
+        ReadableConfig conf = env.getConfiguration();
+        isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        }
         if (databaseCompactMode == MultiTablesSinkMode.DIVIDED) {
             buildForDividedMode();
         } else {
@@ -170,34 +185,26 @@ private void buildForDividedMode() {
                 !tableMap.isEmpty(),
                 "no tables to be compacted. possible cause is that there are no tables detected after pattern matching");
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
             FileStoreTable fileStoreTable = entry.getValue();
             switch (fileStoreTable.bucketMode()) {
                 case BUCKET_UNAWARE:
                     {
-                        buildForUnawareBucketCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForUnawareBucketCompaction(env, entry.getKey(), fileStoreTable);
                         break;
                     }
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
                 default:
                     {
-                        buildForTraditionalCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForTraditionalCompaction(env, entry.getKey(), fileStoreTable);
                     }
             }
         }
     }
 
     private void buildForCombinedMode() {
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         CombinedTableCompactorSourceBuilder sourceBuilder =
                 new CombinedTableCompactorSourceBuilder(
                         catalogLoader(),
@@ -234,15 +241,17 @@ private void buildForCombinedMode() {
                                 .buildForUnawareBucketsTableSource(),
                         parallelism);
 
-        new CombinedTableCompactorSink(catalogLoader(), tableOptions)
+        new CombinedTableCompactorSink(catalogLoader(), tableOptions, fullCompaction)
                 .sinkFrom(awareBucketTableSource, unawareBucketTableSource);
     }
 
     private void buildForTraditionalCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
+
+        Preconditions.checkArgument(
+                !(fullCompaction && isStreaming),
+                "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -259,19 +268,15 @@ private void buildForTraditionalCompaction(
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(fullName, table)
                         .withPartitionIdleTime(partitionIdleTime);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
         DataStreamSource<RowData> source =
                 sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
         sinkBuilder.withInput(source).build();
     }
 
     private void buildForUnawareBucketCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
         UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
                 new UnawareBucketCompactionTopoBuilder(env, fullName, table);
 
@@ -55,6 +55,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
             action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        String compactStrategy = params.get(COMPACT_STRATEGY);
+        if (compactStrategy != null && compactStrategy.trim().equalsIgnoreCase(MINOR)) {
+            action.withFullCompaction(false);
+        }
+
         return Optional.of(action);
     }
 
@@ -70,7 +75,8 @@ public void printHelp() {
                         + "[--including_tables <paimon_table_name|name_regular_expr>] "
                         + "[--excluding_tables <paimon_table_name|name_regular_expr>] "
                         + "[--mode <compact_mode>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <compact_strategy>]");
         System.out.println(
                 "  compact_database --warehouse s3://path/to/warehouse --including_databases <database-name|name-regular-expr> "
                         + "[--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
@@ -57,11 +57,15 @@ public class CombinedTableCompactorSink implements Serializable {
 
     private final Catalog.Loader catalogLoader;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
 
     private final Options options;
 
-    public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
+    public CombinedTableCompactorSink(
+            Catalog.Loader catalogLoader, Options options, boolean fullCompaction) {
         this.catalogLoader = catalogLoader;
         this.ignorePreviousFiles = false;
+        this.fullCompaction = fullCompaction;
         this.options = options;
     }
 
@@ -104,7 +108,10 @@ public DataStream<MultiTableCommittable> doWrite(
                         String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
                         new MultiTableCommittableTypeInfo(),
                         combinedMultiComacptionWriteOperator(
-                                env.getCheckpointConfig(), isStreaming, commitUser))
+                                env.getCheckpointConfig(),
+                                isStreaming,
+                                fullCompaction,
+                                commitUser))
                 .setParallelism(awareBucketTableSource.getParallelism());
 
         SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
@@ -168,13 +175,17 @@ protected DataStreamSink<?> doCommit(
     // TODO: refactor FlinkSink to adopt this sink
     protected OneInputStreamOperator<RowData, MultiTableCommittable>
             combinedMultiComacptionWriteOperator(
-                    CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
+                    CheckpointConfig checkpointConfig,
+                    boolean isStreaming,
+                    boolean fullCompaction,
+                    String commitUser) {
         return new MultiTablesStoreCompactOperator(
                 catalogLoader,
                 commitUser,
                 checkpointConfig,
                 isStreaming,
                 ignorePreviousFiles,
+                fullCompaction,
                 options);
     }
 
@@ -37,22 +37,18 @@ public class CompactorSinkBuilder {
 
     private DataStream<RowData> input;
 
-    private boolean fullCompaction;
+    private final boolean fullCompaction;
 
-    public CompactorSinkBuilder(FileStoreTable table) {
+    public CompactorSinkBuilder(FileStoreTable table, boolean fullCompaction) {
         this.table = table;
+        this.fullCompaction = fullCompaction;
     }
 
     public CompactorSinkBuilder withInput(DataStream<RowData> input) {
         this.input = input;
         return this;
     }
 
-    public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
-        this.fullCompaction = fullCompaction;
-        return this;
-    }
-
     public DataStreamSink<?> build() {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
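With the `withFullCompaction` setter removed, the sink builder receives the strategy at construction time and the field becomes `final`, so a builder can no longer be used before the strategy is set. A hypothetical call site under the revised API:

```java
// Assumed usage sketch of the revised builder: strategy is fixed up front.
new CompactorSinkBuilder(table, /* fullCompaction */ false)
        .withInput(source)
        .build();
```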
@@ -62,6 +62,7 @@ public class MultiTablesStoreCompactOperator
     private final CheckpointConfig checkpointConfig;
     private final boolean isStreaming;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
     private final String initialCommitUser;
 
     private transient StoreSinkWriteState state;
@@ -80,13 +81,15 @@ public MultiTablesStoreCompactOperator(
             CheckpointConfig checkpointConfig,
             boolean isStreaming,
             boolean ignorePreviousFiles,
+            boolean fullCompaction,
             Options options) {
         super(options);
         this.catalogLoader = catalogLoader;
         this.initialCommitUser = initialCommitUser;
         this.checkpointConfig = checkpointConfig;
         this.isStreaming = isStreaming;
         this.ignorePreviousFiles = ignorePreviousFiles;
+        this.fullCompaction = fullCompaction;
     }
 
     @Override
@@ -165,7 +168,8 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
                     files.isEmpty(),
                     "Batch compact job does not concern what files are compacted. "
                             + "They only need to know what buckets are compacted.");
-            write.compact(partition, bucket, true);
+            // The `minor` compact strategy is also supported in batch mode.
+            write.compact(partition, bucket, fullCompaction);
         }
     }
 
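For context, the boolean that `MultiTablesStoreCompactOperator` now forwards to `write.compact` is what ultimately selects the strategy per bucket; the summary below is my reading of that call based on how the flag is used here, not text from this commit:

```java
// Assumed semantics of the third argument:
// write.compact(partition, bucket, true);  // full: rewrite all files in the bucket
// write.compact(partition, bucket, false); // minor: let the picker choose files by its conditions
```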