Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink] Support analysis statistics in flink #4280

Merged
merged 6 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.TableStatsUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
Expand Down Expand Up @@ -1239,17 +1243,57 @@ public final CatalogColumnStatistics getPartitionColumnStatistics(
/**
 * Alters the table-level statistics of the table at {@code tablePath} by handing the given
 * {@code tableStatistics} and {@code ignoreIfNotExists} flag to
 * {@code alterTableStatisticsInternal}.
 *
 * <p>NOTE(review): this span is a rendered diff. The first {@code throws} clause together with
 * {@code throw new UnsupportedOperationException()} looks like the removed implementation and
 * the second {@code throws} clause with the delegating call looks like its replacement —
 * confirm against the merged file.
 */
@Override
public final void alterTableStatistics(
ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
throws CatalogException, TableNotExistException {
alterTableStatisticsInternal(tablePath, tableStatistics, ignoreIfNotExists);
}

/**
 * Alters the column-level statistics of the table at {@code tablePath} by handing the given
 * {@code columnStatistics} and {@code ignoreIfNotExists} flag to
 * {@code alterTableStatisticsInternal}.
 *
 * <p>NOTE(review): this span is a rendered diff with a reviewer comment embedded in the
 * signature. The first {@code throws} clause together with
 * {@code throw new UnsupportedOperationException()} looks like the removed implementation and
 * the second {@code throws} clause with the delegating call looks like its replacement —
 * confirm against the merged file.
 */
@Override
public final void alterTableColumnStatistics(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1、Same suggests with this method and alterTableStatistics.
2、Abstract a new method named alterTableStatisticsInternal, because alterTableStatistics and alterTableColumnStatistics has lots of same code.

ObjectPath tablePath,
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
throws CatalogException, TableNotExistException {
alterTableStatisticsInternal(tablePath, columnStatistics, ignoreIfNotExists);
}

/**
 * Shared implementation behind {@code alterTableStatistics} and
 * {@code alterTableColumnStatistics}: converts the given Flink statistics into Paimon
 * {@link Statistics} and commits them to the table.
 *
 * <p>Nothing is committed when the table has no snapshot yet, when {@code statistics} is
 * neither a {@link CatalogTableStatistics} nor a {@link CatalogColumnStatistics}, or when the
 * conversion helper returns {@code null}.
 *
 * @param tablePath path of the table whose statistics are altered
 * @param statistics a {@link CatalogTableStatistics} or {@link CatalogColumnStatistics}
 * @param ignoreIfNotExists when {@code true}, a missing table is silently ignored
 * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
 *     {@code false}
 * @throws IllegalArgumentException presumably raised by {@code checkArgument} when the table is
 *     not a {@link FileStoreTable}
 */
private void alterTableStatisticsInternal(
        ObjectPath tablePath, Object statistics, boolean ignoreIfNotExists)
        throws TableNotExistException {
    try {
        Table table = catalog.getTable(toIdentifier(tablePath));
        checkArgument(
                table instanceof FileStoreTable, "Now only support analyze FileStoreTable.");
        if (!table.latestSnapshotId().isPresent()) {
            LOG.info("Skipping analyze table because the snapshot is null.");
            return;
        }

        // Safe after the checkArgument above; cast once instead of at every use.
        FileStoreTable fileStoreTable = (FileStoreTable) table;

        Statistics tableStats = null;
        if (statistics instanceof CatalogColumnStatistics) {
            tableStats =
                    TableStatsUtil.createTableColumnStats(
                            fileStoreTable, (CatalogColumnStatistics) statistics);
        } else if (statistics instanceof CatalogTableStatistics) {
            tableStats =
                    TableStatsUtil.createTableStats(
                            fileStoreTable, (CatalogTableStatistics) statistics);
        }

        if (tableStats == null) {
            // Unsupported statistics type, or the helper could not derive statistics.
            return;
        }

        // NOTE(review): the commit is never closed here; confirm whether FileStoreCommit
        // holds resources that need releasing.
        FileStoreCommit commit =
                fileStoreTable
                        .store()
                        .newCommit(
                                CoreOptions.createCommitUser(
                                        fileStoreTable.coreOptions().toConfiguration()));
        commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
    } catch (Catalog.TableNotExistException e) {
        if (!ignoreIfNotExists) {
            // Keep the Paimon exception as the cause so the original stack trace survives.
            throw new TableNotExistException(getName(), tablePath, e);
        }
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utility methods for analysis table. */
public class TableStatsUtil {

/** create Paimon statistics. */
@Nullable
public static Statistics createTableStats(
FileStoreTable table, CatalogTableStatistics catalogTableStatistics) {
Snapshot snapshot = table.snapshotManager().latestSnapshot();
long mergedRecordCount = catalogTableStatistics.getRowCount();
if (snapshot.totalRecordCount() == null) {
return null;
}
long totalRecordCount = snapshot.totalRecordCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check totalRecordCount is null.

Preconditions.checkState(
totalRecordCount >= mergedRecordCount,
"totalRecordCount: $totalRecordCount should be greater or equal than mergedRecordCount: $mergedRecordCount.");
long totalSize =
table.newScan().plan().splits().stream()
.flatMap(split -> ((DataSplit) split).dataFiles().stream())
.mapToLong(DataFileMeta::fileSize)
.sum();
long mergedRecordSize =
(long) (totalSize * ((double) mergedRecordCount / totalRecordCount));

// convert to paimon stats
return new Statistics(
snapshot.id(), snapshot.schemaId(), mergedRecordCount, mergedRecordSize);
}

/** Create Paimon statistics from given Flink columnStatistics. */
@Nullable
public static Statistics createTableColumnStats(
FileStoreTable table, CatalogColumnStatistics columnStatistics) {
if (!table.statistics().isPresent()) {
return null;
}
Statistics statistics = table.statistics().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Optional.get()' without 'isPresent()' check

List<DataField> fields = table.schema().fields();
Map<String, ColStats<?>> tableColumnStatsMap = new HashMap<>(fields.size());
for (DataField field : fields) {
CatalogColumnStatisticsDataBase catalogColumnStatisticsDataBase =
columnStatistics.getColumnStatisticsData().get(field.name());
if (catalogColumnStatisticsDataBase == null) {
continue;
}
tableColumnStatsMap.put(
field.name(), getPaimonColStats(field, catalogColumnStatisticsDataBase));
}
statistics.colStats().putAll(tableColumnStatsMap);
return statistics;
}

/** Convert Flink ColumnStats to Paimon ColStats according to Paimon column type. */
private static ColStats<?> getPaimonColStats(
DataField field, CatalogColumnStatisticsDataBase colStat) {
DataTypeRoot typeRoot = field.type().getTypeRoot();
if (colStat instanceof CatalogColumnStatisticsDataString) {
CatalogColumnStatisticsDataString stringColStat =
(CatalogColumnStatisticsDataString) colStat;
if (typeRoot.equals(DataTypeRoot.CHAR) || typeRoot.equals(DataTypeRoot.VARCHAR)) {
return ColStats.newColStats(
field.id(),
null != stringColStat.getNdv() ? stringColStat.getNdv() : null,
null,
null,
null != stringColStat.getNullCount() ? stringColStat.getNullCount() : null,
null != stringColStat.getAvgLength()
? stringColStat.getAvgLength().longValue()
: null,
null != stringColStat.getMaxLength() ? stringColStat.getMaxLength() : null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataBoolean) {
CatalogColumnStatisticsDataBoolean booleanColStat =
(CatalogColumnStatisticsDataBoolean) colStat;
if (typeRoot.equals(DataTypeRoot.BOOLEAN)) {
return ColStats.newColStats(
field.id(),
(booleanColStat.getFalseCount() > 0 ? 1L : 0)
+ (booleanColStat.getTrueCount() > 0 ? 1L : 0),
null,
null,
booleanColStat.getNullCount(),
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataLong) {
CatalogColumnStatisticsDataLong longColStat = (CatalogColumnStatisticsDataLong) colStat;
if (typeRoot.equals(DataTypeRoot.INTEGER)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().intValue() : null,
null != longColStat.getMax() ? longColStat.getMax().intValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.TINYINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().byteValue() : null,
null != longColStat.getMax() ? longColStat.getMax().byteValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);

} else if (typeRoot.equals(DataTypeRoot.SMALLINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin().shortValue() : null,
null != longColStat.getMax() ? longColStat.getMax().shortValue() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.BIGINT)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin() ? longColStat.getMin() : null,
null != longColStat.getMax() ? longColStat.getMax() : null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
return ColStats.newColStats(
field.id(),
null != longColStat.getNdv() ? longColStat.getNdv() : null,
null != longColStat.getMin()
? org.apache.paimon.data.Timestamp.fromSQLTimestamp(
new Timestamp(longColStat.getMin()))
: null,
null != longColStat.getMax()
? org.apache.paimon.data.Timestamp.fromSQLTimestamp(
new Timestamp(longColStat.getMax()))
: null,
null != longColStat.getNullCount() ? longColStat.getNullCount() : null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataDouble) {
CatalogColumnStatisticsDataDouble doubleColumnStatsData =
(CatalogColumnStatisticsDataDouble) colStat;
if (typeRoot.equals(DataTypeRoot.FLOAT)) {
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? doubleColumnStatsData.getMin().floatValue()
: null,
null != doubleColumnStatsData.getMax()
? doubleColumnStatsData.getMax().floatValue()
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.DOUBLE)) {
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? doubleColumnStatsData.getMin()
: null,
null != doubleColumnStatsData.getMax()
? doubleColumnStatsData.getMax()
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
} else if (typeRoot.equals(DataTypeRoot.DECIMAL)) {
BigDecimal max = BigDecimal.valueOf(doubleColumnStatsData.getMax());
BigDecimal min = BigDecimal.valueOf(doubleColumnStatsData.getMin());
return ColStats.newColStats(
field.id(),
null != doubleColumnStatsData.getNdv()
? doubleColumnStatsData.getNdv()
: null,
null != doubleColumnStatsData.getMin()
? Decimal.fromBigDecimal(min, min.precision(), min.scale())
: null,
null != doubleColumnStatsData.getMax()
? Decimal.fromBigDecimal(max, max.precision(), max.scale())
: null,
null != doubleColumnStatsData.getNullCount()
? doubleColumnStatsData.getNullCount()
: null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataDate) {
CatalogColumnStatisticsDataDate dateColumnStatsData =
(CatalogColumnStatisticsDataDate) colStat;
if (typeRoot.equals(DataTypeRoot.DATE)) {
return ColStats.newColStats(
field.id(),
null != dateColumnStatsData.getNdv() ? dateColumnStatsData.getNdv() : null,
null != dateColumnStatsData.getMin()
? new Long(dateColumnStatsData.getMin().getDaysSinceEpoch())
.intValue()
: null,
null != dateColumnStatsData.getMax()
? new Long(dateColumnStatsData.getMax().getDaysSinceEpoch())
.intValue()
: null,
null != dateColumnStatsData.getNullCount()
? dateColumnStatsData.getNullCount()
: null,
null,
null);
}
} else if (colStat instanceof CatalogColumnStatisticsDataBinary) {
CatalogColumnStatisticsDataBinary binaryColumnStatsData =
(CatalogColumnStatisticsDataBinary) colStat;
if (typeRoot.equals(DataTypeRoot.VARBINARY) || typeRoot.equals(DataTypeRoot.BINARY)) {
return ColStats.newColStats(
field.id(),
null,
null,
null,
null != binaryColumnStatsData.getNullCount()
? binaryColumnStatsData.getNullCount()
: null,
null != binaryColumnStatsData.getAvgLength()
? binaryColumnStatsData.getAvgLength().longValue()
: null,
null != binaryColumnStatsData.getMaxLength()
? binaryColumnStatsData.getMaxLength()
: null);
}
}
throw new CatalogException(
String.format(
"Flink does not support convert ColumnStats '%s' for Paimon column "
+ "type '%s' yet",
colStat, field.type()));
}
}
Loading
Loading