From 8eda6ce6b286b9716173d7aa15da2ec84f18e0af Mon Sep 17 00:00:00 2001
From: LinMingQiang <1356469429@qq.com>
Date: Wed, 20 Nov 2024 23:13:16 +0800
Subject: [PATCH] [flink] Support minor compact strategy for dedicated
 compaction action.

---
 .../maintenance/dedicated-compaction.md       |  33 +++-
 .../paimon/flink/action/ActionFactory.java    |   3 +
 .../paimon/flink/action/CompactAction.java    |  17 +-
 .../flink/action/CompactActionFactory.java    |   7 +-
 .../flink/action/CompactDatabaseAction.java   |  47 ++---
 .../action/CompactDatabaseActionFactory.java  |   8 +-
 .../sink/CombinedTableCompactorSink.java      |  17 +-
 .../flink/sink/CompactorSinkBuilder.java      |  10 +-
 .../sink/MultiTablesStoreCompactOperator.java |   6 +-
 .../flink/action/CompactActionITCase.java     |  35 ----
 .../flink/action/CompactActionITCaseBase.java |  36 ++++
 .../action/MinorCompactActionITCase.java      | 177 ++++++++++++++++++
 .../flink/sink/CompactorSinkITCase.java       |  14 +-
 13 files changed, 335 insertions(+), 75 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java

diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md
index c0010bf9cc854..b2ef182856af0 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -107,6 +107,7 @@ Run the following command to submit a compaction job for the table.
     --database <database-name> \
     --table <table-name> \
     [--partition <partition-name>] \
+    [--compact_strategy <minor / full>] \
    [--table_conf <table_conf>] \
     [--catalog_conf <paimon catalog conf> [--catalog_conf <paimon catalog conf> ...]]
 ```
@@ -123,10 +124,27 @@ Example: compact table
     --partition dt=20221126,hh=08 \
     --partition dt=20221127,hh=09 \
     --table_conf sink.parallelism=10 \
+    --compact_strategy minor \
     --catalog_conf s3.endpoint=https://****.com \
     --catalog_conf s3.access-key=***** \
     --catalog_conf s3.secret-key=*****
 ```
+* `--compact_strategy` determines how files are picked for merging. The default depends on the runtime execution mode: streaming mode uses the `minor` strategy and batch mode uses the `full` strategy.
+  * `full`: only supported in batch mode. All files will be picked up for merging.
+  * `minor`: picks the set of files to be merged based on specified conditions (e.g. `num-sorted-run.compaction-trigger`).
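+
+For example, a one-off `minor` compaction in batch mode could be submitted like this (the warehouse path and table names are illustrative):
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -D execution.runtime-mode=batch \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --compact_strategy minor
+```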
 
 You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all
 current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes
@@ -190,6 +208,7 @@ CALL sys.compact_database(
     [--including_tables <paimon table name|name regular expr>] \
     [--excluding_tables <paimon table name|name regular expr>] \
     [--mode <compact mode>] \
+    [--compact_strategy <minor / full>] \
     [--catalog_conf <paimon catalog conf> [--catalog_conf <paimon catalog conf> ...]] \
     [--table_conf <paimon table sink conf> [--table_conf <paimon table sink conf> ...]]
 ```
@@ -346,6 +365,7 @@ CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '1 d')
     --table <table-name> \
     --partition_idle_time <partition-idle-time> \
     [--partition <partition-name>] \
+    [--compact_strategy <minor / full>] \
     [--catalog_conf <paimon catalog conf> [--catalog_conf <paimon catalog conf> ...]] \
     [--table_conf <table_conf> [--table_conf <table_conf>] ...]
 ```
@@ -406,6 +426,19 @@ CALL sys.compact_database(
     [--including_tables <paimon table name|name regular expr>] \
     [--excluding_tables <paimon table name|name regular expr>] \
     [--mode <compact mode>] \
+    [--compact_strategy <minor / full>] \
     [--catalog_conf <paimon catalog conf> [--catalog_conf <paimon catalog conf> ...]] \
     [--table_conf <paimon table sink conf> [--table_conf <paimon table sink conf> ...]]
 ```
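+
+Similarly, a database-wide `minor` compaction could be submitted like this (paths and names are illustrative):
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -D execution.runtime-mode=batch \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact_database \
+    --warehouse s3:///path/to/warehouse \
+    --including_databases test_db \
+    --compact_strategy minor
+```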
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 43719f715d9de..00fb7984a7719 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -58,6 +58,9 @@ public interface ActionFactory extends Factory {
     String TIMESTAMPFORMATTER = "timestamp_formatter";
     String EXPIRE_STRATEGY = "expire_strategy";
     String TIMESTAMP_PATTERN = "timestamp_pattern";
+    // Supports `full` and `minor`.
+    String COMPACT_STRATEGY = "compact_strategy";
+    String MINOR = "minor";
 
     Optional<Action> create(MultipleParameterToolAdapter params);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index ce88857f1b141..b96b223122e98 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -59,6 +59,8 @@ public class CompactAction extends TableActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
     public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
     }
@@ -100,6 +102,11 @@ public CompactAction withPartitionIdleTime(@Nullable Duration partitionIdleTime
         return this;
     }
 
+    public CompactAction withFullCompaction(Boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     @Override
     public void build() throws Exception {
         ReadableConfig conf = env.getConfiguration();
@@ -124,6 +131,13 @@ public void build() throws Exception {
     private void buildForTraditionalCompaction(
             StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
             throws Exception {
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        } else {
+            Preconditions.checkArgument(
+                    !(fullCompaction && isStreaming),
+                    "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+        }
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -138,8 +152,7 @@ private void buildForTraditionalCompaction(
         }
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), table);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
         sourceBuilder.withPartitionPredicate(getPredicate());
 
         DataStreamSource<RowData> source =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index f43c7a747c990..ee2825dde5ba0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -76,6 +76,10 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
                 action.withPartitionIdleTime(
                         TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
             }
+            String compactStrategy = params.get(COMPACT_STRATEGY);
+            if (compactStrategy != null) {
+                action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
+            }
         }
 
         if (params.has(PARTITION)) {
@@ -101,7 +105,8 @@ public void printHelp() {
                         + "[--order_strategy <order_strategy>]"
                         + "[--table_conf <key>=<value>]"
                         + "[--order_by <order_by_columns>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <minor / full>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database <database_name> "
                         + "--table <table_name> [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index 471c6fdd4da69..0c8d459c95a33 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -72,6 +72,10 @@ public class CompactDatabaseAction extends ActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
+    private boolean isStreaming;
+
     public CompactDatabaseAction(String warehouse, Map<String, String> catalogConfig) {
         super(warehouse, catalogConfig);
     }
@@ -110,6 +114,11 @@ public CompactDatabaseAction withPartitionIdleTime(@Nullable Duration partition
         return this;
     }
 
+    public CompactDatabaseAction withFullCompaction(boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     private boolean shouldCompactionTable(String paimonFullTableName) {
         boolean shouldCompaction = includingPattern.matcher(paimonFullTableName).matches();
         if (excludingPattern != null) {
@@ -124,6 +133,12 @@ private boolean shouldCompactionTable(String paimonFullTableName) {
 
     @Override
     public void build() {
+        ReadableConfig conf = env.getConfiguration();
+        isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        }
         if (databaseCompactMode == MultiTablesSinkMode.DIVIDED) {
             buildForDividedMode();
         } else {
@@ -170,24 +185,19 @@ private void buildForDividedMode() {
                 !tableMap.isEmpty(),
                 "no tables to be compacted. possible cause is that there are no tables detected after pattern matching");
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
             FileStoreTable fileStoreTable = entry.getValue();
             switch (fileStoreTable.bucketMode()) {
                 case BUCKET_UNAWARE:
                     {
-                        buildForUnawareBucketCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForUnawareBucketCompaction(env, entry.getKey(), fileStoreTable);
                         break;
                     }
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
                 default:
                     {
-                        buildForTraditionalCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForTraditionalCompaction(env, entry.getKey(), fileStoreTable);
                     }
             }
         }
@@ -195,9 +205,6 @@ private void buildForDividedMode() {
 
     private void buildForCombinedMode() {
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         CombinedTableCompactorSourceBuilder sourceBuilder =
                 new CombinedTableCompactorSourceBuilder(
                         catalogLoader(),
@@ -234,15 +241,17 @@ private void buildForCombinedMode() {
                                 .buildForUnawareBucketsTableSource(),
                         parallelism);
 
-        new CombinedTableCompactorSink(catalogLoader(), tableOptions)
+        new CombinedTableCompactorSink(catalogLoader(), tableOptions, fullCompaction)
                 .sinkFrom(awareBucketTableSource, unawareBucketTableSource);
     }
 
     private void buildForTraditionalCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
+
+        Preconditions.checkArgument(
+                !(fullCompaction && isStreaming),
+                "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -259,8 +268,7 @@ private void buildForTraditionalCompaction(
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(fullName, table)
                         .withPartitionIdleTime(partitionIdleTime);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
         DataStreamSource<RowData> source =
                 sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
@@ -268,10 +276,7 @@ private void buildForTraditionalCompaction(
     }
 
     private void buildForUnawareBucketCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
         UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
                 new UnawareBucketCompactionTopoBuilder(env, fullName, table);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index b268709078093..f67504a029628 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -55,6 +55,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
             action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        String compactStrategy = params.get(COMPACT_STRATEGY);
+        if (compactStrategy != null && compactStrategy.trim().equalsIgnoreCase(MINOR)) {
+            action.withFullCompaction(false);
+        }
+
         return Optional.of(action);
     }
 
@@ -70,7 +75,8 @@ public void printHelp() {
                         + "[--including_tables <paimon_table_name|name_regular_expr>] "
                         + "[--excluding_tables <paimon_table_name|name_regular_expr>] "
                         + "[--mode <compact_mode>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <minor / full>]");
         System.out.println(
                 "  compact_database --warehouse s3://path/to/warehouse --including_databases <database_name> "
                         + "[--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 87a28091fa302..ce4e37305909a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -57,11 +57,15 @@ public class CombinedTableCompactorSink implements Serializable {
 
     private final Catalog.Loader catalogLoader;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
+
     private final Options options;
 
-    public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
+    public CombinedTableCompactorSink(
+            Catalog.Loader catalogLoader, Options options, boolean fullCompaction) {
         this.catalogLoader = catalogLoader;
         this.ignorePreviousFiles = false;
+        this.fullCompaction = fullCompaction;
         this.options = options;
     }
 
@@ -104,7 +108,10 @@ public DataStream<MultiTableCommittable> doWrite(
                         String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
                         new MultiTableCommittableTypeInfo(),
                         combinedMultiComacptionWriteOperator(
-                                env.getCheckpointConfig(), isStreaming, commitUser))
+                                env.getCheckpointConfig(),
+                                isStreaming,
+                                fullCompaction,
+                                commitUser))
                 .setParallelism(awareBucketTableSource.getParallelism());
 
         SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
@@ -168,13 +175,17 @@ protected DataStreamSink<?> doCommit(
 
     // TODO:refactor FlinkSink to adopt this sink
     protected OneInputStreamOperator<RowData, MultiTableCommittable>
             combinedMultiComacptionWriteOperator(
-                    CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
+                    CheckpointConfig checkpointConfig,
+                    boolean isStreaming,
+                    boolean fullCompaction,
+                    String commitUser) {
         return new MultiTablesStoreCompactOperator(
                 catalogLoader,
                 commitUser,
                 checkpointConfig,
                 isStreaming,
                 ignorePreviousFiles,
+                fullCompaction,
                 options);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index 2173b1d34a72e..2d84ae6726fd2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -37,10 +37,11 @@ public class CompactorSinkBuilder {
 
     private DataStream<RowData> input;
 
-    private boolean fullCompaction;
+    private final boolean fullCompaction;
 
-    public CompactorSinkBuilder(FileStoreTable table) {
+    public CompactorSinkBuilder(FileStoreTable table, boolean fullCompaction) {
         this.table = table;
+        this.fullCompaction = fullCompaction;
     }
 
     public CompactorSinkBuilder withInput(DataStream<RowData> input) {
@@ -48,11 +49,6 @@ public CompactorSinkBuilder withInput(DataStream<RowData> input) {
         return this;
     }
 
-    public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
-        this.fullCompaction = fullCompaction;
-        return this;
-    }
-
     public DataStreamSink<?> build() {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 7cb5d30c2f8ed..e59b3470545bf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -62,6 +62,7 @@ public class MultiTablesStoreCompactOperator
     private final CheckpointConfig checkpointConfig;
     private final boolean isStreaming;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
     private final String initialCommitUser;
 
     private transient StoreSinkWriteState state;
@@ -80,6 +81,7 @@ public MultiTablesStoreCompactOperator(
             CheckpointConfig checkpointConfig,
             boolean isStreaming,
             boolean ignorePreviousFiles,
+            boolean fullCompaction,
             Options options) {
         super(options);
         this.catalogLoader = catalogLoader;
@@ -87,6 +89,7 @@ public MultiTablesStoreCompactOperator(
         this.checkpointConfig = checkpointConfig;
         this.isStreaming = isStreaming;
         this.ignorePreviousFiles = ignorePreviousFiles;
+        this.fullCompaction = fullCompaction;
     }
 
     @Override
@@ -165,7 +168,8 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
                     files.isEmpty(),
                     "Batch compact job does not concern what files are compacted. "
" + "They only need to know what buckets are compacted."); - write.compact(partition, bucket, true); + // `minor` compact strategy is supported in batch mode. + write.compact(partition, bucket, fullCompaction); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index bc849f0a135ff..2c4fb64f331c3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -23,13 +23,9 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableScan; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommonTestUtils; import org.apache.paimon.utils.SnapshotManager; @@ -56,12 +52,6 @@ /** IT cases for {@link CompactAction}. */ public class CompactActionITCase extends CompactActionITCaseBase { - private static final DataType[] FIELD_TYPES = - new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()}; - - private static final RowType ROW_TYPE = - RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"}); - @Test @Timeout(60) public void testBatchCompact() throws Exception { @@ -402,31 +392,6 @@ public void testWrongUsage() throws Exception { .hasMessage("sort compact do not support 'partition_idle_time'."); } - private FileStoreTable prepareTable( - List partitionKeys, - List primaryKeys, - List bucketKey, - Map tableOptions) - throws Exception { - FileStoreTable table = - createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, bucketKey, tableOptions); - - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - write = streamWriteBuilder.newWrite(); - commit = streamWriteBuilder.newCommit(); - - return table; - } - - private void checkLatestSnapshot( - FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) { - SnapshotManager snapshotManager = table.snapshotManager(); - Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); - assertThat(snapshot.id()).isEqualTo(snapshotId); - assertThat(snapshot.commitKind()).isEqualTo(commitKind); - } - private void runAction(boolean isStreaming) throws Exception { runAction(isStreaming, false); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java index 4c646444cb72c..41d01bdf7f35e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java @@ -18,17 +18,22 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; import 
org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeoutException; @@ -37,6 +42,12 @@ /** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} . */ public class CompactActionITCaseBase extends ActionITCaseBase { + protected static final DataType[] FIELD_TYPES = + new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()}; + + protected static final RowType ROW_TYPE = + RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"}); + protected void validateResult( FileStoreTable table, RowType rowType, @@ -87,4 +98,29 @@ protected void checkFileAndRowSize( assertThat(files.size()).isEqualTo(fileNum); assertThat(count).isEqualTo(rowCount); } + + protected void checkLatestSnapshot( + FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) { + SnapshotManager snapshotManager = table.snapshotManager(); + Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); + assertThat(snapshot.id()).isEqualTo(snapshotId); + assertThat(snapshot.commitKind()).isEqualTo(commitKind); + } + + protected FileStoreTable prepareTable( + List partitionKeys, + List primaryKeys, + List bucketKey, + Map tableOptions) + throws Exception { + FileStoreTable table = + createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, bucketKey, tableOptions); + + StreamWriteBuilder streamWriteBuilder = + table.newStreamWriteBuilder().withCommitUser(commitUser); + write = streamWriteBuilder.newWrite(); + commit = streamWriteBuilder.newCommit(); + + return table; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java new file mode 100644 index 0000000000000..5a32597fcf1ae --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for the compact strategy of {@link CompactAction}. */
+public class MinorCompactActionITCase extends CompactActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testBatchMinorCompactStrategy() throws Exception {
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "minor",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        // The 'num-sorted-run.compaction-trigger' threshold is not reached yet, so no
+        // compaction is performed.
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        // Give partition 15 three data files and partition 16 only two, so partition 16 will not
+        // be picked for compaction.
+        writeData(rowData(2, 100, 15, BinaryString.fromString("20221208")));
+
+        env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            // Partition 16 is not compacted.
+            assertThat(split.dataFiles().size())
+                    .isEqualTo(split.partition().getInt(1) == 16 ? 2 : 1);
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testBatchFullCompactStrategy() throws Exception {
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "full",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            assertThat(split.dataFiles().size()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStreamingFullCompactStrategy() throws Exception {
+        prepareTable(
+                Arrays.asList("dt", "hh"),
+                Arrays.asList("dt", "hh", "k"),
+                Collections.emptyList(),
+                Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "full",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
+        Assertions.assertThatThrownBy(() -> action.withStreamExecutionEnvironment(env).build())
+                .hasMessage(
+                        "full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index c38ac4b3d685c..42293ca2842e1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -132,7 +132,7 @@ public void testCompact() throws Exception {
                         .withContinuousMode(false)
                         .withPartitionPredicate(predicate)
                         .build();
-        new CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build();
+        new CompactorSinkBuilder(table, true).withInput(source).build();
         env.execute();
 
         snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -181,8 +181,8 @@ public void testCompactParallelism() throws Exception {
                                             FlinkConnectorOptions.SINK_PARALLELISM.key(),
                                             String.valueOf(sinkParalellism));
                                 }
-                            }))
-                .withFullCompaction(false)
+                            }),
+                        false)
                 .withInput(source)
                 .build();
 
@@ -275,7 +275,13 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) {
     protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
             Catalog.Loader catalogLoader) throws Exception {
         return new MultiTablesStoreCompactOperator(
-                catalogLoader, commitUser, new CheckpointConfig(), false, false, new Options());
+                catalogLoader,
+                commitUser,
+                new CheckpointConfig(),
+                false,
+                false,
+                true,
+                new Options());
     }
 
     private static byte[] partition(String dt, int hh) {