From cafb215acb2398b78e2e5c0dca167e7c722275fd Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Thu, 10 Oct 2024 15:09:59 +0800 Subject: [PATCH] [flink] Support analysis statistics in flink (#4280) --- .../org/apache/paimon/flink/FlinkCatalog.java | 52 ++- .../paimon/flink/utils/TableStatsUtil.java | 295 ++++++++++++++++++ .../paimon/flink/FlinkAnalyzeTableITCase.java | 186 +++++++++++ 3 files changed, 529 insertions(+), 4 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 0f46f2965c86..72151ec96a65 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,15 +24,19 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.flink.utils.TableStatsUtil; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -1239,8 +1243,8 @@ public final CatalogColumnStatistics getPartitionColumnStatistics( @Override public final void alterTableStatistics( ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) - throws CatalogException { - throw new UnsupportedOperationException(); + throws CatalogException, TableNotExistException { + alterTableStatisticsInternal(tablePath, tableStatistics, ignoreIfNotExists); } @Override @@ -1248,8 +1252,48 @@ public final void alterTableColumnStatistics( ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) - throws CatalogException { - throw new UnsupportedOperationException(); + throws CatalogException, TableNotExistException { + alterTableStatisticsInternal(tablePath, columnStatistics, ignoreIfNotExists); + } + + private void alterTableStatisticsInternal( + ObjectPath tablePath, Object statistics, boolean ignoreIfNotExists) + throws TableNotExistException { + try { + Table table = catalog.getTable(toIdentifier(tablePath)); + checkArgument( + table instanceof FileStoreTable, "Now only support analyze FileStoreTable."); + if (!table.latestSnapshotId().isPresent()) { + LOG.info("Skipping analyze table because the snapshot is null."); + return; + } + + Statistics tableStats = null; + if (statistics instanceof CatalogColumnStatistics) { + tableStats = + TableStatsUtil.createTableColumnStats( + ((FileStoreTable) table), (CatalogColumnStatistics) statistics); + } else if (statistics instanceof CatalogTableStatistics) { + tableStats = + TableStatsUtil.createTableStats( + ((FileStoreTable) table), (CatalogTableStatistics) statistics); + } + + if (tableStats != null) { + FileStoreTable fileStoreTable = (FileStoreTable) table; + FileStoreCommit commit = + fileStoreTable + .store() + .newCommit( + CoreOptions.createCommitUser( + fileStoreTable.coreOptions().toConfiguration())); + commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER); + } + } catch (Catalog.TableNotExistException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath); + } + } } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java new file mode 100644 index 000000000000..c0ed45d95b40 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.stats.ColStats; +import org.apache.paimon.stats.Statistics; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Utility methods for analysis table. */ +public class TableStatsUtil { + + /** create Paimon statistics. */ + @Nullable + public static Statistics createTableStats( + FileStoreTable table, CatalogTableStatistics catalogTableStatistics) { + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + long mergedRecordCount = catalogTableStatistics.getRowCount(); + if (snapshot.totalRecordCount() == null) { + return null; + } + long totalRecordCount = snapshot.totalRecordCount(); + Preconditions.checkState( + totalRecordCount >= mergedRecordCount, + "totalRecordCount: $totalRecordCount should be greater or equal than mergedRecordCount: $mergedRecordCount."); + long totalSize = + table.newScan().plan().splits().stream() + .flatMap(split -> ((DataSplit) split).dataFiles().stream()) + .mapToLong(DataFileMeta::fileSize) + .sum(); + long mergedRecordSize = + (long) (totalSize * ((double) mergedRecordCount / totalRecordCount)); + + // convert to paimon stats + return new Statistics( + snapshot.id(), snapshot.schemaId(), mergedRecordCount, mergedRecordSize); + } + + /** Create Paimon statistics from given Flink columnStatistics. */ + @Nullable + public static Statistics createTableColumnStats( + FileStoreTable table, CatalogColumnStatistics columnStatistics) { + if (!table.statistics().isPresent()) { + return null; + } + Statistics statistics = table.statistics().get(); + List fields = table.schema().fields(); + Map> tableColumnStatsMap = new HashMap<>(fields.size()); + for (DataField field : fields) { + CatalogColumnStatisticsDataBase catalogColumnStatisticsDataBase = + columnStatistics.getColumnStatisticsData().get(field.name()); + if (catalogColumnStatisticsDataBase == null) { + continue; + } + tableColumnStatsMap.put( + field.name(), getPaimonColStats(field, catalogColumnStatisticsDataBase)); + } + statistics.colStats().putAll(tableColumnStatsMap); + return statistics; + } + + /** Convert Flink ColumnStats to Paimon ColStats according to Paimon column type. */ + private static ColStats getPaimonColStats( + DataField field, CatalogColumnStatisticsDataBase colStat) { + DataTypeRoot typeRoot = field.type().getTypeRoot(); + if (colStat instanceof CatalogColumnStatisticsDataString) { + CatalogColumnStatisticsDataString stringColStat = + (CatalogColumnStatisticsDataString) colStat; + if (typeRoot.equals(DataTypeRoot.CHAR) || typeRoot.equals(DataTypeRoot.VARCHAR)) { + return ColStats.newColStats( + field.id(), + null != stringColStat.getNdv() ? stringColStat.getNdv() : null, + null, + null, + null != stringColStat.getNullCount() ? stringColStat.getNullCount() : null, + null != stringColStat.getAvgLength() + ? stringColStat.getAvgLength().longValue() + : null, + null != stringColStat.getMaxLength() ? stringColStat.getMaxLength() : null); + } + } else if (colStat instanceof CatalogColumnStatisticsDataBoolean) { + CatalogColumnStatisticsDataBoolean booleanColStat = + (CatalogColumnStatisticsDataBoolean) colStat; + if (typeRoot.equals(DataTypeRoot.BOOLEAN)) { + return ColStats.newColStats( + field.id(), + (booleanColStat.getFalseCount() > 0 ? 1L : 0) + + (booleanColStat.getTrueCount() > 0 ? 1L : 0), + null, + null, + booleanColStat.getNullCount(), + null, + null); + } + } else if (colStat instanceof CatalogColumnStatisticsDataLong) { + CatalogColumnStatisticsDataLong longColStat = (CatalogColumnStatisticsDataLong) colStat; + if (typeRoot.equals(DataTypeRoot.INTEGER)) { + return ColStats.newColStats( + field.id(), + null != longColStat.getNdv() ? longColStat.getNdv() : null, + null != longColStat.getMin() ? longColStat.getMin().intValue() : null, + null != longColStat.getMax() ? longColStat.getMax().intValue() : null, + null != longColStat.getNullCount() ? longColStat.getNullCount() : null, + null, + null); + } else if (typeRoot.equals(DataTypeRoot.TINYINT)) { + return ColStats.newColStats( + field.id(), + null != longColStat.getNdv() ? longColStat.getNdv() : null, + null != longColStat.getMin() ? longColStat.getMin().byteValue() : null, + null != longColStat.getMax() ? longColStat.getMax().byteValue() : null, + null != longColStat.getNullCount() ? longColStat.getNullCount() : null, + null, + null); + + } else if (typeRoot.equals(DataTypeRoot.SMALLINT)) { + return ColStats.newColStats( + field.id(), + null != longColStat.getNdv() ? longColStat.getNdv() : null, + null != longColStat.getMin() ? longColStat.getMin().shortValue() : null, + null != longColStat.getMax() ? longColStat.getMax().shortValue() : null, + null != longColStat.getNullCount() ? longColStat.getNullCount() : null, + null, + null); + } else if (typeRoot.equals(DataTypeRoot.BIGINT)) { + return ColStats.newColStats( + field.id(), + null != longColStat.getNdv() ? longColStat.getNdv() : null, + null != longColStat.getMin() ? longColStat.getMin() : null, + null != longColStat.getMax() ? longColStat.getMax() : null, + null != longColStat.getNullCount() ? longColStat.getNullCount() : null, + null, + null); + } else if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { + return ColStats.newColStats( + field.id(), + null != longColStat.getNdv() ? longColStat.getNdv() : null, + null != longColStat.getMin() + ? org.apache.paimon.data.Timestamp.fromSQLTimestamp( + new Timestamp(longColStat.getMin())) + : null, + null != longColStat.getMax() + ? org.apache.paimon.data.Timestamp.fromSQLTimestamp( + new Timestamp(longColStat.getMax())) + : null, + null != longColStat.getNullCount() ? longColStat.getNullCount() : null, + null, + null); + } + } else if (colStat instanceof CatalogColumnStatisticsDataDouble) { + CatalogColumnStatisticsDataDouble doubleColumnStatsData = + (CatalogColumnStatisticsDataDouble) colStat; + if (typeRoot.equals(DataTypeRoot.FLOAT)) { + return ColStats.newColStats( + field.id(), + null != doubleColumnStatsData.getNdv() + ? doubleColumnStatsData.getNdv() + : null, + null != doubleColumnStatsData.getMin() + ? doubleColumnStatsData.getMin().floatValue() + : null, + null != doubleColumnStatsData.getMax() + ? doubleColumnStatsData.getMax().floatValue() + : null, + null != doubleColumnStatsData.getNullCount() + ? doubleColumnStatsData.getNullCount() + : null, + null, + null); + } else if (typeRoot.equals(DataTypeRoot.DOUBLE)) { + return ColStats.newColStats( + field.id(), + null != doubleColumnStatsData.getNdv() + ? doubleColumnStatsData.getNdv() + : null, + null != doubleColumnStatsData.getMin() + ? doubleColumnStatsData.getMin() + : null, + null != doubleColumnStatsData.getMax() + ? doubleColumnStatsData.getMax() + : null, + null != doubleColumnStatsData.getNullCount() + ? doubleColumnStatsData.getNullCount() + : null, + null, + null); + } else if (typeRoot.equals(DataTypeRoot.DECIMAL)) { + BigDecimal max = BigDecimal.valueOf(doubleColumnStatsData.getMax()); + BigDecimal min = BigDecimal.valueOf(doubleColumnStatsData.getMin()); + return ColStats.newColStats( + field.id(), + null != doubleColumnStatsData.getNdv() + ? doubleColumnStatsData.getNdv() + : null, + null != doubleColumnStatsData.getMin() + ? Decimal.fromBigDecimal(min, min.precision(), min.scale()) + : null, + null != doubleColumnStatsData.getMax() + ? Decimal.fromBigDecimal(max, max.precision(), max.scale()) + : null, + null != doubleColumnStatsData.getNullCount() + ? doubleColumnStatsData.getNullCount() + : null, + null, + null); + } + } else if (colStat instanceof CatalogColumnStatisticsDataDate) { + CatalogColumnStatisticsDataDate dateColumnStatsData = + (CatalogColumnStatisticsDataDate) colStat; + if (typeRoot.equals(DataTypeRoot.DATE)) { + return ColStats.newColStats( + field.id(), + null != dateColumnStatsData.getNdv() ? dateColumnStatsData.getNdv() : null, + null != dateColumnStatsData.getMin() + ? new Long(dateColumnStatsData.getMin().getDaysSinceEpoch()) + .intValue() + : null, + null != dateColumnStatsData.getMax() + ? new Long(dateColumnStatsData.getMax().getDaysSinceEpoch()) + .intValue() + : null, + null != dateColumnStatsData.getNullCount() + ? dateColumnStatsData.getNullCount() + : null, + null, + null); + } + } else if (colStat instanceof CatalogColumnStatisticsDataBinary) { + CatalogColumnStatisticsDataBinary binaryColumnStatsData = + (CatalogColumnStatisticsDataBinary) colStat; + if (typeRoot.equals(DataTypeRoot.VARBINARY) || typeRoot.equals(DataTypeRoot.BINARY)) { + return ColStats.newColStats( + field.id(), + null, + null, + null, + null != binaryColumnStatsData.getNullCount() + ? binaryColumnStatsData.getNullCount() + : null, + null != binaryColumnStatsData.getAvgLength() + ? binaryColumnStatsData.getAvgLength().longValue() + : null, + null != binaryColumnStatsData.getMaxLength() + ? binaryColumnStatsData.getMaxLength() + : null); + } + } + throw new CatalogException( + String.format( + "Flink does not support convert ColumnStats '%s' for Paimon column " + + "type '%s' yet", + colStat, field.type())); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java new file mode 100644 index 000000000000..ad8a2d45a036 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.stats.ColStats; +import org.apache.paimon.stats.Statistics; +import org.apache.paimon.utils.DateTimeUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for analyze table. */ +public class FlinkAnalyzeTableITCase extends CatalogITCaseBase { + + @Test + public void testAnalyzeTable() throws Catalog.TableNotExistException { + sql( + "CREATE TABLE T (" + + " id STRING" + + ", name STRING" + + ", i INT" + + ", l bigint" + + ", PRIMARY KEY (id) NOT ENFORCED" + + " ) WITH (" + + " 'bucket' = '2'" + + " )"); + sql("INSERT INTO T VALUES ('1', 'a', 1, 1)"); + sql("INSERT INTO T VALUES ('2', 'aaa', 1, 2)"); + sql("ANALYZE TABLE T COMPUTE STATISTICS"); + + Optional statisticsOpt = paimonTable("T").statistics(); + assertThat(statisticsOpt.isPresent()).isTrue(); + Statistics stats = statisticsOpt.get(); + + assertThat(stats.mergedRecordCount().isPresent()).isTrue(); + Assertions.assertEquals(2L, stats.mergedRecordCount().getAsLong()); + + Assertions.assertTrue(stats.mergedRecordSize().isPresent()); + Assertions.assertTrue(stats.colStats().isEmpty()); + } + + @Test + public void testAnalyzeTableColumn() throws Catalog.TableNotExistException { + sql( + "CREATE TABLE T (" + + "id STRING, name STRING, bytes_col BYTES, int_col INT, long_col bigint,\n" + + "float_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 5), boolean_col BOOLEAN, date_col DATE,\n" + + "timestamp_col TIMESTAMP_LTZ, binary_col BINARY, varbinary_col VARBINARY, char_col CHAR(20), varchar_col VARCHAR(20),\n" + + "tinyint_col TINYINT, smallint_col SMALLINT" + + ", PRIMARY KEY (id) NOT ENFORCED" + + " ) WITH (" + + " 'bucket' = '2'" + + " )"); + sql( + "INSERT INTO T VALUES ('1', 'a', CAST('your_binary_data' AS BYTES), 1, 1, 1.0, 1.0, 13.12345, true, cast('2020-01-01' as date), cast('2024-01-01 00:00:00' as TIMESTAMP_LTZ), CAST('example binary1' AS BINARY), CAST('example binary1' AS VARBINARY), 'a', 'a',CAST(1 AS TINYINT), CAST(2 AS SMALLINT))"); + sql( + "INSERT INTO T VALUES ('2', 'aaa', CAST('your_binary_data' AS BYTES), 1, 1, 1.0, 5.0, 12.12345, true, cast('2021-01-02' as date), cast('2024-01-02 00:00:00' as TIMESTAMP_LTZ), CAST('example binary1' AS BINARY), CAST('example binary1' AS VARBINARY), 'aaa', 'aaa', CAST(2 AS TINYINT), CAST(4 AS SMALLINT))"); + + sql( + "INSERT INTO T VALUES ('3', 'bbbb', CAST('data' AS BYTES), 4, 19, 7.0, 1.0, 14.12345, true, cast(NULL as date), cast('2024-01-02 05:00:00' as TIMESTAMP_LTZ), CAST(NULL AS BINARY), CAST('example binary1' AS VARBINARY), 'aaa', 'aaa', CAST(NULL AS TINYINT), CAST(4 AS SMALLINT))"); + + sql( + "INSERT INTO T VALUES ('4', 'aa', CAST(NULL AS BYTES), 1, 1, 1.0, 1.0, 14.12345, false, cast(NULL as date), cast(NULL as TIMESTAMP_LTZ), CAST(NULL AS BINARY), CAST('example' AS VARBINARY), 'aba', 'aaab', CAST(NULL AS TINYINT), CAST(4 AS SMALLINT))"); + + sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS"); + + Optional statisticsOpt = paimonTable("T").statistics(); + assertThat(statisticsOpt.isPresent()).isTrue(); + Statistics stats = statisticsOpt.get(); + + assertThat(stats.mergedRecordCount().isPresent()).isTrue(); + Assertions.assertEquals(4L, stats.mergedRecordCount().getAsLong()); + + Map> colStats = stats.colStats(); + Assertions.assertEquals( + ColStats.newColStats(0, 4L, null, null, 0L, 1L, 1L), colStats.get("id")); + Assertions.assertEquals( + ColStats.newColStats(1, 4L, null, null, 0L, 2L, 4L), colStats.get("name")); + + Assertions.assertEquals( + ColStats.newColStats(2, null, null, null, 1L, null, null), + colStats.get("bytes_col")); + + Assertions.assertEquals( + ColStats.newColStats(3, 2L, 1, 4, 0L, null, null), colStats.get("int_col")); + + Assertions.assertEquals( + ColStats.newColStats(4, 2L, 1L, 19L, 0L, null, null), colStats.get("long_col")); + + Assertions.assertEquals( + ColStats.newColStats(5, 2L, 1.0f, 7.0f, 0L, null, null), colStats.get("float_col")); + + Assertions.assertEquals( + ColStats.newColStats(6, 2L, 1.0d, 5.0d, 0L, null, null), + colStats.get("double_col")); + + Assertions.assertEquals( + ColStats.newColStats( + 7, + 3L, + Decimal.fromBigDecimal(new java.math.BigDecimal("12.12345"), 10, 5), + Decimal.fromBigDecimal(new java.math.BigDecimal("14.12345"), 10, 5), + 0L, + null, + null), + colStats.get("decimal_col")); + + Assertions.assertEquals( + ColStats.newColStats(8, 2L, null, null, 0L, null, null), + colStats.get("boolean_col")); + + Assertions.assertEquals( + ColStats.newColStats(9, 2L, 18262, 18629, 2L, null, null), + colStats.get("date_col")); + + Assertions.assertEquals( + ColStats.newColStats( + 10, + 3L, + DateTimeUtils.parseTimestampData("2024-01-01 00:00:00", 0), + DateTimeUtils.parseTimestampData("2024-01-02 05:00:00", 0), + 1L, + null, + null), + colStats.get("timestamp_col")); + + Assertions.assertEquals( + ColStats.newColStats(11, null, null, null, 2L, null, null), + colStats.get("binary_col")); + + Assertions.assertEquals( + ColStats.newColStats(12, null, null, null, 0L, null, null), + colStats.get("varbinary_col")); + + Assertions.assertEquals( + ColStats.newColStats(13, 3L, null, null, 0L, 20L, 20L), colStats.get("char_col")); + + Assertions.assertEquals( + ColStats.newColStats(14, 3L, null, null, 0L, 2L, 4L), colStats.get("varchar_col")); + + Assertions.assertEquals( + ColStats.newColStats( + 15, + 2L, + new Integer(1).byteValue(), + new Integer(2).byteValue(), + 2L, + null, + null), + colStats.get("tinyint_col")); + + Assertions.assertEquals( + ColStats.newColStats( + 16, + 2L, + new Integer(2).shortValue(), + new Integer(4).shortValue(), + 0L, + null, + null), + colStats.get("smallint_col")); + } +}