Skip to content

Commit

Permalink
[flink] Support analysis statistics in flink (#4280)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Oct 10, 2024
1 parent c6e320c commit cafb215
Show file tree
Hide file tree
Showing 3 changed files with 529 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.TableStatsUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
Expand Down Expand Up @@ -1239,17 +1243,57 @@ public final CatalogColumnStatistics getPartitionColumnStatistics(
@Override
public final void alterTableStatistics(
ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
throws CatalogException, TableNotExistException {
alterTableStatisticsInternal(tablePath, tableStatistics, ignoreIfNotExists);
}

@Override
public final void alterTableColumnStatistics(
ObjectPath tablePath,
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
throws CatalogException, TableNotExistException {
alterTableStatisticsInternal(tablePath, columnStatistics, ignoreIfNotExists);
}

private void alterTableStatisticsInternal(
ObjectPath tablePath, Object statistics, boolean ignoreIfNotExists)
throws TableNotExistException {
try {
Table table = catalog.getTable(toIdentifier(tablePath));
checkArgument(
table instanceof FileStoreTable, "Now only support analyze FileStoreTable.");
if (!table.latestSnapshotId().isPresent()) {
LOG.info("Skipping analyze table because the snapshot is null.");
return;
}

Statistics tableStats = null;
if (statistics instanceof CatalogColumnStatistics) {
tableStats =
TableStatsUtil.createTableColumnStats(
((FileStoreTable) table), (CatalogColumnStatistics) statistics);
} else if (statistics instanceof CatalogTableStatistics) {
tableStats =
TableStatsUtil.createTableStats(
((FileStoreTable) table), (CatalogTableStatistics) statistics);
}

if (tableStats != null) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
CoreOptions.createCommitUser(
fileStoreTable.coreOptions().toConfiguration()));
commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
} catch (Catalog.TableNotExistException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(getName(), tablePath);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utility methods for analysis table. */
public class TableStatsUtil {

/** create Paimon statistics. */
@Nullable
public static Statistics createTableStats(
FileStoreTable table, CatalogTableStatistics catalogTableStatistics) {
Snapshot snapshot = table.snapshotManager().latestSnapshot();
long mergedRecordCount = catalogTableStatistics.getRowCount();
if (snapshot.totalRecordCount() == null) {
return null;
}
long totalRecordCount = snapshot.totalRecordCount();
Preconditions.checkState(
totalRecordCount >= mergedRecordCount,
"totalRecordCount: $totalRecordCount should be greater or equal than mergedRecordCount: $mergedRecordCount.");
long totalSize =
table.newScan().plan().splits().stream()
.flatMap(split -> ((DataSplit) split).dataFiles().stream())
.mapToLong(DataFileMeta::fileSize)
.sum();
long mergedRecordSize =
(long) (totalSize * ((double) mergedRecordCount / totalRecordCount));

// convert to paimon stats
return new Statistics(
snapshot.id(), snapshot.schemaId(), mergedRecordCount, mergedRecordSize);
}

/** Create Paimon statistics from given Flink columnStatistics. */
@Nullable
public static Statistics createTableColumnStats(
FileStoreTable table, CatalogColumnStatistics columnStatistics) {
if (!table.statistics().isPresent()) {
return null;
}
Statistics statistics = table.statistics().get();
List<DataField> fields = table.schema().fields();
Map<String, ColStats<?>> tableColumnStatsMap = new HashMap<>(fields.size());
for (DataField field : fields) {
CatalogColumnStatisticsDataBase catalogColumnStatisticsDataBase =
columnStatistics.getColumnStatisticsData().get(field.name());
if (catalogColumnStatisticsDataBase == null) {
continue;
}
tableColumnStatsMap.put(
field.name(), getPaimonColStats(field, catalogColumnStatisticsDataBase));
}
statistics.colStats().putAll(tableColumnStatsMap);
return statistics;
}

/** Convert Flink ColumnStats to Paimon ColStats according to Paimon column type. */
private static ColStats<?> getPaimonColStats(
DataField field, CatalogColumnStatisticsDataBase colStat) {
DataTypeRoot typeRoot = field.type().getTypeRoot();
if (colStat instanceof CatalogColumnStatisticsDataString) {
CatalogColumnStatisticsDataString stringColStat =
(CatalogColumnStatisticsDataString) colStat;
if (typeRoot.equals(DataTypeRoot.CHAR) || typeRoot.equals(DataTypeRoot.VARCHAR)) {
return ColStats.newColStats(
field.id(),
null != stringColStat.getNdv() ? stringColStat.getNdv() : null,
null,
null,
null != stringColStat.getNullCount() ? stringColStat.getNullCount() : null,
null != stringColStat.getAvgLength()
? stringColStat.getAvgLength().longValue()
: null,
null != stringColStat.getMaxLength() ? stringColStat.getMaxLength() : null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataBoolean) {
CatalogColumnStatisticsDataBoolean booleanColStat =
(CatalogColumnStatisticsDataBoolean) colStat;
if (typeRoot.equals(DataTypeRoot.BOOLEAN)) {
return ColStats.newColStats(
field.id(),
(booleanColStat.getFalseCount() > 0 ? 1L : 0)
+ (booleanColStat.getTrueCount() > 0 ? 1L : 0),
null,
null,
booleanColStat.getNullCount(),
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataLong) {
CatalogColumnStatisticsDataLong longColStat = (CatalogColumnStatisticsDataLong) colStat;
if (typeRoot.equals(DataTypeRoot.INTEGER)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().intValue() : null,
null != longColStat.getMax() ? longColStat.getMax().intValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.TINYINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().byteValue() : null,
null != longColStat.getMax() ? longColStat.getMax().byteValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);

} else if (typeRoot.equals(DataTypeRoot.SMALLINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().shortValue() : null,
null != longColStat.getMax() ? longColStat.getMax().shortValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.BIGINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin() : null,
null != longColStat.getMax() ? longColStat.getMax() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin()
? org.apache.paimon.data.Timestamp.fromSQLTimestamp(
new Timestamp(longColStat.getMin()))
: null,
null != longColStat.getMax()
? org.apache.paimon.data.Timestamp.fromSQLTimestamp(
new Timestamp(longColStat.getMax()))
: null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataDouble) {
CatalogColumnStatisticsDataDouble doubleColumnStatsData =
(CatalogColumnStatisticsDataDouble) colStat;
if (typeRoot.equals(DataTypeRoot.FLOAT)) {
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? doubleColumnStatsData.getMin().floatValue()
: null,
null != doubleColumnStatsData.getMax()
? doubleColumnStatsData.getMax().floatValue()
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.DOUBLE)) {
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? doubleColumnStatsData.getMin()
: null,
null != doubleColumnStatsData.getMax()
? doubleColumnStatsData.getMax()
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.DECIMAL)) {
BigDecimal max = BigDecimal.valueOf(doubleColumnStatsData.getMax());
BigDecimal min = BigDecimal.valueOf(doubleColumnStatsData.getMin());
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? Decimal.fromBigDecimal(min, min.precision(), min.scale())
: null,
null != doubleColumnStatsData.getMax()
? Decimal.fromBigDecimal(max, max.precision(), max.scale())
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataDate) {
CatalogColumnStatisticsDataDate dateColumnStatsData =
(CatalogColumnStatisticsDataDate) colStat;
if (typeRoot.equals(DataTypeRoot.DATE)) {
return ColStats.newColStats(
field.id(),
null != dateColumnStatsData.getNdv() ? dateColumnStatsData.getNdv() : null,
null != dateColumnStatsData.getMin()
? new Long(dateColumnStatsData.getMin().getDaysSinceEpoch())
.intValue()
: null,
null != dateColumnStatsData.getMax()
? new Long(dateColumnStatsData.getMax().getDaysSinceEpoch())
.intValue()
: null,
null != dateColumnStatsData.getNullCount()
? dateColumnStatsData.getNullCount()
: null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataBinary) {
CatalogColumnStatisticsDataBinary binaryColumnStatsData =
(CatalogColumnStatisticsDataBinary) colStat;
if (typeRoot.equals(DataTypeRoot.VARBINARY) || typeRoot.equals(DataTypeRoot.BINARY)) {
return ColStats.newColStats(
field.id(),
null,
null,
null,
null != binaryColumnStatsData.getNullCount()
? binaryColumnStatsData.getNullCount()
: null,
null != binaryColumnStatsData.getAvgLength()
? binaryColumnStatsData.getAvgLength().longValue()
: null,
null != binaryColumnStatsData.getMaxLength()
? binaryColumnStatsData.getMaxLength()
: null);
}
}
throw new CatalogException(
String.format(
"Flink does not support convert ColumnStats '%s' for Paimon column "
+ "type '%s' yet",
colStat, field.type()));
}
}
Loading

0 comments on commit cafb215

Please sign in to comment.