Skip to content

Commit

Permalink
[flink] Support Column Statistics for Flink (apache#4330)
Browse files Browse the repository at this point in the history
This closes apache#4330.
  • Loading branch information
HuangXingBo authored Oct 25, 2024
1 parent 92550da commit 3ed1175
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
Expand All @@ -28,12 +30,17 @@
import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.ColumnStats;
import org.apache.flink.table.plan.stats.TableStats;

import javax.annotation.Nullable;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkState;

Expand Down Expand Up @@ -113,7 +120,21 @@ public TableStats reportStatistics() {
if (streaming) {
return TableStats.UNKNOWN;
}

Optional<Statistics> optionStatistics = table.statistics();
if (optionStatistics.isPresent()) {
Statistics statistics = optionStatistics.get();
if (statistics.mergedRecordCount().isPresent()) {
Map<String, ColumnStats> flinkColStats =
statistics.colStats().entrySet().stream()
.map(
entry ->
new AbstractMap.SimpleEntry<>(
entry.getKey(),
toFlinkColumnStats(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new TableStats(statistics.mergedRecordCount().getAsLong(), flinkColStats);
}
}
scanSplitsForInference();
return new TableStats(splitStatistics.totalRowCount());
}
Expand Down Expand Up @@ -143,4 +164,23 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
protected List<String> dynamicPartitionFilteringFields() {
return dynamicPartitionFilteringFields;
}

private ColumnStats toFlinkColumnStats(ColStats<?> colStats) {
return ColumnStats.Builder.builder()
.setNdv(
colStats.distinctCount().isPresent()
? colStats.distinctCount().getAsLong()
: null)
.setNullCount(
colStats.nullCount().isPresent() ? colStats.nullCount().getAsLong() : null)
.setAvgLen(
colStats.avgLen().isPresent()
? (double) colStats.avgLen().getAsLong()
: null)
.setMaxLen(
colStats.maxLen().isPresent() ? (int) colStats.maxLen().getAsLong() : null)
.setMax(colStats.max().isPresent() ? colStats.max().get() : null)
.setMin(colStats.min().isPresent() ? colStats.min().get() : null)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -111,17 +112,17 @@ public void testTimeTravelRead() throws Exception {
assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"The specified scan snapshotId 0 is out of available snapshotId range [1, 4]."));
SnapshotNotExistException.class,
"Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4."));

assertThatThrownBy(
() ->
batchSql(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"The specified scan snapshotId 0 is out of available snapshotId range [1, 4]."));
SnapshotNotExistException.class,
"Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4."));

assertThat(
batchSql(
Expand Down
Loading

0 comments on commit 3ed1175

Please sign in to comment.