Skip to content

Commit

Permalink
mod
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Oct 3, 2024
1 parent 257cb67 commit f81ba36
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private List<SchemaChange> toSchemaChange(
schemaChanges.add(
SchemaChange.addColumn(
add.getColumn().getName(),
LogicalTypeConversion.toDataType(
toDataType(
add.getColumn().getDataType().getLogicalType()),
comment,
move));
Expand Down Expand Up @@ -452,7 +452,7 @@ private List<SchemaChange> toSchemaChange(
schemaChanges.add(
SchemaChange.updateColumnType(
modify.getOldColumn().getName(),
LogicalTypeConversion.toDataType(newColumnType)));
toDataType(newColumnType)));
}
return schemaChanges;
} else if (change instanceof ModifyColumnPosition) {
Expand All @@ -464,7 +464,7 @@ private List<SchemaChange> toSchemaChange(
schemaChanges.add(SchemaChange.updateColumnPosition(move));
}
return schemaChanges;
} else if (change instanceof TableChange.ModifyColumnComment) {
} else if (change instanceof ModifyColumnComment) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyColumnComment) change).getOldColumn().getName())) {
ModifyColumnComment modify = (ModifyColumnComment) change;
Expand Down Expand Up @@ -580,7 +580,7 @@ public void alterTable(
throw new TableNotExistException(getName(), tablePath);
}

Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
validateAlterTable(toCatalogTable(table), (CatalogTable) newTable);
Map<String, Integer> oldTableNonPhysicalColumnIndex =
FlinkCatalogPropertiesUtil.nonPhysicalColumns(
Expand Down Expand Up @@ -656,7 +656,7 @@ private String getWatermarkExprDataTypeKey(String watermarkPrefix) {
}

private void setWatermarkOptions(
org.apache.flink.table.catalog.WatermarkSpec wms, List<SchemaChange> schemaChanges) {
WatermarkSpec wms, List<SchemaChange> schemaChanges) {
String watermarkPrefix = getWatermarkKeyPrefix();
schemaChanges.add(
SchemaChange.setOption(
Expand All @@ -678,8 +678,8 @@ private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
if (ct1 instanceof SystemCatalogTable) {
throw new UnsupportedOperationException("Can't alter system table.");
}
org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
TableSchema ts1 = ct1.getSchema();
TableSchema ts2 = ct2.getSchema();
boolean pkEquality = false;

if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
Expand Down Expand Up @@ -914,7 +914,7 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
e -> {
LinkedHashMap<String, String> partValues =
partitionComputer.generatePartValues(
Preconditions.checkNotNull(
checkNotNull(
e.partition(),
"Partition row data is null. This is unexpected."));
return new CatalogPartitionSpec(partValues);
Expand Down Expand Up @@ -1077,25 +1077,7 @@ public final CatalogColumnStatistics getPartitionColumnStatistics(
public final void alterTableStatistics(
ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
try {
Table table = catalog.getTable(toIdentifier(tablePath));
Preconditions.checkArgument(
table instanceof FileStoreTable, "Can't analyze system table.");
if (!table.latestSnapshotId().isPresent()) {
return;
}
Statistics tableStats =
TableStatsUtil.createTableStats((FileStoreTable) table, tableStatistics);

FileStoreCommit commit =
((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);

} catch (Catalog.TableNotExistException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(getName(), tablePath);
}
}
alterTableStatisticsInternal(tablePath, tableStatistics, ignoreIfNotExists);
}

@Override
Expand All @@ -1104,22 +1086,36 @@ public final void alterTableColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
alterTableStatisticsInternal(tablePath, columnStatistics, ignoreIfNotExists);
}

private void alterTableStatisticsInternal(ObjectPath tablePath, Object statistics, boolean ignoreIfNotExists) throws TableNotExistException {
try {
Table table = catalog.getTable(toIdentifier(tablePath));
Preconditions.checkArgument(
table instanceof FileStoreTable, "Can't analyze system table.");

checkArgument(
table instanceof FileStoreTable, "Now only support analyze FileStoreTable.");
if (!table.latestSnapshotId().isPresent()) {
LOG.info("Skipping analyze table because the snapshot is null.");
return;
}
Statistics tableStats =
TableStatsUtil.createTableColumnStats(
((FileStoreTable) table), columnStatistics);

FileStoreCommit commit =
((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
Statistics tableStats = null;
if (statistics instanceof CatalogColumnStatistics) {
tableStats =
TableStatsUtil.createTableColumnStats(
((FileStoreTable) table), (CatalogColumnStatistics)statistics);
} else if (statistics instanceof CatalogTableStatistics) {
tableStats =
TableStatsUtil.createTableStats(
((FileStoreTable) table), (CatalogTableStatistics)statistics);
}

if (tableStats != null) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
fileStoreTable.store().newCommit(CoreOptions.createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
} catch (Catalog.TableNotExistException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(getName(), tablePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -39,28 +40,33 @@
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utility methods for analysis table. */
public class TableStatsUtil {

/** create Paimon statistics. */
@Nullable
public static Statistics createTableStats(
FileStoreTable table, CatalogTableStatistics catalogTableStatistics) {

Snapshot snapshot = table.snapshotManager().latestSnapshot();
long mergedRecordCount = catalogTableStatistics.getRowCount();
if (snapshot.totalRecordCount() == null) {
return null;
}
long totalRecordCount = snapshot.totalRecordCount();
Preconditions.checkState(
totalRecordCount >= mergedRecordCount,
"totalRecordCount: $totalRecordCount should be greater or equal than mergedRecordCount: $mergedRecordCount.");
long totalSize =
table.newScan().plan().splits().stream()
.flatMap(split -> ((DataSplit) split).dataFiles().stream())
.mapToLong(dataFile -> dataFile.fileSize())
.mapToLong(DataFileMeta::fileSize)
.sum();
long mergedRecordSize =
(long) (totalSize * ((double) mergedRecordCount / totalRecordCount));
Expand All @@ -71,12 +77,15 @@ public static Statistics createTableStats(
}

/** Create Paimon statistics from given Flink columnStatistics. */
@Nullable
public static Statistics createTableColumnStats(
FileStoreTable table, CatalogColumnStatistics columnStatistics) {

if (!table.statistics().isPresent()) {
return null;
}
Statistics statistics = table.statistics().get();
HashMap<String, ColStats<?>> tableColumnStatsMap = new HashMap<>();
List<DataField> fields = table.schema().fields();
Map<String, ColStats<?>> tableColumnStatsMap = new HashMap<>(fields.size());
for (DataField field : fields) {
CatalogColumnStatisticsDataBase catalogColumnStatisticsDataBase =
columnStatistics.getColumnStatisticsData().get(field.name());
Expand All @@ -91,7 +100,7 @@ public static Statistics createTableColumnStats(
}

/** Convert Flink ColumnStats to Paimon ColStats according to Paimon column type. */
private static ColStats getPaimonColStats(
private static ColStats<?> getPaimonColStats(
DataField field, CatalogColumnStatisticsDataBase colStat) {
DataTypeRoot typeRoot = field.type().getTypeRoot();
if (colStat instanceof CatalogColumnStatisticsDataString) {
Expand All @@ -116,7 +125,7 @@ private static ColStats getPaimonColStats(
return ColStats.newColStats(
field.id(),
(booleanColStat.getFalseCount() > 0 ? 1L : 0)
+ (booleanColStat.getTrueCount() > 0 ? 1 : 0),
+ (booleanColStat.getTrueCount() > 0 ? 1L : 0),
null,
null,
booleanColStat.getNullCount(),
Expand Down Expand Up @@ -278,7 +287,7 @@ private static ColStats getPaimonColStats(
}
throw new CatalogException(
String.format(
"Flink does not support converting ColumnStats '%s' for Paimon column "
"Flink does not support convert ColumnStats '%s' for Paimon column "
+ "type '%s' yet",
colStat, field.type()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for analyze table. */
public class FlinkAnalyzeTableITCase extends CatalogITCaseBase {
Expand All @@ -47,16 +50,20 @@ public void testAnalyzeTable() throws Catalog.TableNotExistException {
sql("INSERT INTO T VALUES ('1', 'a', 1, 1)");
sql("INSERT INTO T VALUES ('2', 'aaa', 1, 2)");
sql("ANALYZE TABLE T COMPUTE STATISTICS");
Statistics stats = paimonTable("T").statistics().get();

Optional<Statistics> statisticsOpt = paimonTable("T").statistics();
assertThat(statisticsOpt.isPresent()).isTrue();
Statistics stats = statisticsOpt.get();

assertThat(stats.mergedRecordCount().isPresent()).isTrue();
Assertions.assertEquals(2L, stats.mergedRecordCount().getAsLong());

Assertions.assertTrue(stats.mergedRecordSize().isPresent());
Assertions.assertTrue(stats.colStats().isEmpty());
}

@Test
public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {

sql(
"CREATE TABLE T ("
+ "id STRING, name STRING, bytes_col BYTES, int_col INT, long_col bigint,\n"
Expand All @@ -80,8 +87,11 @@ public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {

sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS");

Statistics stats = paimonTable("T").statistics().get();
Optional<Statistics> statisticsOpt = paimonTable("T").statistics();
assertThat(statisticsOpt.isPresent()).isTrue();
Statistics stats = statisticsOpt.get();

assertThat(stats.mergedRecordCount().isPresent()).isTrue();
Assertions.assertEquals(4L, stats.mergedRecordCount().getAsLong());

Map<String, ColStats<?>> colStats = stats.colStats();
Expand All @@ -95,7 +105,7 @@ public void testAnalyzeTableColumn() throws Catalog.TableNotExistException {
colStats.get("bytes_col"));

Assertions.assertEquals(
ColStats.newColStats(3, 2L, new Integer(1), new Integer(4), 0L, null, null),
ColStats.newColStats(3, 2L, 1, 4, 0L, null, null),
colStats.get("int_col"));

Assertions.assertEquals(
Expand Down

0 comments on commit f81ba36

Please sign in to comment.