[flink] Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink (apache#3384)
WencongLiu authored May 31, 2024
1 parent 9615fc4 commit 885c2e0
Showing 15 changed files with 962 additions and 99 deletions.
32 changes: 32 additions & 0 deletions docs/content/flink/sql-write.md
@@ -49,6 +49,38 @@ snapshot expiration, and even partition expiration in Flink Sink (if it is confi

For multiple jobs to write the same table, you can refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.

### Clustering

In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}})
based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream
tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is supported only for the [Append Table]({{< ref "append-table/append-table#Append Table" >}})
in batch execution mode.

To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering:

```sql
CREATE TABLE my_table (
    a STRING,
    b STRING,
    c STRING
) WITH (
    'sink.clustering.by-columns' = 'a,b'
);
```

You can also use SQL hints to dynamically set clustering options:

```sql
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
SELECT * FROM source;
```

The data is clustered using an automatically chosen strategy (such as ORDER, ZORDER, or HILBERT), but you can specify the clustering strategy manually
by setting `sink.clustering.strategy`. Clustering relies on sampling and sorting. If the clustering process takes too long, you can decrease the total
number of samples by lowering `sink.clustering.sample-factor`, or disable the sorting step by setting `sink.clustering.sort-in-cluster` to `false`.
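
As a sketch of such tuning (the option values below are illustrative), the same dynamic hints can carry the strategy and sampling options. Since the total sample count is the sample factor multiplied by the sink parallelism, a sink parallelism of 4 with a sample factor of 50 samples 4 × 50 = 200 rows in total:

```sql
INSERT INTO my_table /*+ OPTIONS(
    'sink.clustering.by-columns' = 'a,b',
    'sink.clustering.strategy' = 'zorder',
    'sink.clustering.sample-factor' = '50',
    'sink.clustering.sort-in-cluster' = 'false') */
SELECT * FROM source;
```

This forces the z-order strategy for the two clustering columns, halves the sampling relative to the default factor of 100, and skips the extra in-task sort after range partitioning.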

You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#FlinkConnectorOptions" >}}) for more info about the configurations above.

## Overwriting the Whole Table

For unpartitioned tables, Paimon supports overwriting the whole table.
@@ -170,6 +170,30 @@
<td>Duration</td>
<td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
</tr>
<tr>
<td><h5>sink.clustering.by-columns</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, the range partitioning feature is not enabled. This option takes effect only for bucket-unaware tables without primary keys in batch execution mode.</td>
</tr>
<tr>
<td><h5>sink.clustering.sample-factor</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.</td>
</tr>
<tr>
<td><h5>sink.clustering.sort-in-cluster</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to further sort the data belonging to each sink task after range partitioning.</td>
</tr>
<tr>
<td><h5>sink.clustering.strategy</h5></td>
<td style="word-wrap: break-word;">"auto"</td>
<td>String</td>
<td>Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' for 5 or more columns.</td>
</tr>
<tr>
<td><h5>sink.committer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
@@ -48,6 +48,8 @@ public class FlinkConnectorOptions {

public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";

public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
@@ -387,6 +389,41 @@ public class FlinkConnectorOptions {
"Both can be configured at the same time: 'done-partition,success-file'.")
.build());

public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
.noDefaultValue()
.withDescription(
"Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
+ "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
+ "This option will be effective only for bucket unaware table without primary keys and batch execution mode.");

public static final ConfigOption<String> CLUSTERING_STRATEGY =
key("sink.clustering.strategy")
.stringType()
.defaultValue("auto")
.withDescription(
"Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
+ "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ "and 'hilbert' for 5 or more columns.");

public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
key("sink.clustering.sort-in-cluster")
.booleanType()
.defaultValue(true)
.withDescription(
"Indicates whether to further sort data belonged to each sink task after range partitioning.");

public static final ConfigOption<Integer> CLUSTERING_SAMPLE_FACTOR =
key("sink.clustering.sample-factor")
.intType()
.defaultValue(100)
.withDescription(
"Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, "
+ "and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -21,7 +21,9 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,8 +118,32 @@ public void build() {
}

DataStream<RowData> source = sourceBuilder.env(env).sourceBounded(true).build();
TableSorter sorter =
TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
int localSampleMagnification =
((FileStoreTable) table).coreOptions().getLocalSampleMagnification();
if (localSampleMagnification < 20) {
throw new IllegalArgumentException(
String.format(
"the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
localSampleMagnification));
}
String sinkParallelismValue =
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
final int sinkParallelism =
sinkParallelismValue == null
? source.getParallelism()
: Integer.parseInt(sinkParallelismValue);
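// The local/global sample sizes and the range number below all scale with the sink parallelism.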
TableSortInfo sortInfo =
new TableSortInfo.Builder()
.setSortColumns(orderColumns)
.setSortStrategy(OrderType.of(sortStrategy))
.setSinkParallelism(sinkParallelism)
.setLocalSampleSize(sinkParallelism * localSampleMagnification)
.setGlobalSampleSize(sinkParallelism * 1000)
.setRangeNumber(sinkParallelism * 10)
.build();

TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortInfo);

new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
@@ -164,7 +164,7 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new AssignRangeIndexOperator.RangePartitioner(rangeNum),
new AssignRangeIndexOperator.Tuple2KeySelector<>()),
StreamExchangeMode.BATCH),
"REMOVE KEY",
"REMOVE RANGE INDEX",
new RemoveRangeIndexOperator<>(),
input.getOutputType(),
outParallelism));
@@ -373,12 +373,16 @@ public void processElement1(StreamRecord<List<T>> streamRecord) {

@Override
public void processElement2(StreamRecord<Tuple2<T, RowData>> streamRecord) {
-            if (keyIndex == null || keyIndex.isEmpty()) {
-                throw new RuntimeException(
-                        "There should be one data from the first input. And boundaries should not be empty.");
+            if (keyIndex == null) {
+                throw new RuntimeException("There should be one data from the first input.");
             }
+            // If the range number is 1, the range index will be 0 for all records.
+            if (keyIndex.isEmpty()) {
+                collector.collect(new Tuple2<>(0, streamRecord.getValue()));
+            } else {
+                Tuple2<T, RowData> row = streamRecord.getValue();
+                collector.collect(new Tuple2<>(binarySearch(row.f0), row));
+            }
-            Tuple2<T, RowData> row = streamRecord.getValue();
-            collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}

@Override
@@ -408,7 +412,7 @@ private int binarySearch(T key) {
// key not found, but the low index is the target
// bucket, since the boundaries are the upper bound
return low > lastIndex
-                    ? keyIndex.get(lastIndex).getRight().get()
+                    ? (keyIndex.get(lastIndex).getRight().get() + 1)
: keyIndex.get(low).getRight().get();
}

@@ -438,8 +442,8 @@ public RangePartitioner(int totalRangeNum) {
@Override
public int partition(Integer key, int numPartitions) {
Preconditions.checkArgument(
-                    numPartitions < totalRangeNum,
-                    "Num of subPartitions should < totalRangeNum: " + totalRangeNum);
+                    numPartitions <= totalRangeNum,
+                    "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
int partition = key / (totalRangeNum / numPartitions);
return Math.min(numPartitions - 1, partition);
}
@@ -20,7 +20,11 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -34,14 +38,27 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;

/**
* DataStream API for building Flink Sink.
Expand All @@ -51,12 +68,15 @@
@Public
public class FlinkSinkBuilder {

private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);

private final FileStoreTable table;

private DataStream<RowData> input;
@Nullable private Map<String, String> overwritePartition;
@Nullable private Integer parallelism;
private Boolean boundedInput = null;
@Nullable private TableSortInfo tableSortInfo;

// ============== for extension ==============

@@ -119,8 +139,88 @@ public FlinkSinkBuilder inputBounded(boolean bounded) {
return this;
}

/** Clustering the input data if possible. */
public FlinkSinkBuilder clusteringIfPossible(
String clusteringColumns,
String clusteringStrategy,
boolean sortInCluster,
int sampleFactor) {
// The clustering will be skipped if the clustering columns are empty or the execution
// mode is STREAMING or the table type is illegal.
if (clusteringColumns == null || clusteringColumns.isEmpty()) {
return this;
}
checkState(input != null, "The input stream should be specified earlier.");
if (boundedInput == null) {
boundedInput = !FlinkSink.isStreaming(input);
}
if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
LOG.warn(
"Clustering is enabled; however, it has been skipped as "
+ "it only supports the bucket unaware table without primary keys and "
+ "BATCH execution mode.");
return this;
}
// If the clustering is not skipped, check the clustering column names and sample
// factor value.
List<String> columns = Arrays.asList(clusteringColumns.split(","));
List<String> fieldNames = table.schema().fieldNames();
checkState(
new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
String.format(
"Field names %s should contains all clustering column names %s.",
fieldNames, columns));
checkState(
sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
"The minimum allowed "
+ CLUSTERING_SAMPLE_FACTOR.key()
+ " is "
+ MIN_CLUSTERING_SAMPLE_FACTOR
+ ".");
TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
if (columns.size() == 1) {
sortInfoBuilder.setSortStrategy(ORDER);
} else if (columns.size() < 5) {
sortInfoBuilder.setSortStrategy(ZORDER);
} else {
sortInfoBuilder.setSortStrategy(HILBERT);
}
} else {
sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
}
int upstreamParallelism = input.getParallelism();
String sinkParallelismValue =
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
int sinkParallelism =
sinkParallelismValue == null
? upstreamParallelism
: Integer.parseInt(sinkParallelismValue);
sortInfoBuilder
.setSortColumns(columns)
.setSortInCluster(sortInCluster)
.setSinkParallelism(sinkParallelism);
int globalSampleSize = sinkParallelism * sampleFactor;
// If the adaptive scheduler is not enabled, the local sample size is the global sample
// size divided by the upstream parallelism (but at least the sample factor), which limits
// the total data received by the global sample node. If the adaptive scheduler is enabled,
// the local sample size equals sinkParallelism * the minimum sample factor.
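// For example, with upstreamParallelism = 8, sinkParallelism = 4 and sampleFactor = 100,
// globalSampleSize = 4 * 100 = 400 and localSampleSize = max(100, 400 / 8) = 100.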
int localSampleSize =
upstreamParallelism > 0
? Math.max(sampleFactor, globalSampleSize / upstreamParallelism)
: sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
this.tableSortInfo =
sortInfoBuilder
.setRangeNumber(sinkParallelism)
.setGlobalSampleSize(globalSampleSize)
.setLocalSampleSize(localSampleSize)
.build();
return this;
}

/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
input = trySortInput(input);
DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
input =
@@ -181,4 +281,14 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
table, overwritePartition, logSinkFunction, parallelism, boundedInput)
.sinkFrom(input);
}

private DataStream<RowData> trySortInput(DataStream<RowData> input) {
if (tableSortInfo != null) {
TableSorter sorter =
TableSorter.getSorter(
input.getExecutionEnvironment(), input, table, tableSortInfo);
return sorter.sort();
}
return input;
}
}