Skip to content

Commit

Permalink
[flink] flink procedure.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Nov 26, 2024
1 parent 9c959d0 commit d618b2f
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.MINOR;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;

/**
* Stay compatible with 1.18 procedure which doesn't support named argument. Usage:
*
Expand Down Expand Up @@ -59,9 +62,9 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E
return call(procedureContext, tableId, "");
}

public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
public String[] call(ProcedureContext procedureContext, String tableId, String compactStrategy)
throws Exception {
return call(procedureContext, tableId, partitions, "", "", "", "");
return call(procedureContext, tableId, "", "", compactStrategy, "", "");
}

public String[] call(
Expand Down Expand Up @@ -114,6 +117,7 @@ public String[] call(
procedureContext,
tableId,
partitions,
null,
orderStrategy,
orderByColumns,
tableOptions,
Expand All @@ -125,6 +129,7 @@ public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String compactStrategy,
String orderStrategy,
String orderByColumns,
String tableOptions,
Expand Down Expand Up @@ -152,6 +157,10 @@ public String[] call(
if (!(StringUtils.isNullOrWhitespaceOnly(partitionIdleTime))) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
}
jobName = "Compact Job";
} else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public interface ActionFactory extends Factory {
// Supports `full` and `minor`.
String COMPACT_STRATEGY = "compact_strategy";
String MINOR = "minor";
String FULL = "full";

Optional<Action> create(MultipleParameterToolAdapter params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
}
String compactStrategy = params.get(COMPACT_STRATEGY);
if (compactStrategy != null) {
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
}
}
Expand All @@ -92,6 +92,19 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
return Optional.of(action);
}

public static boolean checkCompactStrategy(String compactStrategy) {
if (compactStrategy != null) {
Preconditions.checkArgument(
compactStrategy.equalsIgnoreCase(MINOR)
|| compactStrategy.equalsIgnoreCase(FULL),
String.format(
"The compact strategy only supports 'full' or 'minor', but '%s' is configured.",
compactStrategy));
return true;
}
return false;
}

@Override
public void printHelp() {
System.out.println(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.util.Optional;

import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;

/** Factory to create {@link CompactDatabaseAction}. */
public class CompactDatabaseActionFactory implements ActionFactory {

Expand Down Expand Up @@ -56,8 +58,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
}

String compactStrategy = params.get(COMPACT_STRATEGY);
if (compactStrategy != null && compactStrategy.trim().equalsIgnoreCase(MINOR)) {
action.withFullCompaction(false);
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
}

return Optional.of(action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.MINOR;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
Expand All @@ -48,6 +50,10 @@ public class CompactProcedure extends ProcedureBase {
name = "partitions",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "compact_strategy",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "order_strategy",
type = @DataTypeHint("STRING"),
Expand All @@ -64,6 +70,7 @@ public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String compactStrategy,
String orderStrategy,
String orderByColumns,
String tableOptions,
Expand All @@ -90,6 +97,10 @@ public String[] call(
if (!isNullOrWhitespaceOnly(partitionIdleTime)) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
}
jobName = "Compact Job";
} else if (!isNullOrWhitespaceOnly(orderStrategy)
&& !isNullOrWhitespaceOnly(orderByColumns)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,32 @@ public void testStreamingFullCompactStrategy() throws Exception {
.hasMessage(
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
}

@Test
@Timeout(60)
public void testCompactStrategyWithWrongUsage() throws Exception {
prepareTable(
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
Collections.emptyList(),
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
Assertions.assertThatThrownBy(
() ->
createAction(
CompactAction.class,
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--compact_strategy",
"wrong_usage",
"--table_conf",
CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key()
+ "=3"))
.hasMessage(
"The compact strategy only supports 'full' or 'minor', but 'wrong_usage' is configured.");
}
}

0 comments on commit d618b2f

Please sign in to comment.