[flink] add table-conf for ordinary compact (apache#2302)
leaves12138 authored Nov 13, 2023
1 parent 24dea46 commit e9bfc68
Showing 3 changed files with 45 additions and 1 deletion.
docs/content/maintenance/dedicated-compaction.md (2 additions, 0 deletions)
@@ -93,6 +93,7 @@ Run the following command to submit a compaction job for the table.
    --database <database-name> \
    --table <table-name> \
    [--partition <partition-name>] \
+    [--table-conf <table-conf>] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
```
@@ -107,6 +108,7 @@ Example: compact table
    --table test_table \
    --partition dt=20221126,hh=08 \
    --partition dt=20221127,hh=09 \
+    --table-conf sink.parallelism=10 \
    --catalog-conf s3.endpoint=https://****.com \
    --catalog-conf s3.access-key=***** \
    --catalog-conf s3.secret-key=*****
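The new `--table-conf` flag takes a `key=value` pair (here `sink.parallelism=10`) that is applied to the table as a dynamic option before the compaction sink is built. The flag's argument parsing is not part of this diff, so the following is only a minimal sketch of the idea; `TableConfParser` and `parse` are hypothetical names, not Paimon API:

```java
import java.util.HashMap;
import java.util.Map;

/** Hedged sketch of how repeated --table-conf key=value flags could be parsed. */
public class TableConfParser {

    static Map<String, String> parse(String[] args) {
        Map<String, String> conf = new HashMap<>();
        for (int i = 0; i < args.length - 1; i++) {
            if ("--table-conf".equals(args[i])) {
                // Split on the first '=' only, so values may themselves contain '='.
                String[] kv = args[++i].split("=", 2);
                if (kv.length == 2) {
                    conf.put(kv[0], kv[1]);
                }
            }
        }
        return conf;
    }

    public static void main(String[] args) {
        String[] argv = {"--table-conf", "sink.parallelism=10"};
        System.out.println(parse(argv)); // {sink.parallelism=10}
    }
}
```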
org/apache/paimon/flink/sink/CompactorSinkBuilder.java (10 additions, 1 deletion)
@@ -18,13 +18,16 @@

package org.apache.paimon.flink.sink;

+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

+import java.util.Optional;
+
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;

/** Builder for {@link CompactorSink}. */
@@ -56,7 +59,13 @@ public DataStreamSink<?> build() {
    }

    private DataStreamSink<?> buildForBucketAware() {
-        DataStream<RowData> partitioned = partition(input, new BucketsRowChannelComputer(), null);
+        Integer parallelism =
+                Optional.ofNullable(
+                                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key()))
+                        .map(Integer::valueOf)
+                        .orElse(null);
+        DataStream<RowData> partitioned =
+                partition(input, new BucketsRowChannelComputer(), parallelism);
        return new CompactorSink(table).sinkFrom(partitioned);
    }
}
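The heart of the change: `buildForBucketAware()` now reads `sink.parallelism` from the table options, parses it with `Integer::valueOf`, and falls back to `null` when the option is absent, leaving the parallelism choice to Flink. A self-contained sketch of that lookup against a plain `Map`, assuming the literal key `sink.parallelism` behind `FlinkConnectorOptions.SINK_PARALLELISM` (consistent with the docs example above):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SinkParallelismLookup {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("sink.parallelism", "10"); // as set via --table-conf

        // Same pattern as buildForBucketAware(): an absent key yields null,
        // which means "keep Flink's default parallelism" downstream.
        Integer parallelism =
                Optional.ofNullable(options.get("sink.parallelism"))
                        .map(Integer::valueOf)
                        .orElse(null);

        System.out.println(parallelism); // prints 10
    }
}
```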
org/apache/paimon/flink/sink/CompactorSinkITCase.java (33 additions, 0 deletions)
@@ -21,6 +21,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
@@ -45,6 +46,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,6 +56,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
@@ -134,6 +137,36 @@ public void testCompact() throws Exception {
        }
    }

+    @Test
+    public void testCompactParallelism() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        CompactorSourceBuilder sourceBuilder =
+                new CompactorSourceBuilder(tablePath.toString(), table);
+        DataStreamSource<RowData> source =
+                sourceBuilder
+                        .withEnv(env)
+                        .withContinuousMode(false)
+                        .withPartitions(getSpecifiedPartitions())
+                        .build();
+        Integer sinkParallelism = new Random().nextInt(100) + 1; // +1: parallelism 0 is invalid
+        new CompactorSinkBuilder(
+                        table.copy(
+                                new HashMap<String, String>() {
+                                    {
+                                        put(
+                                                FlinkConnectorOptions.SINK_PARALLELISM.key(),
+                                                String.valueOf(sinkParallelism));
+                                    }
+                                }))
+                .withInput(source)
+                .build();
+
+        Assertions.assertThat(env.getTransformations().get(0).getParallelism())
+                .isEqualTo(sinkParallelism);
+    }
+
    private List<Map<String, String>> getSpecifiedPartitions() {
        Map<String, String> partition1 = new HashMap<>();
        partition1.put("dt", "20221208");
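The new test drives the full path: it copies the table with `sink.parallelism` set, wires the compactor sink, and asserts that the first transformation registered on the environment carries exactly that parallelism. For a single override, the double-brace `HashMap` idiom could equally be `Collections.singletonMap`, which avoids the anonymous inner class; a trivial sketch of that alternative:

```java
import java.util.Collections;
import java.util.Map;

public class SingleOverride {
    public static void main(String[] args) {
        // One-entry option map, equivalent to the anonymous HashMap in the test.
        Map<String, String> conf =
                Collections.singletonMap("sink.parallelism", String.valueOf(10));
        System.out.println(conf); // {sink.parallelism=10}
    }
}
```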
