Skip to content

Commit

Permalink
[flink] add flink procedure.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Nov 27, 2024
1 parent 8eda6ce commit ee89c4c
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/content/maintenance/dedicated-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Example: compact table
--catalog_conf s3.secret-key=*****
```
* `--compact_strategy` Determines how to pick files to be merged, the default is determined by the runtime execution mode, streaming-mode use `minor` strategy and batch-mode use `full` strategy.
* `full` : Only support in batch mode. All files will be picked up for merging.
* `full` : Only supports batch mode. All files will be selected for merging.
* `minor` : Pick the set of files that need to be merged based on specified conditions.
You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;

/**
* Stay compatible with 1.18 procedure which doesn't support named argument. Usage:
*
Expand Down Expand Up @@ -114,6 +117,7 @@ public String[] call(
procedureContext,
tableId,
partitions,
null,
orderStrategy,
orderByColumns,
tableOptions,
Expand All @@ -125,6 +129,7 @@ public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String compactStrategy,
String orderStrategy,
String orderByColumns,
String tableOptions,
Expand Down Expand Up @@ -152,6 +157,10 @@ public String[] call(
if (!(StringUtils.isNullOrWhitespaceOnly(partitionIdleTime))) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
jobName = "Compact Job";
} else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ public void testCompactDatabaseAndTable() {
sql(
"CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1','pt=1')"))
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'zorder', 'k', '','','5s')"))
assertThatCode(
() ->
sql(
"CALL sys.compact('default.T', '', '' ,'zorder', 'k', '','','5s')"))
.message()
.contains("sort compact do not support 'partition_idle_time'.");

assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'full' ,'', '', '','','')"))
.doesNotThrowAnyException();

assertThatCode(() -> sql("CALL sys.compact_database('default')"))
.doesNotThrowAnyException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public interface ActionFactory extends Factory {
// Supports `full` and `minor`.
String COMPACT_STRATEGY = "compact_strategy";
String MINOR = "minor";
String FULL = "full";

Optional<Action> create(MultipleParameterToolAdapter params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void buildForTraditionalCompaction(
} else {
Preconditions.checkArgument(
!(fullCompaction && isStreaming),
"full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
}
if (isStreaming) {
// for completely asynchronous compaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
}
String compactStrategy = params.get(COMPACT_STRATEGY);
if (compactStrategy != null) {
action.withFullCompaction(!compactStrategy.trim().equalsIgnoreCase(MINOR));
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
}

Expand All @@ -92,6 +92,19 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
return Optional.of(action);
}

public static boolean checkCompactStrategy(String compactStrategy) {
if (compactStrategy != null) {
Preconditions.checkArgument(
compactStrategy.equalsIgnoreCase(MINOR)
|| compactStrategy.equalsIgnoreCase(FULL),
String.format(
"The compact strategy only supports 'full' or 'minor', but '%s' is configured.",
compactStrategy));
return true;
}
return false;
}

@Override
public void printHelp() {
System.out.println(
Expand Down Expand Up @@ -137,6 +150,10 @@ public void printHelp() {
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+ "--partition_idle_time 10s");
System.out.println(
"--compact_strategy determines how to pick files to be merged, the default is determined by the runtime execution mode. "
+ "`full` : Only supports batch mode. All files will be selected for merging."
+ "`minor`: Pick the set of files that need to be merged based on specified conditions.");
System.out.println(
" compact --warehouse s3:///path/to/warehouse "
+ "--database test_db "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private void buildForTraditionalCompaction(

Preconditions.checkArgument(
!(fullCompaction && isStreaming),
"full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");

if (isStreaming) {
// for completely asynchronous compaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.util.Optional;

import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;

/** Factory to create {@link CompactDatabaseAction}. */
public class CompactDatabaseActionFactory implements ActionFactory {

Expand Down Expand Up @@ -56,8 +58,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
}

String compactStrategy = params.get(COMPACT_STRATEGY);
if (compactStrategy != null && compactStrategy.trim().equalsIgnoreCase(MINOR)) {
action.withFullCompaction(false);
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}

return Optional.of(action);
Expand Down Expand Up @@ -99,6 +101,11 @@ public void printHelp() {
System.out.println(
"--partition_idle_time is used to do a full compaction for partition which had not receive any new data for 'partition_idle_time' time. And only these partitions will be compacted.");
System.out.println("--partition_idle_time is only supported in batch mode. ");
System.out.println(
"--compact_strategy determines how to pick files to be merged, the default is determined by the runtime execution mode. "
+ "`full` : Only supports batch mode. All files will be selected for merging."
+ "`minor`: Pick the set of files that need to be merged based on specified conditions.");

System.out.println();

System.out.println("Examples:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.FULL;
import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
Expand All @@ -48,6 +50,10 @@ public class CompactProcedure extends ProcedureBase {
name = "partitions",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "compact_strategy",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "order_strategy",
type = @DataTypeHint("STRING"),
Expand All @@ -64,6 +70,7 @@ public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String compactStrategy,
String orderStrategy,
String orderByColumns,
String tableOptions,
Expand All @@ -90,6 +97,10 @@ public String[] call(
if (!isNullOrWhitespaceOnly(partitionIdleTime)) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
jobName = "Compact Job";
} else if (!isNullOrWhitespaceOnly(orderStrategy)
&& !isNullOrWhitespaceOnly(orderByColumns)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

if (write.streamingMode()) {
write.notifyNewFiles(snapshotId, partition, bucket, files);
// The full compact is not supported in streaming mode.
write.compact(partition, bucket, false);
} else {
Preconditions.checkArgument(
files.isEmpty(),
"Batch compact job does not concern what files are compacted. "
+ "They only need to know what buckets are compacted.");
// `minor` compact strategy is supported in batch mode.
write.compact(partition, bucket, fullCompaction);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,34 @@ public void testStreamingFullCompactStrategy() throws Exception {
streamExecutionEnvironmentBuilder().streamingMode().build();
Assertions.assertThatThrownBy(() -> action.withStreamExecutionEnvironment(env).build())
.hasMessage(
"full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
}

@Test
@Timeout(60)
public void testCompactStrategyWithWrongUsage() throws Exception {
prepareTable(
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
Collections.emptyList(),
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
Assertions.assertThatThrownBy(
() ->
createAction(
CompactAction.class,
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--compact_strategy",
"wrong_usage",
"--table_conf",
CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key()
+ "=3"))
.hasMessage(
"The compact strategy only supports 'full' or 'minor', but 'wrong_usage' is configured.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
Expand Down Expand Up @@ -240,6 +241,112 @@ public void testDynamicBucketSortCompact() throws Exception {
checkLatestSnapshot(table, 21, Snapshot.CommitKind.OVERWRITE);
}

// ----------------------- Minor Compact -----------------------

@Test
public void testBatchMinorCompactStrategy() throws Exception {
sql(
"CREATE TABLE T ("
+ " k INT,"
+ " v INT,"
+ " hh INT,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+ ") PARTITIONED BY (dt, hh) WITH ("
+ " 'write-only' = 'true',"
+ " 'bucket' = '1'"
+ ")");
FileStoreTable table = paimonTable("T");
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);

sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208')");
sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208')");

checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);

sql(
"CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor', "
+ "options => 'num-sorted-run.compaction-trigger=3')");

checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);

sql("INSERT INTO T VALUES (1, 100, 15, '20221208')");

sql(
"CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor', "
+ "options => 'num-sorted-run.compaction-trigger=3')");

checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);

List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
assertThat(splits.size()).isEqualTo(2);
for (DataSplit split : splits) {
// Par-16 is not compacted.
assertThat(split.dataFiles().size())
.isEqualTo(split.partition().getInt(1) == 16 ? 2 : 1);
}
}

@Test
public void testBatchFullCompactStrategy() throws Exception {
sql(
"CREATE TABLE T ("
+ " k INT,"
+ " v INT,"
+ " hh INT,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+ ") PARTITIONED BY (dt, hh) WITH ("
+ " 'write-only' = 'true',"
+ " 'bucket' = '1'"
+ ")");
FileStoreTable table = paimonTable("T");
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);

sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208')");
sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208')");

checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);

sql(
"CALL sys.compact(`table` => 'default.T', compact_strategy => 'full', "
+ "options => 'num-sorted-run.compaction-trigger=3')");

checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);

List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
assertThat(splits.size()).isEqualTo(2);
for (DataSplit split : splits) {
// Par-16 is not compacted.
assertThat(split.dataFiles().size()).isEqualTo(1);
}
}

@Test
public void testStreamFullCompactStrategy() throws Exception {
sql(
"CREATE TABLE T ("
+ " k INT,"
+ " v INT,"
+ " hh INT,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+ ") PARTITIONED BY (dt, hh) WITH ("
+ " 'write-only' = 'true',"
+ " 'bucket' = '1'"
+ ")");
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);

Assertions.assertThatThrownBy(
() ->
streamSqlIter(
"CALL sys.compact(`table` => 'default.T', compact_strategy => 'full', "
+ "options => 'num-sorted-run.compaction-trigger=3')")
.close())
.hasMessageContaining(
"The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
}

private void checkLatestSnapshot(
FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
SnapshotManager snapshotManager = table.snapshotManager();
Expand Down

0 comments on commit ee89c4c

Please sign in to comment.