From 3ed1175f8f9256bc48535e73f50b07696173ebe8 Mon Sep 17 00:00:00 2001 From: HuangXingBo Date: Fri, 25 Oct 2024 10:39:10 +0800 Subject: [PATCH] [flink] Support Column Statistics for Flink (#4330) This closes #4330. --- .../paimon/flink/source/DataTableSource.java | 42 ++- .../paimon/flink/BatchFileStoreITCase.java | 9 +- .../FileStoreTableStatisticsTestBase.java | 247 +++++++++++++++++- 3 files changed, 289 insertions(+), 9 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ad5123205d4a..53a1b5f63083 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -20,6 +20,8 @@ import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.stats.ColStats; +import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.Table; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -28,12 +30,17 @@ import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.plan.stats.ColumnStats; import org.apache.flink.table.plan.stats.TableStats; import javax.annotation.Nullable; +import java.util.AbstractMap; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkState; @@ -113,7 +120,21 @@ public TableStats reportStatistics() { if (streaming) { return TableStats.UNKNOWN; } - + Optional optionStatistics = table.statistics(); + if (optionStatistics.isPresent()) { + Statistics statistics = optionStatistics.get(); + if (statistics.mergedRecordCount().isPresent()) { + Map flinkColStats = + statistics.colStats().entrySet().stream() + .map( + entry -> + new AbstractMap.SimpleEntry<>( + entry.getKey(), + toFlinkColumnStats(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new TableStats(statistics.mergedRecordCount().getAsLong(), flinkColStats); + } + } scanSplitsForInference(); return new TableStats(splitStatistics.totalRowCount()); } @@ -143,4 +164,23 @@ public void applyDynamicFiltering(List candidateFilterFields) { protected List dynamicPartitionFilteringFields() { return dynamicPartitionFilteringFields; } + + private ColumnStats toFlinkColumnStats(ColStats colStats) { + return ColumnStats.Builder.builder() + .setNdv( + colStats.distinctCount().isPresent() + ? colStats.distinctCount().getAsLong() + : null) + .setNullCount( + colStats.nullCount().isPresent() ? colStats.nullCount().getAsLong() : null) + .setAvgLen( + colStats.avgLen().isPresent() + ? (double) colStats.avgLen().getAsLong() + : null) + .setMaxLen( + colStats.maxLen().isPresent() ? (int) colStats.maxLen().getAsLong() : null) + .setMax(colStats.max().isPresent() ? colStats.max().get() : null) + .setMin(colStats.min().isPresent() ? colStats.min().get() : null) + .build(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index f03a1636850f..c30e6cd5612d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -23,6 +23,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.flink.api.dag.Transformation; import org.apache.flink.types.Row; @@ -111,8 +112,8 @@ public void testTimeTravelRead() throws Exception { assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */")) .satisfies( anyCauseMatches( - IllegalArgumentException.class, - "The specified scan snapshotId 0 is out of available snapshotId range [1, 4].")); + SnapshotNotExistException.class, + "Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4.")); assertThatThrownBy( () -> @@ -120,8 +121,8 @@ public void testTimeTravelRead() throws Exception { "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */")) .satisfies( anyCauseMatches( - IllegalArgumentException.class, - "The specified scan snapshotId 0 is out of available snapshotId range [1, 4].")); + SnapshotNotExistException.class, + "Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4.")); assertThat( batchSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java index f8aadb8bdc3b..826bf28d1248 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java @@ -18,13 +18,18 @@ package org.apache.paimon.flink.source.statistics; +import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.source.DataTableSource; import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; +import org.apache.paimon.stats.ColStats; +import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; @@ -33,6 +38,7 @@ import org.apache.paimon.types.VarCharType; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.plan.stats.ColumnStats; import org.apache.flink.table.plan.stats.TableStats; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +46,8 @@ import org.junit.jupiter.api.io.TempDir; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; /** Statistics tests for {@link FileStoreTable}. */ @@ -60,9 +68,66 @@ public void before() { @Test public void testTableScanStatistics() throws Exception { FileStoreTable table = writeData(); + Map> colStatsMap = new HashMap<>(); + colStatsMap.put("pt", ColStats.newColStats(0, 2L, 1, 2, 0L, null, null)); + colStatsMap.put("a", ColStats.newColStats(1, 9L, 10, 90, 0L, null, null)); + colStatsMap.put("b", ColStats.newColStats(2, 7L, 100L, 900L, 2L, null, null)); + colStatsMap.put( + "c", + ColStats.newColStats( + 3, + 7L, + BinaryString.fromString("S1"), + BinaryString.fromString("S8"), + 2L, + null, + null)); + + FileStore fileStore = table.store(); + FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser); + Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot(); + Statistics colStats = + new Statistics( + latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap); + fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE); + fileStoreCommit.close(); DataTableSource scanSource = new DataTableSource(identifier, table, false, null, null); Assertions.assertThat(scanSource.reportStatistics().getRowCount()).isEqualTo(9L); - // TODO validate column statistics + Map expectedColStats = new HashMap<>(); + expectedColStats.put( + "pt", + ColumnStats.Builder.builder() + .setNdv(2L) + .setMin(1) + .setMax(2) + .setNullCount(0L) + .build()); + expectedColStats.put( + "a", + ColumnStats.Builder.builder() + .setNdv(9L) + .setMin(10) + .setMax(90) + .setNullCount(0L) + .build()); + expectedColStats.put( + "b", + ColumnStats.Builder.builder() + .setNdv(7L) + .setMin(100L) + .setMax(900L) + .setNullCount(2L) + .build()); + expectedColStats.put( + "c", + ColumnStats.Builder.builder() + .setNdv(7L) + .setMin(BinaryString.fromString("S1")) + .setMax(BinaryString.fromString("S8")) + .setNullCount(2L) + .build()); + Assertions.assertThat(scanSource.reportStatistics().getColumnStats()) + .isEqualTo(expectedColStats); } @Test @@ -90,7 +155,65 @@ public void testTableFilterPartitionStatistics() throws Exception { null, false); Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L); - // TODO validate column statistics + Map> colStatsMap = new HashMap<>(); + colStatsMap.put("pt", ColStats.newColStats(0, 1L, 1, 2, 0L, null, null)); + colStatsMap.put("a", ColStats.newColStats(1, 5L, 10, 90, 0L, null, null)); + colStatsMap.put("b", ColStats.newColStats(2, 3L, 100L, 900L, 2L, null, null)); + colStatsMap.put( + "c", + ColStats.newColStats( + 3, + 3L, + BinaryString.fromString("S1"), + BinaryString.fromString("S7"), + 2L, + null, + null)); + + FileStore fileStore = table.store(); + FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser); + Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot(); + Statistics colStats = + new Statistics( + latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap); + fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE); + fileStoreCommit.close(); + + Map expectedColStats = new HashMap<>(); + expectedColStats.put( + "pt", + ColumnStats.Builder.builder() + .setNdv(1L) + .setMin(1) + .setMax(2) + .setNullCount(0L) + .build()); + expectedColStats.put( + "a", + ColumnStats.Builder.builder() + .setNdv(5L) + .setMin(10) + .setMax(90) + .setNullCount(0L) + .build()); + expectedColStats.put( + "b", + ColumnStats.Builder.builder() + .setNdv(3L) + .setMin(100L) + .setMax(900L) + .setNullCount(2L) + .build()); + expectedColStats.put( + "c", + ColumnStats.Builder.builder() + .setNdv(3L) + .setMin(BinaryString.fromString("S1")) + .setMax(BinaryString.fromString("S7")) + .setNullCount(2L) + .build()); + Assertions.assertThat(partitionFilterSource.reportStatistics().getColumnStats()) + .isEqualTo(expectedColStats); } @Test @@ -111,7 +234,65 @@ public void testTableFilterKeyStatistics() throws Exception { null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L); - // TODO validate column statistics + Map> colStatsMap = new HashMap<>(); + colStatsMap.put("pt", ColStats.newColStats(0, 1L, 2, 2, 0L, null, null)); + colStatsMap.put("a", ColStats.newColStats(1, 1L, 50, 50, 0L, null, null)); + colStatsMap.put("b", ColStats.newColStats(2, 1L, null, null, 1L, null, null)); + colStatsMap.put( + "c", + ColStats.newColStats( + 3, + 1L, + BinaryString.fromString("S5"), + BinaryString.fromString("S5"), + 0L, + null, + null)); + + FileStore fileStore = table.store(); + FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser); + Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot(); + Statistics colStats = + new Statistics( + latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap); + fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE); + fileStoreCommit.close(); + + Map expectedColStats = new HashMap<>(); + expectedColStats.put( + "pt", + ColumnStats.Builder.builder() + .setNdv(1L) + .setMin(2) + .setMax(2) + .setNullCount(0L) + .build()); + expectedColStats.put( + "a", + ColumnStats.Builder.builder() + .setNdv(1L) + .setMin(50) + .setMax(50) + .setNullCount(0L) + .build()); + expectedColStats.put( + "b", + ColumnStats.Builder.builder() + .setNdv(1L) + .setMin(null) + .setMax(null) + .setNullCount(1L) + .build()); + expectedColStats.put( + "c", + ColumnStats.Builder.builder() + .setNdv(1L) + .setMin(BinaryString.fromString("S5")) + .setMax(BinaryString.fromString("S5")) + .setNullCount(0L) + .build()); + Assertions.assertThat(keyFilterSource.reportStatistics().getColumnStats()) + .isEqualTo(expectedColStats); } @Test @@ -132,7 +313,65 @@ public void testTableFilterValueStatistics() throws Exception { null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L); - // TODO validate column statistics + Map> colStatsMap = new HashMap<>(); + colStatsMap.put("pt", ColStats.newColStats(0, 4L, 2, 2, 0L, null, null)); + colStatsMap.put("a", ColStats.newColStats(1, 4L, 50, 50, 0L, null, null)); + colStatsMap.put("b", ColStats.newColStats(2, 4L, null, null, 1L, null, null)); + colStatsMap.put( + "c", + ColStats.newColStats( + 3, + 4L, + BinaryString.fromString("S5"), + BinaryString.fromString("S8"), + 0L, + null, + null)); + + FileStore fileStore = table.store(); + FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser); + Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot(); + Statistics colStats = + new Statistics( + latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap); + fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE); + fileStoreCommit.close(); + + Map expectedColStats = new HashMap<>(); + expectedColStats.put( + "pt", + ColumnStats.Builder.builder() + .setNdv(4L) + .setMin(2) + .setMax(2) + .setNullCount(0L) + .build()); + expectedColStats.put( + "a", + ColumnStats.Builder.builder() + .setNdv(4L) + .setMin(50) + .setMax(50) + .setNullCount(0L) + .build()); + expectedColStats.put( + "b", + ColumnStats.Builder.builder() + .setNdv(4L) + .setMin(null) + .setMax(null) + .setNullCount(1L) + .build()); + expectedColStats.put( + "c", + ColumnStats.Builder.builder() + .setNdv(4L) + .setMin(BinaryString.fromString("S5")) + .setMax(BinaryString.fromString("S8")) + .setNullCount(0L) + .build()); + Assertions.assertThat(keyFilterSource.reportStatistics().getColumnStats()) + .isEqualTo(expectedColStats); } protected FileStoreTable writeData() throws Exception {