diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 63aa6c906b947..1354e9999b82d 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -31,6 +31,9 @@ import java.util.Collections; import java.util.Map; +import static org.apache.paimon.flink.action.ActionFactory.MINOR; +import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy; + /** * Stay compatible with 1.18 procedure which doesn't support named argument. Usage: * @@ -59,9 +62,9 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E return call(procedureContext, tableId, ""); } - public String[] call(ProcedureContext procedureContext, String tableId, String partitions) + public String[] call(ProcedureContext procedureContext, String tableId, String compactStrategy) throws Exception { - return call(procedureContext, tableId, partitions, "", "", "", ""); + return call(procedureContext, tableId, "", "", compactStrategy, "", ""); } public String[] call( @@ -114,6 +117,7 @@ public String[] call( procedureContext, tableId, partitions, + null, orderStrategy, orderByColumns, tableOptions, @@ -125,6 +129,7 @@ public String[] call( ProcedureContext procedureContext, String tableId, String partitions, + String compactStrategy, String orderStrategy, String orderByColumns, String tableOptions, @@ -152,6 +157,10 @@ public String[] call( if (!(StringUtils.isNullOrWhitespaceOnly(partitionIdleTime))) { action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime)); } + + if (checkCompactStrategy(compactStrategy)) { + action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR)); + } jobName = "Compact Job"; } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) { Preconditions.checkArgument( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java index 00fb7984a7719..fbf8f12f49eb6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java @@ -61,6 +61,7 @@ public interface ActionFactory extends Factory { // Supports `full` and `minor`. String COMPACT_STRATEGY = "compact_strategy"; String MINOR = "minor"; + String FULL = "full"; Optional create(MultipleParameterToolAdapter params); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java index d7f6a2bf5aa24..bed4ff88a9c48 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java @@ -77,7 +77,7 @@ public Optional create(MultipleParameterToolAdapter params) { TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME))); } String compactStrategy = params.get(COMPACT_STRATEGY); - if (compactStrategy != null) { + if (checkCompactStrategy(compactStrategy)) { action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR)); } } @@ -92,6 +92,19 @@ public Optional create(MultipleParameterToolAdapter params) { return Optional.of(action); } + public static boolean checkCompactStrategy(String compactStrategy) { + if (compactStrategy != null) { + Preconditions.checkArgument( + compactStrategy.equalsIgnoreCase(MINOR) + || compactStrategy.equalsIgnoreCase(FULL), + String.format( + "The compact strategy only supports 'full' or 'minor', but '%s' is configured.", + compactStrategy)); + return true; + } + return false; + } + @Override public void printHelp() { System.out.println( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java index 0b44cd54e995d..01fc6123701ec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java @@ -22,6 +22,8 @@ import java.util.Optional; +import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy; + /** Factory to create {@link CompactDatabaseAction}. */ public class CompactDatabaseActionFactory implements ActionFactory { @@ -56,8 +58,8 @@ public Optional create(MultipleParameterToolAdapter params) { } String compactStrategy = params.get(COMPACT_STRATEGY); - if (compactStrategy != null && compactStrategy.trim().equalsIgnoreCase(MINOR)) { - action.withFullCompaction(false); + if (checkCompactStrategy(compactStrategy)) { + action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR)); } return Optional.of(action); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 8589069126988..12a4174aa41a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -32,6 +32,8 @@ import java.util.Collections; import java.util.Map; +import static org.apache.paimon.flink.action.ActionFactory.MINOR; +import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -48,6 +50,10 @@ public class CompactProcedure extends ProcedureBase { name = "partitions", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "compact_strategy", + type = @DataTypeHint("STRING"), + isOptional = true), @ArgumentHint( name = "order_strategy", type = @DataTypeHint("STRING"), @@ -64,6 +70,7 @@ public String[] call( ProcedureContext procedureContext, String tableId, String partitions, + String compactStrategy, String orderStrategy, String orderByColumns, String tableOptions, @@ -90,6 +97,10 @@ public String[] call( if (!isNullOrWhitespaceOnly(partitionIdleTime)) { action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime)); } + + if (checkCompactStrategy(compactStrategy)) { + action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR)); + } jobName = "Compact Job"; } else if (!isNullOrWhitespaceOnly(orderStrategy) && !isNullOrWhitespaceOnly(orderByColumns)) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java index 8e15a6eb4da75..0373eb01a2d92 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java @@ -174,4 +174,32 @@ public void testStreamingFullCompactStrategy() throws Exception { .hasMessage( "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH."); } + + @Test + @Timeout(60) + public void testCompactStrategyWithWrongUsage() throws Exception { + prepareTable( + Arrays.asList("dt", "hh"), + Arrays.asList("dt", "hh", "k"), + Collections.emptyList(), + Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); + Assertions.assertThatThrownBy( + () -> + createAction( + CompactAction.class, + "compact", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--compact_strategy", + "wrong_usage", + "--table_conf", + CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + + "=3")) + .hasMessage( + "The compact strategy only supports 'full' or 'minor', but 'wrong_usage' is configured."); + } }