From f81ba36d1a3fc6aa2c4225750008cf3916be6bc0 Mon Sep 17 00:00:00 2001
From: herefree <841043203@qq.com>
Date: Thu, 3 Oct 2024 10:20:06 +0800
Subject: [PATCH] mod

---
 .../org/apache/paimon/flink/FlinkCatalog.java | 68 +++++++++----------
 .../paimon/flink/utils/TableStatsUtil.java    | 23 +++++--
 .../paimon/flink/FlinkAnalyzeTableITCase.java | 18 +++--
 3 files changed, 62 insertions(+), 47 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 44c4d3f2ff57..fc9b3a68e103 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -401,7 +401,7 @@ private List<SchemaChange> toSchemaChange(
             schemaChanges.add(
                     SchemaChange.addColumn(
                             add.getColumn().getName(),
-                            LogicalTypeConversion.toDataType(
+                            toDataType(
                                     add.getColumn().getDataType().getLogicalType()),
                             comment,
                             move));
@@ -452,7 +452,7 @@ private List<SchemaChange> toSchemaChange(
                 schemaChanges.add(
                         SchemaChange.updateColumnType(
                                 modify.getOldColumn().getName(),
-                                LogicalTypeConversion.toDataType(newColumnType)));
+                                toDataType(newColumnType)));
             }
             return schemaChanges;
         } else if (change instanceof ModifyColumnPosition) {
@@ -464,7 +464,7 @@ private List<SchemaChange> toSchemaChange(
                 schemaChanges.add(SchemaChange.updateColumnPosition(move));
             }
             return schemaChanges;
-        } else if (change instanceof TableChange.ModifyColumnComment) {
+        } else if (change instanceof ModifyColumnComment) {
             if (!oldTableNonPhysicalColumnIndex.containsKey(
                     ((ModifyColumnComment) change).getOldColumn().getName())) {
                 ModifyColumnComment modify = (ModifyColumnComment) change;
@@ -580,7 +580,7 @@ public void alterTable(
             throw new TableNotExistException(getName(), tablePath);
         }

-        Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
+        checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
         validateAlterTable(toCatalogTable(table), (CatalogTable) newTable);
         Map<String, Integer> oldTableNonPhysicalColumnIndex =
                 FlinkCatalogPropertiesUtil.nonPhysicalColumns(
@@ -656,7 +656,7 @@ private String getWatermarkExprDataTypeKey(String watermarkPrefix) {
     }

     private void setWatermarkOptions(
-            org.apache.flink.table.catalog.WatermarkSpec wms, List<SchemaChange> schemaChanges) {
+            WatermarkSpec wms, List<SchemaChange> schemaChanges) {
         String watermarkPrefix = getWatermarkKeyPrefix();
         schemaChanges.add(
                 SchemaChange.setOption(
@@ -678,8 +678,8 @@ private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
         if (ct1 instanceof SystemCatalogTable) {
             throw new UnsupportedOperationException("Can't alter system table.");
         }
-        org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
-        org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+        TableSchema ts1 = ct1.getSchema();
+        TableSchema ts2 = ct2.getSchema();
         boolean pkEquality = false;

         if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
@@ -914,7 +914,7 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
                         e -> {
                             LinkedHashMap<String, String> partValues =
                                     partitionComputer.generatePartValues(
-                                            Preconditions.checkNotNull(
+                                            checkNotNull(
                                                     e.partition(),
                                                     "Partition row data is null. This is unexpected."));
                             return new CatalogPartitionSpec(partValues);
                         });
@@ -1077,25 +1077,7 @@ public final CatalogColumnStatistics getPartitionColumnStatistics(
     public final void alterTableStatistics(
             ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
             throws CatalogException, TableNotExistException {
-        try {
-            Table table = catalog.getTable(toIdentifier(tablePath));
-            Preconditions.checkArgument(
-                    table instanceof FileStoreTable, "Can't analyze system table.");
-            if (!table.latestSnapshotId().isPresent()) {
-                return;
-            }
-            Statistics tableStats =
-                    TableStatsUtil.createTableStats((FileStoreTable) table, tableStatistics);
-
-            FileStoreCommit commit =
-                    ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
-            commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
-
-        } catch (Catalog.TableNotExistException e) {
-            if (!ignoreIfNotExists) {
-                throw new TableNotExistException(getName(), tablePath);
-            }
-        }
+        alterTableStatisticsInternal(tablePath, tableStatistics, ignoreIfNotExists);
     }

     @Override
@@ -1104,22 +1086,36 @@ public final void alterTableColumnStatistics(
             CatalogColumnStatistics columnStatistics,
             boolean ignoreIfNotExists)
             throws CatalogException, TableNotExistException {
+        alterTableStatisticsInternal(tablePath, columnStatistics, ignoreIfNotExists);
+    }
+
+    private void alterTableStatisticsInternal(
+            ObjectPath tablePath, Object statistics, boolean ignoreIfNotExists)
+            throws TableNotExistException {
         try {
             Table table = catalog.getTable(toIdentifier(tablePath));
             checkArgument(
                     table instanceof FileStoreTable, "Only FileStoreTable can be analyzed.");
             if (!table.latestSnapshotId().isPresent()) {
                 LOG.info("Skipping analyze table because the table has no snapshot.");
                 return;
             }
             Statistics tableStats = null;
             if (statistics instanceof CatalogColumnStatistics) {
                 tableStats =
                         TableStatsUtil.createTableColumnStats(
                                 ((FileStoreTable) table), (CatalogColumnStatistics) statistics);
             } else if (statistics instanceof CatalogTableStatistics) {
                 tableStats =
                         TableStatsUtil.createTableStats(
                                 ((FileStoreTable) table), (CatalogTableStatistics) statistics);
             }
             if (tableStats != null) {
                 FileStoreTable fileStoreTable = (FileStoreTable) table;
                 FileStoreCommit commit =
                         fileStoreTable
                                 .store()
                                 .newCommit(
                                         CoreOptions.createCommitUser(
                                                 fileStoreTable.coreOptions().toConfiguration()));
                 commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
             }
         } catch (Catalog.TableNotExistException e) {
             if (!ignoreIfNotExists) {
                 throw new TableNotExistException(getName(), tablePath);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java
index 9bb6728dccbe..5e829e4a7546 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableStatsUtil.java
@@ -20,6 +20,7 @@
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.stats.ColStats;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.FileStoreTable;
@@ -39,20 +40,25 @@
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 /** Utility methods for analysis table. */
 public class TableStatsUtil {

     /** create Paimon statistics. */
+    @Nullable
     public static Statistics createTableStats(
             FileStoreTable table, CatalogTableStatistics catalogTableStatistics) {
         Snapshot snapshot = table.snapshotManager().latestSnapshot();
         long mergedRecordCount = catalogTableStatistics.getRowCount();
+        if (snapshot.totalRecordCount() == null) {
+            return null;
+        }
         long totalRecordCount = snapshot.totalRecordCount();
         Preconditions.checkState(
                 totalRecordCount >= mergedRecordCount,
@@ -60,7 +66,7 @@ public static Statistics createTableStats(
         long totalSize =
                 table.newScan().plan().splits().stream()
                         .flatMap(split -> ((DataSplit) split).dataFiles().stream())
-                        .mapToLong(dataFile -> dataFile.fileSize())
+                        .mapToLong(DataFileMeta::fileSize)
                         .sum();
         long mergedRecordSize =
                 (long) (totalSize * ((double) mergedRecordCount / totalRecordCount));
@@ -71,12 +77,15 @@ public static Statistics createTableStats(
     }

     /** Create Paimon statistics from given Flink columnStatistics. */
+    @Nullable
     public static Statistics createTableColumnStats(
             FileStoreTable table, CatalogColumnStatistics columnStatistics) {
-
+        if (!table.statistics().isPresent()) {
+            return null;
+        }
         Statistics statistics = table.statistics().get();
-        HashMap<String, ColStats<?>> tableColumnStatsMap = new HashMap<>();
         List<DataField> fields = table.schema().fields();
+        Map<String, ColStats<?>> tableColumnStatsMap = new HashMap<>(fields.size());
         for (DataField field : fields) {
             CatalogColumnStatisticsDataBase catalogColumnStatisticsDataBase =
                     columnStatistics.getColumnStatisticsData().get(field.name());
@@ -91,7 +100,7 @@ public static Statistics createTableColumnStats(
     }

     /** Convert Flink ColumnStats to Paimon ColStats according to Paimon column type. */
-    private static ColStats getPaimonColStats(
+    private static ColStats<?> getPaimonColStats(
             DataField field, CatalogColumnStatisticsDataBase colStat) {
         DataTypeRoot typeRoot = field.type().getTypeRoot();
         if (colStat instanceof CatalogColumnStatisticsDataString) {
@@ -116,7 +125,7 @@ private static ColStats<?> getPaimonColStats(
             return ColStats.newColStats(
                     field.id(),
                     (booleanColStat.getFalseCount() > 0 ? 1L : 0)
-                            + (booleanColStat.getTrueCount() > 0 ? 1 : 0),
+                            + (booleanColStat.getTrueCount() > 0 ? 1L : 0),
                     null,
                     null,
                     booleanColStat.getNullCount(),
@@ -278,7 +287,7 @@ private static ColStats<?> getPaimonColStats(
         }
         throw new CatalogException(
                 String.format(
-                        "Flink does not support converting ColumnStats '%s' for Paimon column "
+                        "Flink does not support convert ColumnStats '%s' for Paimon column "
                                 + "type '%s' yet",
                         colStat, field.type()));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java
index aa242bf8d814..a13f4bb99e10 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkAnalyzeTableITCase.java
@@ -28,6 +28,9 @@
 import org.junit.jupiter.api.Test;

 import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;

 /** IT cases for analyze table. */
 public class FlinkAnalyzeTableITCase extends CatalogITCaseBase {
@@ -47,16 +50,20 @@ public void testAnalyzeTable() throws Catalog.TableNotExistException {
         sql("INSERT INTO T VALUES ('1', 'a', 1, 1)");
         sql("INSERT INTO T VALUES ('2', 'aaa', 1, 2)");
         sql("ANALYZE TABLE T COMPUTE STATISTICS");
-        Statistics stats = paimonTable("T").statistics().get();
+        Optional<Statistics> statisticsOpt = paimonTable("T").statistics();
+        assertThat(statisticsOpt.isPresent()).isTrue();
+        Statistics stats = statisticsOpt.get();
+
+        assertThat(stats.mergedRecordCount().isPresent()).isTrue();
         Assertions.assertEquals(2L, stats.mergedRecordCount().getAsLong());
+        Assertions.assertTrue(stats.mergedRecordSize().isPresent());

         Assertions.assertTrue(stats.colStats().isEmpty());
     }

     @Test
     public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {
-
         sql(
                 "CREATE TABLE T ("
                         + "id STRING, name STRING, bytes_col BYTES, int_col INT, long_col bigint,\n"
@@ -80,8 +87,11 @@ public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {

         sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS");

-        Statistics stats = paimonTable("T").statistics().get();
+        Optional<Statistics> statisticsOpt = paimonTable("T").statistics();
+        assertThat(statisticsOpt.isPresent()).isTrue();
+        Statistics stats = statisticsOpt.get();

+        assertThat(stats.mergedRecordCount().isPresent()).isTrue();
         Assertions.assertEquals(4L, stats.mergedRecordCount().getAsLong());

         Map<String, ColStats<?>> colStats = stats.colStats();
@@ -95,7 +105,7 @@ public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {
                 colStats.get("bytes_col"));

         Assertions.assertEquals(
-                ColStats.newColStats(3, 2L, new Integer(1), new Integer(4), 0L, null, null),
+                ColStats.newColStats(3, 2L, 1, 4, 0L, null, null),
                 colStats.get("int_col"));

         Assertions.assertEquals(
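
Note (outside the patch): the no-snapshot guard in alterTableStatisticsInternal, together with the new @Nullable returns in TableStatsUtil, means ANALYZE TABLE on a table that has no snapshot yet becomes a no-op instead of committing empty statistics. A minimal IT-case sketch that would pin that behavior down, reusing the sql() and paimonTable() helpers of CatalogITCaseBase already used in FlinkAnalyzeTableITCase above; the test name and table name are illustrative only, not part of this change:

    @Test
    public void testAnalyzeTableWithoutSnapshotIsNoOp() throws Catalog.TableNotExistException {
        // No INSERT before ANALYZE, so the table has no snapshot yet.
        sql("CREATE TABLE T_empty (id STRING, v INT)");
        sql("ANALYZE TABLE T_empty COMPUTE STATISTICS");
        // With the guard in place, no statistics snapshot should have been committed.
        assertThat(paimonTable("T_empty").statistics()).isNotPresent();
    }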