diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 07733d89681..26e1ac2935d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -590,9 +590,11 @@ private List<DeltaLakeColumnHandle> getColumns(MetadataEntry deltaMetadata) @Override public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle) { + LOG.debug("inside getTableStatistics"); if (!isTableStatisticsEnabled(session)) { return TableStatistics.empty(); } + LOG.debug("fetching stats from metastore"); return metastore.getTableStatistics(session, (DeltaLakeTableHandle) tableHandle); } @@ -2612,6 +2614,7 @@ public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate( List<DeltaLakeColumnMetadata> schema, List<String> canonicalPartitionColumns) { + LOG.debug("inside createStatisticsPredicate"); return addFileEntry.getStats() .map(deltaLakeFileStatistics -> withColumnDomains( schema.stream() diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index bfc55d8df49..76b70281e40 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; @@ -66,6 +67,8 @@ public class DeltaLakeSplitManager implements ConnectorSplitManager { + private static final Logger log = Logger.get(DeltaLakeSplitManager.class); + private final TypeManager typeManager; private final BiFunction metastoreProvider; private final ExecutorService executor; @@ -152,6 +155,7 @@ private Stream<DeltaLakeSplit> getSplits( .filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase .collect(toImmutableList()); + log.debug(">>> creating delta splits"); return validDataFiles.stream() .flatMap(addAction -> { if (tableHandle.getAnalyzeHandle().isPresent() && !tableHandle.getAnalyzeHandle().get().isInitialAnalyze() && !addAction.isDataChange()) { @@ -198,6 +202,7 @@ private Stream<DeltaLakeSplit> getSplits( } } + log.debug(">>> Creating delta splits for stats: %s", statisticsPredicate); return splitsForFile( session, addAction, diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index 1d873e46cf1..02629d47e1a 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -110,11 +110,6 @@ <artifactId>guice</artifactId> </dependency> - <dependency> - <groupId>javax.annotation</groupId> - <artifactId>javax.annotation-api</artifactId> - </dependency> - <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> @@ -130,7 +125,59 @@ <artifactId>joda-time</artifactId> + + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-trino-bundle</artifactId> + <version>${dep.hudi.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-hadoop-mr-bundle</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.weakref</groupId> @@ -403,6 +450,10 @@ <groupId>io.dropwizard.metrics</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </exclusion> diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java
b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java new file mode 100644 index 00000000000..edd5189d043 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiBackgroundSplitLoader.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.plugin.hudi; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForHudiBackgroundSplitLoader +{ +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java new file mode 100644 index 00000000000..608c25554b5 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.trino.plugin.hudi; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForHudiSplitSource +{ +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java index ea323818bdf..abf18f4dcfd 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import javax.validation.constraints.DecimalMax; @@ -42,10 +43,15 @@ public class HudiConfig private int minPartitionBatchSize = 10; private int maxPartitionBatchSize = 100; private boolean sizeBasedSplitWeightsEnabled = true; - private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE); + private DataSize standardSplitWeightSize = DataSize.of(64, MEGABYTE); private double minimumAssignedSplitWeight = 0.05; private int maxSplitsPerSecond = Integer.MAX_VALUE; private int maxOutstandingSplits = 1000; + private boolean tableStatisticsEnabled; + private int splitLoaderParallelism = 2; + private int splitGeneratorParallelism = 16; + private int partitionScannerParallelism = 16; + private long perTransactionMetastoreCacheMaximumSize = 1000; public List getColumnsToHide() { @@ -99,7 +105,7 @@ public HudiConfig setMinPartitionBatchSize(int minPartitionBatchSize) } @Min(1) - @Max(100) + @Max(1000) public int getMinPartitionBatchSize() { return minPartitionBatchSize; @@ -114,7 +120,7 @@ public HudiConfig setMaxPartitionBatchSize(int maxPartitionBatchSize) } @Min(1) - @Max(1000) + @Max(10000) public int getMaxPartitionBatchSize() { return maxPartitionBatchSize; @@ -178,7 +184,6 @@ public HudiConfig setMaxSplitsPerSecond(int maxSplitsPerSecond) return this; } - @Min(1) public int getMaxOutstandingSplits() { return maxOutstandingSplits; @@ -191,4 +196,74 @@ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits) this.maxOutstandingSplits = maxOutstandingSplits; return this; } + + @Config("hudi.table-statistics-enabled") + @ConfigDescription("Enable use of table statistics") + public HudiConfig setTableStatisticsEnabled(boolean tableStatisticsEnabled) + { + this.tableStatisticsEnabled = tableStatisticsEnabled; + return this; + } + + public boolean isTableStatisticsEnabled() + { + return tableStatisticsEnabled; + } + + @Min(1) + public int getSplitGeneratorParallelism() + { + return splitGeneratorParallelism; + } + + @Config("hudi.split-generator-parallelism") + @ConfigDescription("Number of threads to generate splits from partitions.") + public HudiConfig setSplitGeneratorParallelism(int splitGeneratorParallelism) + { + this.splitGeneratorParallelism = splitGeneratorParallelism; + return this; + } + + @Min(1) + public int getSplitLoaderParallelism() + { + return splitLoaderParallelism; + } + + @Config("hudi.split-loader-parallelism") + @ConfigDescription("Number of threads to run background split loader. 
" + + "A single background split loader is needed per query.") + public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism) + { + this.splitLoaderParallelism = splitLoaderParallelism; + return this; + } + + @Config("hudi.partition-scanner-parallelism") + @ConfigDescription("Number of threads to use for partition scanners") + public HudiConfig setPartitionScannerParallelism(int partitionScannerParallelism) + { + this.partitionScannerParallelism = partitionScannerParallelism; + return this; + } + + @Min(1) + public int getPartitionScannerParallelism() + { + return partitionScannerParallelism; + } + + @Min(1) + public long getPerTransactionMetastoreCacheMaximumSize() + { + return perTransactionMetastoreCacheMaximumSize; + } + + @LegacyConfig("hive.per-transaction-metastore-cache-maximum-size") + @Config("delta.per-transaction-metastore-cache-maximum-size") + public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransactionMetastoreCacheMaximumSize) + { + this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize; + return this; + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHiveStatisticsProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHiveStatisticsProvider.java new file mode 100644 index 00000000000..abd3da6f141 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHiveStatisticsProvider.java @@ -0,0 +1,894 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.trino.plugin.hudi; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.VerifyException; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.HashFunction; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.SignedBytes; +import io.airlift.log.Logger; +import io.airlift.slice.Slice; +import io.trino.plugin.hive.HiveBasicStatistics; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HivePartition; +import io.trino.plugin.hive.PartitionStatistics; +import io.trino.plugin.hive.metastore.DateStatistics; +import io.trino.plugin.hive.metastore.DecimalStatistics; +import io.trino.plugin.hive.metastore.DoubleStatistics; +import io.trino.plugin.hive.metastore.HiveColumnStatistics; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.IntegerStatistics; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.statistics.HiveStatisticsProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.NullableValue; +import io.trino.spi.statistics.ColumnStatistics; +import io.trino.spi.statistics.DoubleRange; +import io.trino.spi.statistics.Estimate; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.CharType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarcharType; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import java.util.stream.DoubleStream; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Maps.immutableEntry; +import static com.google.common.hash.Hashing.murmur3_128; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_CORRUPTED_COLUMN_STATISTICS; +import static io.trino.plugin.hive.HivePartition.UNPARTITIONED_ID; +import static io.trino.plugin.hive.HiveSessionProperties.getPartitionStatisticsSampleSize; +import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreCorruptedStatistics; +import static io.trino.plugin.hive.HiveSessionProperties.isStatisticsEnabled; +import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TinyintType.TINYINT; +import static java.lang.Double.isFinite; +import static java.lang.Double.isNaN; +import static java.lang.String.format; +import 
static java.util.Collections.unmodifiableList; +import static java.util.Objects.requireNonNull; + +public class HudiHiveStatisticsProvider + implements HiveStatisticsProvider +{ + private static final Logger log = Logger.get(HudiHiveStatisticsProvider.class); + + private final PartitionsStatisticsProvider statisticsProvider; + + //@Inject + public HudiHiveStatisticsProvider(HiveMetastore metastore) + { + requireNonNull(metastore, "metastore is null"); + this.statisticsProvider = (session, table, hivePartitions) -> getPartitionsStatistics(metastore, table, hivePartitions); + } + + private static Map getPartitionsStatistics(HiveMetastore metastore, SchemaTableName table, List hivePartitions) + { + if (hivePartitions.isEmpty()) { + return ImmutableMap.of(); + } + Table metastoreTable = metastore.getTable(table.getSchemaName(), table.getTableName()).get(); + boolean unpartitioned = hivePartitions.stream().anyMatch(partition -> partition.getPartitionId().equals(UNPARTITIONED_ID)); + if (unpartitioned) { + checkArgument(hivePartitions.size() == 1, "expected only one hive partition"); + return ImmutableMap.of(UNPARTITIONED_ID, metastore.getTableStatistics(metastoreTable)); + } + List partitionNames = hivePartitions.stream() + .map(HivePartition::getPartitionId) + .collect(toImmutableList()); + List partitions = metastore.getPartitionsByNames(metastoreTable, partitionNames) + .values() + .stream() + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()); + return metastore.getPartitionStatistics(metastoreTable, partitions); + } + + @Override + public TableStatistics getTableStatistics( + ConnectorSession session, + SchemaTableName table, + Map columns, + Map columnTypes, + List partitions) + { + if (!isStatisticsEnabled(session)) { + return TableStatistics.empty(); + } + if (partitions.isEmpty()) { + return createZeroStatistics(columns, columnTypes); + } + int sampleSize = getPartitionStatisticsSampleSize(session); + List partitionsSample = getPartitionsSample(partitions, sampleSize); + try { + Map statisticsSample = statisticsProvider.getPartitionsStatistics(session, table, partitionsSample); + validatePartitionStatistics(table, statisticsSample); + return getTableStatistics(columns, columnTypes, partitions, statisticsSample); + } + catch (TrinoException e) { + if (e.getErrorCode().equals(HIVE_CORRUPTED_COLUMN_STATISTICS.toErrorCode()) && isIgnoreCorruptedStatistics(session)) { + log.error(e); + return TableStatistics.empty(); + } + throw e; + } + } + + private TableStatistics createZeroStatistics(Map columns, Map columnTypes) + { + TableStatistics.Builder result = TableStatistics.builder(); + result.setRowCount(Estimate.of(0)); + columns.forEach((columnName, columnHandle) -> { + Type columnType = columnTypes.get(columnName); + verifyNotNull(columnType, "columnType is missing for column: %s", columnName); + ColumnStatistics.Builder columnStatistics = ColumnStatistics.builder(); + columnStatistics.setNullsFraction(Estimate.of(0)); + columnStatistics.setDistinctValuesCount(Estimate.of(0)); + if (hasDataSize(columnType)) { + columnStatistics.setDataSize(Estimate.of(0)); + } + result.setColumnStatistics(columnHandle, columnStatistics.build()); + }); + return result.build(); + } + + @VisibleForTesting + static List getPartitionsSample(List partitions, int sampleSize) + { + checkArgument(sampleSize > 0, "sampleSize is expected to be greater than zero"); + + if (partitions.size() <= sampleSize) { + return partitions; + } + + List result = new ArrayList<>(); + + int samplesLeft 
= sampleSize; + + HivePartition min = partitions.get(0); + HivePartition max = partitions.get(0); + for (HivePartition partition : partitions) { + if (partition.getPartitionId().compareTo(min.getPartitionId()) < 0) { + min = partition; + } + else if (partition.getPartitionId().compareTo(max.getPartitionId()) > 0) { + max = partition; + } + } + + result.add(min); + samplesLeft--; + if (samplesLeft > 0) { + result.add(max); + samplesLeft--; + } + + if (samplesLeft > 0) { + HashFunction hashFunction = murmur3_128(); + Comparator<Map.Entry<HivePartition, Long>> hashComparator = Comparator + .<Map.Entry<HivePartition, Long>, Long>comparing(Map.Entry::getValue) + .thenComparing(entry -> entry.getKey().getPartitionId()); + partitions.stream() + .filter(partition -> !result.contains(partition)) + .map(partition -> immutableEntry(partition, hashFunction.hashUnencodedChars(partition.getPartitionId()).asLong())) + .sorted(hashComparator) + .limit(samplesLeft) + .forEachOrdered(entry -> result.add(entry.getKey())); + } + + return unmodifiableList(result); + } + + @VisibleForTesting + static void validatePartitionStatistics(SchemaTableName table, Map<String, PartitionStatistics> partitionStatistics) + { + partitionStatistics.forEach((partition, statistics) -> { + HiveBasicStatistics basicStatistics = statistics.getBasicStatistics(); + OptionalLong rowCount = basicStatistics.getRowCount(); + rowCount.ifPresent(count -> checkStatistics(count >= 0, table, partition, "rowCount must be greater than or equal to zero: %s", count)); + basicStatistics.getFileCount().ifPresent(count -> checkStatistics(count >= 0, table, partition, "fileCount must be greater than or equal to zero: %s", count)); + basicStatistics.getInMemoryDataSizeInBytes().ifPresent(size -> checkStatistics(size >= 0, table, partition, "inMemoryDataSizeInBytes must be greater than or equal to zero: %s", size)); + basicStatistics.getOnDiskDataSizeInBytes().ifPresent(size -> checkStatistics(size >= 0, table, partition, "onDiskDataSizeInBytes must be greater than or equal to zero: %s", size)); + statistics.getColumnStatistics().forEach((column, columnStatistics) -> validateColumnStatistics(table, partition, column, rowCount, columnStatistics)); + }); + } + + private static void validateColumnStatistics(SchemaTableName table, String partition, String column, OptionalLong rowCount, HiveColumnStatistics columnStatistics) + { + columnStatistics.getMaxValueSizeInBytes().ifPresent(maxValueSizeInBytes -> + checkStatistics(maxValueSizeInBytes >= 0, table, partition, column, "maxValueSizeInBytes must be greater than or equal to zero: %s", maxValueSizeInBytes)); + columnStatistics.getTotalSizeInBytes().ifPresent(totalSizeInBytes -> + checkStatistics(totalSizeInBytes >= 0, table, partition, column, "totalSizeInBytes must be greater than or equal to zero: %s", totalSizeInBytes)); + columnStatistics.getNullsCount().ifPresent(nullsCount -> { + checkStatistics(nullsCount >= 0, table, partition, column, "nullsCount must be greater than or equal to zero: %s", nullsCount); + if (rowCount.isPresent()) { + checkStatistics( + nullsCount <= rowCount.getAsLong(), + table, + partition, + column, + "nullsCount must be less than or equal to rowCount. nullsCount: %s. 
rowCount: %s.", + nullsCount, + rowCount.getAsLong()); + } + }); + columnStatistics.getDistinctValuesCount().ifPresent(distinctValuesCount -> { + checkStatistics(distinctValuesCount >= 0, table, partition, column, "distinctValuesCount must be greater than or equal to zero: %s", distinctValuesCount); + if (rowCount.isPresent()) { + checkStatistics( + distinctValuesCount <= rowCount.getAsLong(), + table, + partition, + column, + "distinctValuesCount must be less than or equal to rowCount. distinctValuesCount: %s. rowCount: %s.", + distinctValuesCount, + rowCount.getAsLong()); + } + if (rowCount.isPresent() && columnStatistics.getNullsCount().isPresent()) { + long nonNullsCount = rowCount.getAsLong() - columnStatistics.getNullsCount().getAsLong(); + checkStatistics( + distinctValuesCount <= nonNullsCount, + table, + partition, + column, + "distinctValuesCount must be less than or equal to nonNullsCount. distinctValuesCount: %s. nonNullsCount: %s.", + distinctValuesCount, + nonNullsCount); + } + }); + + columnStatistics.getIntegerStatistics().ifPresent(integerStatistics -> { + OptionalLong min = integerStatistics.getMin(); + OptionalLong max = integerStatistics.getMax(); + if (min.isPresent() && max.isPresent()) { + checkStatistics( + min.getAsLong() <= max.getAsLong(), + table, + partition, + column, + "integerStatistics.min must be less than or equal to integerStatistics.max. integerStatistics.min: %s. integerStatistics.max: %s.", + min.getAsLong(), + max.getAsLong()); + } + }); + columnStatistics.getDoubleStatistics().ifPresent(doubleStatistics -> { + OptionalDouble min = doubleStatistics.getMin(); + OptionalDouble max = doubleStatistics.getMax(); + if (min.isPresent() && max.isPresent() && !isNaN(min.getAsDouble()) && !isNaN(max.getAsDouble())) { + checkStatistics( + min.getAsDouble() <= max.getAsDouble(), + table, + partition, + column, + "doubleStatistics.min must be less than or equal to doubleStatistics.max. doubleStatistics.min: %s. doubleStatistics.max: %s.", + min.getAsDouble(), + max.getAsDouble()); + } + }); + columnStatistics.getDecimalStatistics().ifPresent(decimalStatistics -> { + Optional min = decimalStatistics.getMin(); + Optional max = decimalStatistics.getMax(); + if (min.isPresent() && max.isPresent()) { + checkStatistics( + min.get().compareTo(max.get()) <= 0, + table, + partition, + column, + "decimalStatistics.min must be less than or equal to decimalStatistics.max. decimalStatistics.min: %s. decimalStatistics.max: %s.", + min.get(), + max.get()); + } + }); + columnStatistics.getDateStatistics().ifPresent(dateStatistics -> { + Optional min = dateStatistics.getMin(); + Optional max = dateStatistics.getMax(); + if (min.isPresent() && max.isPresent()) { + checkStatistics( + min.get().compareTo(max.get()) <= 0, + table, + partition, + column, + "dateStatistics.min must be less than or equal to dateStatistics.max. dateStatistics.min: %s. 
dateStatistics.max: %s.", + min.get(), + max.get()); + } + }); + columnStatistics.getBooleanStatistics().ifPresent(booleanStatistics -> { + OptionalLong falseCount = booleanStatistics.getFalseCount(); + OptionalLong trueCount = booleanStatistics.getTrueCount(); + falseCount.ifPresent(count -> + checkStatistics(count >= 0, table, partition, column, "falseCount must be greater than or equal to zero: %s", count)); + trueCount.ifPresent(count -> + checkStatistics(count >= 0, table, partition, column, "trueCount must be greater than or equal to zero: %s", count)); + if (rowCount.isPresent() && falseCount.isPresent()) { + checkStatistics( + falseCount.getAsLong() <= rowCount.getAsLong(), + table, + partition, + column, + "booleanStatistics.falseCount must be less than or equal to rowCount. booleanStatistics.falseCount: %s. rowCount: %s.", + falseCount.getAsLong(), + rowCount.getAsLong()); + } + if (rowCount.isPresent() && trueCount.isPresent()) { + checkStatistics( + trueCount.getAsLong() <= rowCount.getAsLong(), + table, + partition, + column, + "booleanStatistics.trueCount must be less than or equal to rowCount. booleanStatistics.trueCount: %s. rowCount: %s.", + trueCount.getAsLong(), + rowCount.getAsLong()); + } + }); + } + + private static void checkStatistics(boolean expression, SchemaTableName table, String partition, String column, String message, Object... args) + { + if (!expression) { + throw new TrinoException( + HIVE_CORRUPTED_COLUMN_STATISTICS, + format("Corrupted partition statistics (Table: %s Partition: [%s] Column: %s): %s", table, partition, column, format(message, args))); + } + } + + private static void checkStatistics(boolean expression, SchemaTableName table, String partition, String message, Object... args) + { + if (!expression) { + throw new TrinoException( + HIVE_CORRUPTED_COLUMN_STATISTICS, + format("Corrupted partition statistics (Table: %s Partition: [%s]): %s", table, partition, format(message, args))); + } + } + + private static TableStatistics getTableStatistics( + Map columns, + Map columnTypes, + List partitions, + Map statistics) + { + if (statistics.isEmpty()) { + return TableStatistics.empty(); + } + + checkArgument(!partitions.isEmpty(), "partitions is empty"); + + Optional optionalRowCount = calculatePartitionsRowCount(statistics.values(), partitions.size()); + if (optionalRowCount.isEmpty()) { + return TableStatistics.empty(); + } + double rowCount = optionalRowCount.get().getRowCount(); + + TableStatistics.Builder result = TableStatistics.builder(); + result.setRowCount(Estimate.of(rowCount)); + for (Map.Entry column : columns.entrySet()) { + String columnName = column.getKey(); + HiveColumnHandle columnHandle = (HiveColumnHandle) column.getValue(); + Type columnType = columnTypes.get(columnName); + ColumnStatistics columnStatistics; + if (columnHandle.isPartitionKey()) { + double averageRowsPerPartition = optionalRowCount.get().getAverageRowsPerPartition(); + columnStatistics = createPartitionColumnStatistics(columnHandle, columnType, partitions, statistics, averageRowsPerPartition, rowCount); + } + else { + columnStatistics = createDataColumnStatistics(columnName, columnType, rowCount, statistics.values()); + } + result.setColumnStatistics(columnHandle, columnStatistics); + } + return result.build(); + } + + @VisibleForTesting + static Optional calculatePartitionsRowCount(Collection statistics, int queriedPartitionsCount) + { + long[] rowCounts = statistics.stream() + .map(PartitionStatistics::getBasicStatistics) + .map(HiveBasicStatistics::getRowCount) 
+ .filter(OptionalLong::isPresent) + .mapToLong(OptionalLong::getAsLong) + .peek(count -> verify(count >= 0, "count must be greater than or equal to zero")) + .toArray(); + int sampleSize = statistics.size(); + // Sample contains all the queried partitions, estimate avg normally + if (rowCounts.length <= 2 || queriedPartitionsCount == sampleSize) { + OptionalDouble averageRowsPerPartitionOptional = Arrays.stream(rowCounts).average(); + if (averageRowsPerPartitionOptional.isEmpty()) { + return Optional.empty(); + } + double averageRowsPerPartition = averageRowsPerPartitionOptional.getAsDouble(); + return Optional.of(new PartitionsRowCount(averageRowsPerPartition, averageRowsPerPartition * queriedPartitionsCount)); + } + + // Some partitions (e.g. __HIVE_DEFAULT_PARTITION__) may be outliers in terms of row count. + // Excluding the min and max rowCount values from averageRowsPerPartition calculation helps to reduce the + // possibility of errors in the extrapolated rowCount due to a couple of outliers. + int minIndex = 0; + int maxIndex = 0; + long rowCountSum = rowCounts[0]; + for (int index = 1; index < rowCounts.length; index++) { + if (rowCounts[index] < rowCounts[minIndex]) { + minIndex = index; + } + else if (rowCounts[index] > rowCounts[maxIndex]) { + maxIndex = index; + } + rowCountSum += rowCounts[index]; + } + double averageWithoutOutliers = ((double) (rowCountSum - rowCounts[minIndex] - rowCounts[maxIndex])) / (rowCounts.length - 2); + double rowCount = (averageWithoutOutliers * (queriedPartitionsCount - 2)) + rowCounts[minIndex] + rowCounts[maxIndex]; + return Optional.of(new PartitionsRowCount(averageWithoutOutliers, rowCount)); + } + + @VisibleForTesting + static class PartitionsRowCount + { + private final double averageRowsPerPartition; + private final double rowCount; + + PartitionsRowCount(double averageRowsPerPartition, double rowCount) + { + verify(averageRowsPerPartition >= 0, "averageRowsPerPartition must be greater than or equal to zero"); + verify(rowCount >= 0, "rowCount must be greater than or equal to zero"); + this.averageRowsPerPartition = averageRowsPerPartition; + this.rowCount = rowCount; + } + + private double getAverageRowsPerPartition() + { + return averageRowsPerPartition; + } + + private double getRowCount() + { + return rowCount; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionsRowCount that = (PartitionsRowCount) o; + return Double.compare(that.averageRowsPerPartition, averageRowsPerPartition) == 0 + && Double.compare(that.rowCount, rowCount) == 0; + } + + @Override + public int hashCode() + { + return Objects.hash(averageRowsPerPartition, rowCount); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("averageRowsPerPartition", averageRowsPerPartition) + .add("rowCount", rowCount) + .toString(); + } + } + + private static ColumnStatistics createPartitionColumnStatistics( + HiveColumnHandle column, + Type type, + List partitions, + Map statistics, + double averageRowsPerPartition, + double rowCount) + { + List nonEmptyPartitions = partitions.stream() + .filter(partition -> getPartitionRowCount(partition.getPartitionId(), statistics).orElse(averageRowsPerPartition) != 0) + .collect(toImmutableList()); + + return ColumnStatistics.builder() + .setDistinctValuesCount(Estimate.of(calculateDistinctPartitionKeys(column, nonEmptyPartitions))) + 
.setNullsFraction(Estimate.of(calculateNullsFractionForPartitioningKey(column, partitions, statistics, averageRowsPerPartition, rowCount))) + .setRange(calculateRangeForPartitioningKey(column, type, nonEmptyPartitions)) + .setDataSize(calculateDataSizeForPartitioningKey(column, type, partitions, statistics, averageRowsPerPartition)) + .build(); + } + + @VisibleForTesting + static long calculateDistinctPartitionKeys( + HiveColumnHandle column, + List partitions) + { + return partitions.stream() + .map(partition -> partition.getKeys().get(column)) + .filter(value -> !value.isNull()) + .distinct() + .count(); + } + + @VisibleForTesting + static double calculateNullsFractionForPartitioningKey( + HiveColumnHandle column, + List partitions, + Map statistics, + double averageRowsPerPartition, + double rowCount) + { + if (rowCount == 0) { + return 0; + } + double estimatedNullsCount = partitions.stream() + .filter(partition -> partition.getKeys().get(column).isNull()) + .map(HivePartition::getPartitionId) + .mapToDouble(partitionName -> getPartitionRowCount(partitionName, statistics).orElse(averageRowsPerPartition)) + .sum(); + return normalizeFraction(estimatedNullsCount / rowCount); + } + + private static double normalizeFraction(double fraction) + { + checkArgument(!isNaN(fraction), "fraction is NaN"); + checkArgument(isFinite(fraction), "fraction must be finite"); + if (fraction < 0) { + return 0; + } + if (fraction > 1) { + return 1; + } + return fraction; + } + + @VisibleForTesting + static Estimate calculateDataSizeForPartitioningKey( + HiveColumnHandle column, + Type type, + List partitions, + Map statistics, + double averageRowsPerPartition) + { + if (!hasDataSize(type)) { + return Estimate.unknown(); + } + double dataSize = 0; + for (HivePartition partition : partitions) { + int length = getSize(partition.getKeys().get(column)); + double rowCount = getPartitionRowCount(partition.getPartitionId(), statistics).orElse(averageRowsPerPartition); + dataSize += length * rowCount; + } + return Estimate.of(dataSize); + } + + private static boolean hasDataSize(Type type) + { + return type instanceof VarcharType || type instanceof CharType; + } + + private static int getSize(NullableValue nullableValue) + { + if (nullableValue.isNull()) { + return 0; + } + Object value = nullableValue.getValue(); + checkArgument(value instanceof Slice, "value is expected to be of Slice type"); + return ((Slice) value).length(); + } + + private static OptionalDouble getPartitionRowCount(String partitionName, Map statistics) + { + PartitionStatistics partitionStatistics = statistics.get(partitionName); + if (partitionStatistics == null) { + return OptionalDouble.empty(); + } + OptionalLong rowCount = partitionStatistics.getBasicStatistics().getRowCount(); + if (rowCount.isPresent()) { + verify(rowCount.getAsLong() >= 0, "rowCount must be greater than or equal to zero"); + return OptionalDouble.of(rowCount.getAsLong()); + } + return OptionalDouble.empty(); + } + + @VisibleForTesting + static Optional calculateRangeForPartitioningKey(HiveColumnHandle column, Type type, List partitions) + { + List convertedValues = partitions.stream() + .map(HivePartition::getKeys) + .map(keys -> keys.get(column)) + .filter(value -> !value.isNull()) + .map(NullableValue::getValue) + .map(value -> convertPartitionValueToDouble(type, value)) + .collect(toImmutableList()); + + if (convertedValues.stream().noneMatch(OptionalDouble::isPresent)) { + return Optional.empty(); + } + double[] values = convertedValues.stream() + 
.peek(convertedValue -> checkState(convertedValue.isPresent(), "convertedValue is missing")) + .mapToDouble(OptionalDouble::getAsDouble) + .toArray(); + verify(values.length != 0, "No values"); + + if (DoubleStream.of(values).anyMatch(Double::isNaN)) { + return Optional.empty(); + } + + double min = DoubleStream.of(values).min().orElseThrow(); + double max = DoubleStream.of(values).max().orElseThrow(); + return Optional.of(new DoubleRange(min, max)); + } + + @VisibleForTesting + static OptionalDouble convertPartitionValueToDouble(Type type, Object value) + { + return toStatsRepresentation(type, value); + } + + @VisibleForTesting + static ColumnStatistics createDataColumnStatistics(String column, Type type, double rowsCount, Collection partitionStatistics) + { + List columnStatistics = partitionStatistics.stream() + .map(PartitionStatistics::getColumnStatistics) + .map(statistics -> statistics.get(column)) + .filter(Objects::nonNull) + .collect(toImmutableList()); + + if (columnStatistics.isEmpty()) { + return ColumnStatistics.empty(); + } + + return ColumnStatistics.builder() + .setDistinctValuesCount(calculateDistinctValuesCount(columnStatistics)) + .setNullsFraction(calculateNullsFraction(column, partitionStatistics)) + .setDataSize(calculateDataSize(column, partitionStatistics, rowsCount)) + .setRange(calculateRange(type, columnStatistics)) + .build(); + } + + @VisibleForTesting + static Estimate calculateDistinctValuesCount(List columnStatistics) + { + return columnStatistics.stream() + .map(HudiHiveStatisticsProvider::getDistinctValuesCount) + .filter(OptionalLong::isPresent) + .map(OptionalLong::getAsLong) + .peek(distinctValuesCount -> verify(distinctValuesCount >= 0, "distinctValuesCount must be greater than or equal to zero")) + .max(Long::compare) + .map(Estimate::of) + .orElse(Estimate.unknown()); + } + + private static OptionalLong getDistinctValuesCount(HiveColumnStatistics statistics) + { + if (statistics.getBooleanStatistics().isPresent() && + statistics.getBooleanStatistics().get().getFalseCount().isPresent() && + statistics.getBooleanStatistics().get().getTrueCount().isPresent()) { + long falseCount = statistics.getBooleanStatistics().get().getFalseCount().getAsLong(); + long trueCount = statistics.getBooleanStatistics().get().getTrueCount().getAsLong(); + return OptionalLong.of((falseCount > 0 ? 1 : 0) + (trueCount > 0 ? 
1 : 0)); + } + if (statistics.getDistinctValuesCount().isPresent()) { + return statistics.getDistinctValuesCount(); + } + return OptionalLong.empty(); + } + + @VisibleForTesting + static Estimate calculateNullsFraction(String column, Collection partitionStatistics) + { + List statisticsWithKnownRowCountAndNullsCount = partitionStatistics.stream() + .filter(statistics -> { + if (statistics.getBasicStatistics().getRowCount().isEmpty()) { + return false; + } + HiveColumnStatistics columnStatistics = statistics.getColumnStatistics().get(column); + if (columnStatistics == null) { + return false; + } + return columnStatistics.getNullsCount().isPresent(); + }) + .collect(toImmutableList()); + + if (statisticsWithKnownRowCountAndNullsCount.isEmpty()) { + return Estimate.unknown(); + } + + long totalNullsCount = 0; + long totalRowCount = 0; + for (PartitionStatistics statistics : statisticsWithKnownRowCountAndNullsCount) { + long rowCount = statistics.getBasicStatistics().getRowCount().orElseThrow(() -> new VerifyException("rowCount is not present")); + verify(rowCount >= 0, "rowCount must be greater than or equal to zero"); + HiveColumnStatistics columnStatistics = statistics.getColumnStatistics().get(column); + verifyNotNull(columnStatistics, "columnStatistics is null"); + long nullsCount = columnStatistics.getNullsCount().orElseThrow(() -> new VerifyException("nullsCount is not present")); + verify(nullsCount >= 0, "nullsCount must be greater than or equal to zero"); + verify(nullsCount <= rowCount, "nullsCount must be less than or equal to rowCount. nullsCount: %s. rowCount: %s.", nullsCount, rowCount); + totalNullsCount += nullsCount; + totalRowCount += rowCount; + } + + if (totalRowCount == 0) { + return Estimate.zero(); + } + + verify( + totalNullsCount <= totalRowCount, + "totalNullsCount must be less than or equal to totalRowCount. totalNullsCount: %s. 
totalRowCount: %s.", + totalNullsCount, + totalRowCount); + return Estimate.of(((double) totalNullsCount) / totalRowCount); + } + + @VisibleForTesting + static Estimate calculateDataSize(String column, Collection partitionStatistics, double totalRowCount) + { + List statisticsWithKnownRowCountAndDataSize = partitionStatistics.stream() + .filter(statistics -> { + if (statistics.getBasicStatistics().getRowCount().isEmpty()) { + return false; + } + HiveColumnStatistics columnStatistics = statistics.getColumnStatistics().get(column); + if (columnStatistics == null) { + return false; + } + return columnStatistics.getTotalSizeInBytes().isPresent(); + }) + .collect(toImmutableList()); + + if (statisticsWithKnownRowCountAndDataSize.isEmpty()) { + return Estimate.unknown(); + } + + long knownRowCount = 0; + long knownDataSize = 0; + for (PartitionStatistics statistics : statisticsWithKnownRowCountAndDataSize) { + long rowCount = statistics.getBasicStatistics().getRowCount().orElseThrow(() -> new VerifyException("rowCount is not present")); + verify(rowCount >= 0, "rowCount must be greater than or equal to zero"); + HiveColumnStatistics columnStatistics = statistics.getColumnStatistics().get(column); + verifyNotNull(columnStatistics, "columnStatistics is null"); + long dataSize = columnStatistics.getTotalSizeInBytes().orElseThrow(() -> new VerifyException("totalSizeInBytes is not present")); + verify(dataSize >= 0, "dataSize must be greater than or equal to zero"); + knownRowCount += rowCount; + knownDataSize += dataSize; + } + + if (totalRowCount == 0) { + return Estimate.zero(); + } + + if (knownRowCount == 0) { + return Estimate.unknown(); + } + + double averageValueDataSizeInBytes = ((double) knownDataSize) / knownRowCount; + return Estimate.of(averageValueDataSizeInBytes * totalRowCount); + } + + @VisibleForTesting + static Optional calculateRange(Type type, List columnStatistics) + { + return columnStatistics.stream() + .map(statistics -> createRange(type, statistics)) + .filter(Optional::isPresent) + .map(Optional::get) + .reduce(DoubleRange::union); + } + + private static Optional createRange(Type type, HiveColumnStatistics statistics) + { + if (type.equals(BIGINT) || type.equals(INTEGER) || type.equals(SMALLINT) || type.equals(TINYINT)) { + return statistics.getIntegerStatistics().flatMap(integerStatistics -> createIntegerRange(type, integerStatistics)); + } + if (type.equals(DOUBLE) || type.equals(REAL)) { + return statistics.getDoubleStatistics().flatMap(HudiHiveStatisticsProvider::createDoubleRange); + } + if (type.equals(DATE)) { + return statistics.getDateStatistics().flatMap(HudiHiveStatisticsProvider::createDateRange); + } + if (type instanceof DecimalType) { + return statistics.getDecimalStatistics().flatMap(HudiHiveStatisticsProvider::createDecimalRange); + } + return Optional.empty(); + } + + private static Optional createIntegerRange(Type type, IntegerStatistics statistics) + { + if (statistics.getMin().isPresent() && statistics.getMax().isPresent()) { + return Optional.of(createIntegerRange(type, statistics.getMin().getAsLong(), statistics.getMax().getAsLong())); + } + return Optional.empty(); + } + + private static DoubleRange createIntegerRange(Type type, long min, long max) + { + return new DoubleRange(normalizeIntegerValue(type, min), normalizeIntegerValue(type, max)); + } + + private static long normalizeIntegerValue(Type type, long value) + { + if (type.equals(BIGINT)) { + return value; + } + if (type.equals(INTEGER)) { + return Ints.saturatedCast(value); + } + if 
(type.equals(SMALLINT)) { + return Shorts.saturatedCast(value); + } + if (type.equals(TINYINT)) { + return SignedBytes.saturatedCast(value); + } + throw new IllegalArgumentException("Unexpected type: " + type); + } + + private static Optional createDoubleRange(DoubleStatistics statistics) + { + if (statistics.getMin().isPresent() && statistics.getMax().isPresent() && !isNaN(statistics.getMin().getAsDouble()) && !isNaN(statistics.getMax().getAsDouble())) { + return Optional.of(new DoubleRange(statistics.getMin().getAsDouble(), statistics.getMax().getAsDouble())); + } + return Optional.empty(); + } + + private static Optional createDateRange(DateStatistics statistics) + { + if (statistics.getMin().isPresent() && statistics.getMax().isPresent()) { + return Optional.of(new DoubleRange(statistics.getMin().get().toEpochDay(), statistics.getMax().get().toEpochDay())); + } + return Optional.empty(); + } + + private static Optional createDecimalRange(DecimalStatistics statistics) + { + if (statistics.getMin().isPresent() && statistics.getMax().isPresent()) { + return Optional.of(new DoubleRange(statistics.getMin().get().doubleValue(), statistics.getMax().get().doubleValue())); + } + return Optional.empty(); + } + + @VisibleForTesting + interface PartitionsStatisticsProvider + { + Map getPartitionsStatistics(ConnectorSession session, SchemaTableName table, List hivePartitions); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index bd2d83fa762..f8b4cd4b326 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -22,6 +22,7 @@ import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.statistics.HiveStatisticsProvider; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -78,12 +79,21 @@ public class HudiMetadata private final HiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; + private final HudiPartitionManager partitionManager; + private final HiveStatisticsProvider hiveStatisticsProvider; - public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager) + public HudiMetadata( + HiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, + HudiPartitionManager partitionManager, + HiveStatisticsProvider hiveStatisticsProvider) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); + this.hiveStatisticsProvider = requireNonNull(hiveStatisticsProvider, "hiveStatisticsProvider is null"); } @Override diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java index bb1a4832bc9..145e7c1eba8 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadataFactory.java @@ -16,6 +16,7 @@ import io.trino.hdfs.HdfsEnvironment; 
import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; @@ -23,6 +24,7 @@ import java.util.Optional; +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; import static java.util.Objects.requireNonNull; public class HudiMetadataFactory @@ -30,18 +32,31 @@ private final HiveMetastoreFactory metastoreFactory; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; + private final HudiPartitionManager partitionManager; + private final long perTransactionMetastoreCacheMaximumSize; @Inject - public HudiMetadataFactory(HiveMetastoreFactory metastoreFactory, HdfsEnvironment hdfsEnvironment, TypeManager typeManager) + public HudiMetadataFactory( + HiveMetastoreFactory metastoreFactory, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, + HudiPartitionManager partitionManager, + HudiConfig hudiConfig) { this.metastoreFactory = requireNonNull(metastoreFactory, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); + this.perTransactionMetastoreCacheMaximumSize = requireNonNull(hudiConfig, "hudiConfig is null").getPerTransactionMetastoreCacheMaximumSize(); } public HudiMetadata create(ConnectorIdentity identity) { HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(identity)); - return new HudiMetadata(metastore, hdfsEnvironment, typeManager); + // create per-transaction cache over the hive metastore interface; reuse the metastore + // created above and take the size from hudi.per-transaction-metastore-cache-maximum-size + // rather than creating a second metastore client with a hard-coded cache size + CachingHiveMetastore cachingHiveMetastore = memoizeMetastore( + metastore, + perTransactionMetastoreCacheMaximumSize); + return new HudiMetadata(cachingHiveMetastore, hdfsEnvironment, typeManager, partitionManager, new HudiHiveStatisticsProvider(cachingHiveMetastore)); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java index 52bd1f7028c..978961160d0 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java @@ -20,6 +20,7 @@ import com.google.inject.Scopes; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.FileFormatDataSourceStats; +import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.HiveNodePartitioningProvider; import io.trino.plugin.hive.HiveTransactionHandle; import io.trino.plugin.hive.metastore.HiveMetastore; @@ -35,12 +36,15 @@ import javax.inject.Singleton; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.configuration.ConfigBinder.configBinder; import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.Executors.newScheduledThreadPool; import static org.weakref.jmx.guice.ExportBinder.newExporter; public class HudiModule @@ -52,6 +56,7 @@ public void configure(Binder binder) binder.bind(HudiTransactionManager.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(HudiConfig.class); + configBinder(binder).bindConfig(HiveConfig.class);
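A note on the per-transaction memoization used by HudiMetadataFactory above: each create(identity) call wraps a freshly created metastore client in a bounded cache, so repeated lookups of the same table during planning (metadata, statistics, partition filtering) reach the metastore service only once per transaction. Below is a minimal sketch of that pattern using Guava's cache; the Metastore interface is a hypothetical stand-in for illustration, not Trino's actual HiveMetastore API.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import java.util.Optional;
    import java.util.concurrent.ExecutionException;

    // Hypothetical stand-in for the metastore client; the real code wraps
    // Trino's HiveMetastore, which has a much larger surface.
    interface Metastore
    {
        Optional<String> getTable(String schemaName, String tableName);
    }

    final class MemoizingMetastore
            implements Metastore
    {
        private final Metastore delegate;
        private final Cache<String, Optional<String>> tableCache;

        MemoizingMetastore(Metastore delegate, long maximumSize)
        {
            this.delegate = delegate;
            // Bounded per-instance cache; one instance is created per connector
            // transaction, so entries never outlive the transaction
            this.tableCache = CacheBuilder.newBuilder()
                    .maximumSize(maximumSize)
                    .build();
        }

        @Override
        public Optional<String> getTable(String schemaName, String tableName)
        {
            try {
                // Loads through the delegate once, then serves repeated
                // planning-time lookups from memory
                return tableCache.get(schemaName + "." + tableName, () -> delegate.getTable(schemaName, tableName));
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }

Because a new wrapper is built for every transaction, cached entries can never go stale across queries; the maximum size (the hudi.per-transaction-metastore-cache-maximum-size setting above) only bounds memory use within a single transaction.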
configBinder(binder).bindConfig(HiveMetastoreConfig.class); binder.bind(Key.get(boolean.class, TranslateHiveViews.class)).toInstance(false); @@ -65,6 +70,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetReaderConfig.class); configBinder(binder).bindConfig(ParquetWriterConfig.class); + binder.bind(HudiPartitionManager.class).in(Scopes.SINGLETON); binder.bind(HudiMetadataFactory.class).in(Scopes.SINGLETON); binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON); @@ -76,7 +82,27 @@ public void configure(Binder binder) @Provides public ExecutorService createExecutorService() { - return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d")); + return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%s")); + } + + @ForHudiSplitSource + @Singleton + @Provides + public ScheduledExecutorService createSplitLoaderExecutor(HudiConfig hudiConfig) + { + return newScheduledThreadPool( + hudiConfig.getSplitLoaderParallelism(), + daemonThreadsNamed("hudi-split-loader-%s")); + } + + @ForHudiBackgroundSplitLoader + @Singleton + @Provides + public ExecutorService createSplitGeneratorExecutor(HudiConfig hudiConfig) + { + return newFixedThreadPool( + hudiConfig.getSplitGeneratorParallelism(), + daemonThreadsNamed("hudi-split-generator-%s")); } @Singleton diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java new file mode 100644 index 00000000000..ffeec25be95 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPartitionManager.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.trino.plugin.hudi; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.metastore.Column; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.Table; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.type.TypeManager; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter; +import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles; +import static java.util.Objects.requireNonNull; + +public class HudiPartitionManager +{ + private final TypeManager typeManager; + + @Inject + public HudiPartitionManager(TypeManager typeManager) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + public List getEffectivePartitions(HudiTableHandle tableHandle, HiveMetastore metastore) + { + Optional table = metastore.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); + verify(table.isPresent()); + List partitionColumns = table.get().getPartitionColumns(); + if (partitionColumns.isEmpty()) { + return ImmutableList.of(""); + } + + List partitionColumnHandles = getPartitionKeyColumnHandles(table.get(), typeManager); + + return metastore.getPartitionNamesByFilter( + tableHandle.getSchemaName(), + tableHandle.getTableName(), + partitionColumns.stream().map(Column::getName).collect(Collectors.toList()), + computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates())) + .orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName())); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java index 355a5fd5180..a969c34674b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java @@ -47,6 +47,11 @@ public class HudiSessionProperties private static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled"; private static final String STANDARD_SPLIT_WEIGHT_SIZE = "standard_split_weight_size"; private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; + private static final String MAX_SPLITS_PER_SECOND = "max_splits_per_second"; + private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits"; + private static final String TABLE_STATISTICS_ENABLED = "statistics_enabled"; + private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism"; + private static final String PARTITION_SCANNER_PARALLELISM = "partition_scanner_parallelism"; private final List> sessionProperties; @@ -104,6 +109,31 @@ public HudiSessionProperties(HudiConfig hudiConfig) throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value)); } }, + false), + integerProperty( + MAX_SPLITS_PER_SECOND, + "Rate at which splits are enqueued for processing. 
The queue will throttle if this rate limit is breached.", + hudiConfig.getMaxSplitsPerSecond(), + false), + integerProperty( + MAX_OUTSTANDING_SPLITS, + "Maximum outstanding splits in a batch enqueued for processing.", + hudiConfig.getMaxOutstandingSplits(), + false), + booleanProperty( + TABLE_STATISTICS_ENABLED, + "Expose table statistics", + hudiConfig.isTableStatisticsEnabled(), + false), + integerProperty( + SPLIT_GENERATOR_PARALLELISM, + "Number of threads to generate splits from partitions", + hudiConfig.getSplitGeneratorParallelism(), + false), + integerProperty( + PARTITION_SCANNER_PARALLELISM, + "Number of threads to use for partition scanners", + hudiConfig.getPartitionScannerParallelism(), false)); } @@ -153,4 +183,29 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session) { return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class); } + + public static boolean isStatisticsEnabled(ConnectorSession session) + { + return session.getProperty(TABLE_STATISTICS_ENABLED, Boolean.class); + } + + public static int getMaxSplitsPerSecond(ConnectorSession session) + { + return session.getProperty(MAX_SPLITS_PER_SECOND, Integer.class); + } + + public static int getMaxOutstandingSplits(ConnectorSession session) + { + return session.getProperty(MAX_OUTSTANDING_SPLITS, Integer.class); + } + + public static int getSplitGeneratorParallelism(ConnectorSession session) + { + return session.getProperty(SPLIT_GENERATOR_PARALLELISM, Integer.class); + } + + public static int getPartitionScannerParallelism(ConnectorSession session) + { + return session.getProperty(PARTITION_SCANNER_PARALLELISM, Integer.class); + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java index 0081e35fa7d..e5abee081f7 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java @@ -23,16 +23,21 @@ import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; public class HudiSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(HudiSplit.class).instanceSize()); + private final String path; private final long start; private final long length; @@ -144,6 +149,17 @@ public List getPartitionKeys() return partitionKeys; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(path) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + splitWeight.getRetainedSizeInBytes() + + predicate.getRetainedSizeInBytes(HiveColumnHandle::getRetainedSizeInBytes) + + estimatedSizeOf(partitionKeys, HivePartitionKey::getEstimatedSizeInBytes); + } + @Override public String toString() { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index 34afd7e042f..0ab251a210e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ 
b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hudi; +import io.airlift.log.Logger; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; @@ -30,15 +31,19 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.security.ConnectorIdentity; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.HoodieTimer; -import javax.annotation.PreDestroy; import javax.inject.Inject; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.hudi.HudiSessionProperties.getMaxOutstandingSplits; +import static io.trino.plugin.hudi.HudiSessionProperties.getMaxSplitsPerSecond; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; @@ -46,33 +51,33 @@ public class HudiSplitManager implements ConnectorSplitManager { + private static final Logger log = Logger.get(HudiSplitManager.class); + private final HudiTransactionManager transactionManager; + private final HudiPartitionManager partitionManager; private final BiFunction metastoreProvider; private final HdfsEnvironment hdfsEnvironment; private final ExecutorService executor; - private final int maxSplitsPerSecond; - private final int maxOutstandingSplits; + private final ScheduledExecutorService splitLoaderExecutorService; + private final ExecutorService splitGeneratorExecutorService; @Inject public HudiSplitManager( HudiTransactionManager transactionManager, + HudiPartitionManager partitionManager, BiFunction metastoreProvider, HdfsEnvironment hdfsEnvironment, @ForHudiSplitManager ExecutorService executor, - HudiConfig hudiConfig) + @ForHudiSplitSource ScheduledExecutorService splitLoaderExecutorService, + @ForHudiBackgroundSplitLoader ExecutorService splitGeneratorExecutorService) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.executor = requireNonNull(executor, "executor is null"); - this.maxSplitsPerSecond = requireNonNull(hudiConfig, "hudiConfig is null").getMaxSplitsPerSecond(); - this.maxOutstandingSplits = hudiConfig.getMaxOutstandingSplits(); - } - - @PreDestroy - public void destroy() - { - this.executor.shutdown(); + this.splitLoaderExecutorService = requireNonNull(splitLoaderExecutorService, "splitLoaderExecutorService is null"); + this.splitGeneratorExecutorService = requireNonNull(splitGeneratorExecutorService, "splitGeneratorExecutorService is null"); } @Override @@ -89,9 +94,14 @@ public ConnectorSplitSource getSplits( .values().stream().map(HiveColumnHandle.class::cast) .filter(HiveColumnHandle::isPartitionKey) .collect(toImmutableMap(HiveColumnHandle::getName, identity())); - HiveMetastore metastore = metastoreProvider.apply(session.getIdentity(), (HiveTransactionHandle) transaction); + HiveMetastore metastore = hudiMetadata.getMetastore(); Table table = 
metastore.getTable(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName()) .orElseThrow(() -> new TableNotFoundException(schemaTableName(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName()))); + + HoodieTimer timer = new HoodieTimer().startTimer(); + List partitions = partitionManager.getEffectivePartitions(hudiTableHandle, metastore); + log.info("Took %d ms to get %d partitions", timer.endTimer(), partitions.size()); + HudiSplitSource splitSource = new HudiSplitSource( session, metastore, @@ -100,8 +110,11 @@ public ConnectorSplitSource getSplits( hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(table.getStorage().getLocation())), partitionColumnHandles, executor, - maxSplitsPerSecond, - maxOutstandingSplits); + splitLoaderExecutorService, + splitGeneratorExecutorService, + getMaxSplitsPerSecond(session), + getMaxOutstandingSplits(session), + partitions); return new ClassLoaderSafeConnectorSplitSource(splitSource, HudiSplitManager.class.getClassLoader()); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 2726bbb741d..db169ff0172 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.hudi; import com.google.common.util.concurrent.Futures; +import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.HiveMetastore; @@ -39,6 +40,9 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; @@ -53,7 +57,10 @@ public class HudiSplitSource implements ConnectorSplitSource { + private static final Logger log = Logger.get(HudiSplitSource.class); + private final AsyncQueue queue; + private final ScheduledFuture splitLoaderFuture; private final AtomicReference trinoException = new AtomicReference<>(); public HudiSplitSource( @@ -64,8 +71,11 @@ public HudiSplitSource( Configuration configuration, Map partitionColumnHandleMap, ExecutorService executor, + ScheduledExecutorService splitLoaderExecutorService, + ExecutorService splitGeneratorExecutorService, int maxSplitsPerSecond, - int maxOutstandingSplits) + int maxOutstandingSplits, + List partitions) { boolean metadataEnabled = isHudiMetadataEnabled(session); HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, tableHandle.getBasePath()); @@ -83,7 +93,8 @@ public HudiSplitSource( metaClient, metastore, table, - partitionColumnHandles); + partitionColumnHandles, + partitions); this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor); HudiBackgroundSplitLoader splitLoader = new HudiBackgroundSplitLoader( @@ -91,14 +102,16 @@ public HudiSplitSource( tableHandle, hudiDirectoryLister, queue, - executor, + splitGeneratorExecutorService, createSplitWeightProvider(session), throwable -> { trinoException.compareAndSet(null, new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate splits for " + table.getTableName(), throwable)); queue.finish(); - }); - splitLoader.start(); + }, + 
partitions);
+        this.splitLoaderFuture = splitLoaderExecutorService.schedule(splitLoader, 0, TimeUnit.MILLISECONDS);
     }

     @Override
@@ -125,7 +138,7 @@ public void close()
     @Override
     public boolean isFinished()
     {
-        return queue.isFinished();
+        return splitLoaderFuture.isDone() && queue.isFinished();
     }

     private static HoodieTableMetaClient buildTableMetaClient(Configuration configuration, String basePath)
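Scheduling the loader with a zero delay (rather than starting it inline) yields a future that isFinished() can consult together with the queue state. Without the future check, this interleaving could end split enumeration early (illustrative):

    // t0: loader scheduled but not yet running; the queue is empty and looks "finished"
    // t1: isFinished() returns true, so the engine stops polling for splits
    // t2: loader runs and offers splits that are never consumed

diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
index cf3596092da..a5e314dba6f 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HiveHudiPartitionInfo.java
@@ -69,7 +69,7 @@ public HiveHudiPartitionInfo(
     @Override
     public Table getTable()
     {
-        return null;
+        return table;
     }

     @Override
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
index bf501a777d3..0538323cd4e 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
@@ -13,20 +13,20 @@
  */
 package io.trino.plugin.hudi.partition;

-import io.trino.plugin.hive.metastore.Partition;
+import io.airlift.concurrent.MoreFutures;
+import io.airlift.log.Logger;
+import io.trino.plugin.hive.HivePartitionKey;
+import io.trino.plugin.hive.util.AsyncQueue;
 import io.trino.plugin.hudi.query.HudiDirectoryLister;
+import io.trino.plugin.hudi.split.HudiSplitFactory;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.hudi.exception.HoodieIOException;
+import io.trino.spi.connector.ConnectorSplit;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.util.HoodieTimer;

-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Deque;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.stream.Collectors;

 import static io.trino.plugin.hudi.HudiSessionProperties.getMaxPartitionBatchSize;
 import static io.trino.plugin.hudi.HudiSessionProperties.getMinPartitionBatchSize;
@@ -34,89 +34,64 @@ public class HudiPartitionInfoLoader
         implements Runnable
 {
+    private static final Logger log = Logger.get(HudiPartitionInfoLoader.class);
+
     private final HudiDirectoryLister hudiDirectoryLister;
+    private final HudiSplitFactory hudiSplitFactory;
     private final int minPartitionBatchSize;
     private final int maxPartitionBatchSize;
-    private final Deque<HudiPartitionInfo> partitionQueue;
+    private final AsyncQueue<ConnectorSplit> asyncQueue;
+    private final Deque<String> partitionQueue;

     private int currentBatchSize;
+    private boolean isRunning;

     public HudiPartitionInfoLoader(
             ConnectorSession session,
-            HudiDirectoryLister hudiDirectoryLister)
+            HudiDirectoryLister hudiDirectoryLister,
+            HudiSplitFactory hudiSplitFactory,
+            AsyncQueue<ConnectorSplit> asyncQueue,
+            Deque<String> partitionQueue)
     {
         this.hudiDirectoryLister = hudiDirectoryLister;
-        this.partitionQueue = new ConcurrentLinkedDeque<>();
+        this.hudiSplitFactory = hudiSplitFactory;
+        this.asyncQueue = asyncQueue;
+        this.partitionQueue = partitionQueue;
         this.minPartitionBatchSize = getMinPartitionBatchSize(session);
         this.maxPartitionBatchSize = 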
getMaxPartitionBatchSize(session);
         this.currentBatchSize = -1;
+        this.isRunning = true;
     }

     @Override
     public void run()
     {
-        List<HudiPartitionInfo> hudiPartitionInfoList = hudiDirectoryLister.getPartitionsToScan().stream()
-                .sorted(Comparator.comparing(HudiPartitionInfo::getComparingKey))
-                .collect(Collectors.toList());
+        HoodieTimer timer = new HoodieTimer().startTimer();

-        // empty partitioned table
-        if (hudiPartitionInfoList.isEmpty()) {
-            return;
-        }
+        while (isRunning || !partitionQueue.isEmpty()) {
+            String partitionName = partitionQueue.poll();

-        // non-partitioned table
-        if (hudiPartitionInfoList.size() == 1 && hudiPartitionInfoList.get(0).getHivePartitionName().isEmpty()) {
-            partitionQueue.addAll(hudiPartitionInfoList);
-            return;
-        }
-
-        boolean shouldUseHiveMetastore = hudiPartitionInfoList.get(0) instanceof HiveHudiPartitionInfo;
-        Iterator<HudiPartitionInfo> iterator = hudiPartitionInfoList.iterator();
-        while (iterator.hasNext()) {
-            int batchSize = updateBatchSize();
-            List<HudiPartitionInfo> partitionInfoBatch = new ArrayList<>();
-            while (iterator.hasNext() && batchSize > 0) {
-                partitionInfoBatch.add(iterator.next());
-                batchSize--;
-            }
-
-            if (!partitionInfoBatch.isEmpty()) {
-                if (shouldUseHiveMetastore) {
-                    Map<String, Optional<Partition>> partitions = hudiDirectoryLister.getPartitions(partitionInfoBatch.stream()
-                            .map(HudiPartitionInfo::getHivePartitionName)
-                            .collect(Collectors.toList()));
-                    for (HudiPartitionInfo partitionInfo : partitionInfoBatch) {
-                        String hivePartitionName = partitionInfo.getHivePartitionName();
-                        if (!partitions.containsKey(hivePartitionName)) {
-                            throw new HoodieIOException("Partition does not exist: " + hivePartitionName);
-                        }
-                        partitionInfo.loadPartitionInfo(partitions.get(hivePartitionName));
-                        partitionQueue.add(partitionInfo);
-                    }
-                }
-                else {
-                    for (HudiPartitionInfo partitionInfo : partitionInfoBatch) {
-                        partitionInfo.getHivePartitionKeys();
-                        partitionQueue.add(partitionInfo);
-                    }
-                }
+            if (partitionName != null) {
+                generateSplitsFromPartition(partitionName);
             }
         }
+        log.debug("HudiPartitionInfoLoader %s finished in %d ms", this, timer.endTimer());
     }

-    public Deque<HudiPartitionInfo> getPartitionQueue()
+    private void generateSplitsFromPartition(String partitionName)
     {
-        return partitionQueue;
+        Optional<HudiPartitionInfo> partitionInfo = hudiDirectoryLister.getPartitionInfo(partitionName);
+        if (partitionInfo.isPresent()) {
+            List<HivePartitionKey> partitionKeys = partitionInfo.get().getHivePartitionKeys();
+            List<FileStatus> partitionFiles = hudiDirectoryLister.listStatus(partitionInfo.get());
+            partitionFiles.stream()
+                    .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus))
+                    .map(asyncQueue::offer)
+                    .forEachOrdered(MoreFutures::getFutureValue);
+        }
     }

-    private int updateBatchSize()
+    public void stopRunning()
     {
-        if (currentBatchSize <= 0) {
-            currentBatchSize = minPartitionBatchSize;
-        }
-        else {
-            currentBatchSize *= 2;
-            currentBatchSize = Math.min(currentBatchSize, maxPartitionBatchSize);
-        }
-        return currentBatchSize;
+        this.isRunning = false;
     }
 }
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java
index 401e0f35e84..8a40daf37db 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java
@@ -30,4 +30,6 @@
     List<FileStatus> listStatus(HudiPartitionInfo partitionInfo);

     Map<String, Optional<Partition>> getPartitions(List<String> partitionNames);
+
+    Optional<HudiPartitionInfo> getPartitionInfo(String partition);
 }
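One thing to watch: the read-optimized lister below implements getPartitionInfo(String) as a linear scan over all partitions, so resolving n partition names costs O(n^2) overall. If that ever shows up in profiles, an index map is a straightforward alternative; a sketch under that assumption, not part of this change:

    // Built once in the constructor; assumes hive partition names are unique.
    // (Requires com.google.common.collect.ImmutableMap.toImmutableMap and java.util.function.Function.identity.)
    private final Map<String, HudiPartitionInfo> partitionInfoByName = allPartitionInfoList.stream()
            .collect(toImmutableMap(HudiPartitionInfo::getHivePartitionName, identity()));

    @Override
    public Optional<HudiPartitionInfo> getPartitionInfo(String partition)
    {
        return Optional.ofNullable(partitionInfoByName.get(partition));
    }

diff --git 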
a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java index 92aad499ce6..5b4e266fd67 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java @@ -16,15 +16,12 @@ import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hudi.HudiTableHandle; import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfo; import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -32,7 +29,6 @@ import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -48,12 +44,9 @@ public class HudiReadOptimizedDirectoryLister private final HiveMetastore hiveMetastore; private final Table hiveTable; private final SchemaTableName tableName; - private final List partitionColumnHandles; private final HoodieTableFileSystemView fileSystemView; - private final TupleDomain partitionKeysFilter; private final List partitionColumns; - - private List hivePartitionNames; + private final List allPartitionInfoList; public HudiReadOptimizedDirectoryLister( HoodieMetadataConfig metadataConfig, @@ -62,28 +55,16 @@ public HudiReadOptimizedDirectoryLister( HoodieTableMetaClient metaClient, HiveMetastore hiveMetastore, Table hiveTable, - List partitionColumnHandles) + List partitionColumnHandles, + List hivePartitionNames) { this.tableHandle = tableHandle; this.tableName = tableHandle.getSchemaTableName(); this.hiveMetastore = hiveMetastore; this.hiveTable = hiveTable; - this.partitionColumnHandles = partitionColumnHandles; this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig); - this.partitionKeysFilter = MetastoreUtil.computePartitionKeyFilter(partitionColumnHandles, tableHandle.getPartitionPredicates()); this.partitionColumns = hiveTable.getPartitionColumns(); - } - - @Override - public List getPartitionsToScan() - { - if (hivePartitionNames == null) { - hivePartitionNames = partitionColumns.isEmpty() - ? 
Collections.singletonList("") - : getPartitionNamesFromHiveMetastore(partitionKeysFilter); - } - - List allPartitionInfoList = hivePartitionNames.stream() + this.allPartitionInfoList = hivePartitionNames.stream() .map(hivePartitionName -> new HiveHudiPartitionInfo( hivePartitionName, partitionColumns, @@ -92,7 +73,11 @@ public List getPartitionsToScan() hiveTable, hiveMetastore)) .collect(Collectors.toList()); + } + @Override + public List getPartitionsToScan() + { return allPartitionInfoList.stream() .filter(partitionInfo -> partitionInfo.getHivePartitionKeys().isEmpty() || partitionInfo.doesMatchPredicates()) .collect(Collectors.toList()); @@ -106,19 +91,18 @@ public List listStatus(HudiPartitionInfo partitionInfo) .collect(toImmutableList()); } - private List getPartitionNamesFromHiveMetastore(TupleDomain partitionKeysFilter) + @Override + public Map> getPartitions(List partitionNames) { - return hiveMetastore.getPartitionNamesByFilter( - tableName.getSchemaName(), - tableName.getTableName(), - partitionColumns.stream().map(Column::getName).collect(Collectors.toList()), - partitionKeysFilter).orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName())); + return hiveMetastore.getPartitionsByNames(hiveTable, partitionNames); } @Override - public Map> getPartitions(List partitionNames) + public Optional getPartitionInfo(String partition) { - return hiveMetastore.getPartitionsByNames(hiveTable, partitionNames); + return allPartitionInfoList.stream() + .filter(partitionInfo -> partition.equals(partitionInfo.getHivePartitionName())) + .findFirst(); } @Override diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index b9ca3cbe60d..7a968050f82 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -13,103 +13,92 @@ */ package io.trino.plugin.hudi.split; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.MoreFutures; -import io.trino.plugin.hive.HivePartitionKey; +import io.airlift.log.Logger; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hudi.HudiTableHandle; -import io.trino.plugin.hudi.partition.HudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader; import io.trino.plugin.hudi.query.HudiDirectoryLister; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.exception.HoodieException; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.function.Consumer; -import java.util.stream.Collectors; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism; import static java.util.Objects.requireNonNull; public class HudiBackgroundSplitLoader + implements Runnable { + private static final Logger log 
= Logger.get(HudiBackgroundSplitLoader.class); + private final ConnectorSession session; private final HudiDirectoryLister hudiDirectoryLister; private final AsyncQueue asyncQueue; - private final ExecutorService executor; + private final ExecutorService splitGeneratorExecutorService; + private final int splitGeneratorNumThreads; private final Consumer errorListener; private final HudiSplitFactory hudiSplitFactory; + private final List partitions; public HudiBackgroundSplitLoader( ConnectorSession session, HudiTableHandle tableHandle, HudiDirectoryLister hudiDirectoryLister, AsyncQueue asyncQueue, - ExecutorService executor, + ExecutorService splitGeneratorExecutorService, HudiSplitWeightProvider hudiSplitWeightProvider, - Consumer errorListener) + Consumer errorListener, + List partitions) { this.session = requireNonNull(session, "session is null"); this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); - this.executor = requireNonNull(executor, "executor is null"); + this.splitGeneratorExecutorService = requireNonNull(splitGeneratorExecutorService, "splitGeneratorExecutorService is null"); + this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session); this.errorListener = requireNonNull(errorListener, "errorListener is null"); this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); + this.partitions = requireNonNull(partitions, "partitions is null"); } - public void start() - { - ListenableFuture> partitionsFuture = Futures.submit(this::loadPartitions, executor); - hookErrorListener(partitionsFuture); - - ListenableFuture splitFutures = Futures.transform( - partitionsFuture, - partitions -> { - List> futures = partitions.stream() - .map(partition -> Futures.submit(() -> loadSplits(partition), executor)) - .peek(this::hookErrorListener) - .collect(Collectors.toList()); - Futures.whenAllComplete(futures).run(asyncQueue::finish, directExecutor()); - return null; - }, - directExecutor()); - hookErrorListener(splitFutures); - } - - private Collection loadPartitions() + @Override + public void run() { - HudiPartitionInfoLoader partitionInfoLoader = new HudiPartitionInfoLoader(session, hudiDirectoryLister); - partitionInfoLoader.run(); - return partitionInfoLoader.getPartitionQueue(); - } + HoodieTimer timer = new HoodieTimer().startTimer(); + Deque partitionQueue = new ConcurrentLinkedDeque<>(partitions); + List splitGeneratorList = new ArrayList<>(); + List splitGeneratorFutures = new ArrayList<>(); - private void loadSplits(HudiPartitionInfo partition) - { - List partitionKeys = partition.getHivePartitionKeys(); - List partitionFiles = hudiDirectoryLister.listStatus(partition); - partitionFiles.stream() - .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus)) - .map(asyncQueue::offer) - .forEachOrdered(MoreFutures::getFutureValue); - } + // Start a number of partition split generators to generate the splits in parallel + for (int i = 0; i < splitGeneratorNumThreads; i++) { + HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(session, hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue); + splitGeneratorList.add(generator); + splitGeneratorFutures.add(splitGeneratorExecutorService.submit(generator)); + } - private void hookErrorListener(ListenableFuture future) - { - Futures.addCallback(future, new FutureCallback() - { - @Override - public void onSuccess(T result) {} + for (HudiPartitionInfoLoader 
generator : splitGeneratorList) {
+            // Let the split generator stop once the partition queue is empty
+            generator.stopRunning();
+        }

-            @Override
-            public void onFailure(Throwable t)
-            {
-                errorListener.accept(t);
+        // Wait for all split generators to finish
+        for (Future<?> future : splitGeneratorFutures) {
+            try {
+                future.get();
             }
-        }, directExecutor());
+            catch (InterruptedException | ExecutionException e) {
+                // Route failures through the error listener so the split source records the
+                // TrinoException and finishes the queue instead of leaving the query hanging.
+                errorListener.accept(new HoodieException("Error generating Hudi split", e));
+            }
+        }
+        asyncQueue.finish();
+        log.debug("Finished getting all splits in %d ms", timer.endTimer());
     }
 }
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
index f00491943ff..843e1a7fb34 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
@@ -49,6 +49,13 @@ public void testReadNonPartitionedTable()
                 "SELECT * FROM VALUES ('row_1', 'bob'), ('row_2', 'john'), ('row_3', 'tom')");
     }

+    @Test
+    public void testTopN()
+    {
+        // TODO: remove this test or rewrite it against the TPCH tables before merging
+        assertQueryOrdered("SELECT n.symbol, r.symbol FROM stock_ticks_cow n LEFT JOIN stock_ticks_cow_column_stats r ON n.key = r.key ORDER BY n.symbol LIMIT 1");
+    }
+
     @Test
     public void testReadPartitionedTables()
     {
diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
index 8d1cda74e28..affa66e14a3 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java
@@ -155,6 +155,8 @@ public enum TestingTable
         HUDI_NON_PART_COW(COPY_ON_WRITE, nonPartitionRegularColumns()),
         HUDI_COW_PT_TBL(COPY_ON_WRITE, multiPartitionRegularColumns(), multiPartitionColumns(), multiPartitions()),
         STOCK_TICKS_COW(COPY_ON_WRITE, stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()),
+        // NOTE: remove this table before merging; it exists only for local testing of column stats
+        STOCK_TICKS_COW_COLUMN_STATS(COPY_ON_WRITE, stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()),
         STOCK_TICKS_MOR(MERGE_ON_READ, stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()),
         /**/;
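If the column-stats fixture stays, a smoke assertion could pin it down; a sketch using the existing query-runner helpers (the expected row count matches the numWrites of 197 recorded in the commit metadata further below):

    @Test
    public void testColumnStatsTableRowCount()
    {
        assertQuery("SELECT count(*) FROM stock_ticks_cow_column_stats", "VALUES 197");
    }

diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
index 2a6c5f44b22..85bf391b3aa 100644
--- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
+++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
@@ -31,9 +31,6 @@
 import io.trino.tpch.TpchColumnType;
 import io.trino.tpch.TpchColumnTypes;
 import io.trino.tpch.TpchTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -50,6 +47,10 @@
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.org.apache.avro.Schema;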
+import org.apache.hudi.org.apache.avro.Schema.Field;
+import org.apache.hudi.org.apache.avro.generic.GenericData;
+import org.apache.hudi.org.apache.avro.generic.GenericRecord;
 import org.intellij.lang.annotations.Language;

 import java.io.IOException;
@@ -77,6 +78,7 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toUnmodifiableList;
 import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
+import static org.apache.hudi.org.apache.avro.Schema.Type.STRING;

 public class TpchHudiTablesInitializer
         implements HudiTablesInitializer
@@ -263,14 +265,15 @@ public HoodieRecord newInstance()
     private static Schema createAvroSchema(TpchTable<?> table)
     {
         List<TpchColumn<?>> tpchColumns = table.getColumns();
-        List<Schema.Field> fields = new ArrayList<>(tpchColumns.size() + 1);
+        List<Field> fields = new ArrayList<>(tpchColumns.size() + 1);
         for (TpchColumn<?> column : tpchColumns) {
             String columnName = column.getSimplifiedColumnName();
             Schema.Type columnSchemaType = toSchemaType(column.getType());
+            // NOTE: field creation is commented out for now due to the shaded Avro classes; restore before merging
             // Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(type));
-            fields.add(new Schema.Field(columnName, Schema.create(columnSchemaType)));
+            //fields.add(new Field(columnName, Schema.create(columnSchemaType)));
         }
-        fields.add(new Schema.Field(FIELD_UUID, Schema.create(Schema.Type.STRING)));
+        //fields.add(new Field(FIELD_UUID, Schema.create(STRING)));
         String name = table.getTableName();
         return Schema.createRecord(name, null, null, false, fields);
     }
@@ -308,7 +311,7 @@ private enum TpchColumnTypeAdapter
         IDENTIFIER(Schema.Type.LONG, hiveTypeOf(HIVE_LONG), Function.identity()),
         DATE(Schema.Type.INT, hiveTypeOf(HIVE_DATE), TpchColumnTypeAdapter::convertDate),
         DOUBLE(Schema.Type.DOUBLE, hiveTypeOf(HIVE_DOUBLE), Function.identity()),
-        VARCHAR(Schema.Type.STRING, TpchColumnTypeAdapter::hiveVarcharOf, Function.identity()),
+        VARCHAR(STRING, TpchColumnTypeAdapter::hiveVarcharOf, Function.identity()),
         /**/;

     static TpchColumnTypeAdapter of(TpchColumnType columnType)
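For reference, the nullable-union construction that the comment above refers to looks roughly like this against vanilla Avro; whether the classes relocated into hudi-trino-bundle expose the same constructors is precisely the open shading question:

    // Sketch only; Schema, Field, and JsonProperties would be the relocated
    // org.apache.hudi.org.apache.avro types if the shaded bundle supports them.
    Schema nullable = Schema.createUnion(Arrays.asList(
            Schema.create(Schema.Type.NULL),
            Schema.create(columnSchemaType)));
    fields.add(new Field(columnName, nullable, null, JsonProperties.NULL_VALUE));

diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.commit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.commit
new file mode 100644
index 00000000000..9e93d46cc4b
--- /dev/null
+++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.commit
@@ -0,0 +1,74 @@
+{
+  "partitionToWriteStats" : {
+    "2018/08/31" : [ {
+      "fileId" : "7196b21b-34c1-4e78-bf20-72dceb524e9e-0",
+      "path" : "2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet",
+      "prevCommit" : "null",
+      "numWrites" : 197,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 197,
+      "totalWriteBytes" : 443915,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "2018/08/31",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 443915,
+      "minEventTime" : null,
+      "maxEventTime" : null
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : {
+    "schema" : 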
"{\"type\":\"record\",\"name\":\"stock_ticks\",\"fields\":[{\"name\":\"volume\",\"type\":\"long\"},{\"name\":\"ts\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"month\",\"type\":\"string\"},{\"name\":\"high\",\"type\":\"double\"},{\"name\":\"low\",\"type\":\"double\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"date\",\"type\":\"string\"},{\"name\":\"close\",\"type\":\"double\"},{\"name\":\"open\",\"type\":\"double\"},{\"name\":\"day\",\"type\":\"string\"}]}", + "deltastreamer.checkpoint.key" : "stock_ticks,0:3482" + }, + "operationType" : "UPSERT", + "writeStats" : [ { + "fileId" : "7196b21b-34c1-4e78-bf20-72dceb524e9e-0", + "path" : "2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet", + "prevCommit" : "null", + "numWrites" : 197, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 197, + "totalWriteBytes" : 443915, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "2018/08/31", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 443915, + "minEventTime" : null, + "maxEventTime" : null + } ], + "fileIdAndRelativePaths" : { + "7196b21b-34c1-4e78-bf20-72dceb524e9e-0" : "2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet" + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 765, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2018/08/31" ] +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.commit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.commit.requested new file mode 100644 index 00000000000..e69de29bb2d diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.inflight new file mode 100644 index 00000000000..ceacf6654e7 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130009989.inflight @@ -0,0 +1,71 @@ +{ + "partitionToWriteStats" : { + "2018/08/31" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 197, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { }, + "operationType" : "UPSERT", + "writeStats" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 197, + "totalWriteBytes" : 0, + 
"totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ], + "fileIdAndRelativePaths" : { + "" : null + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2018/08/31" ] +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing new file mode 100644 index 00000000000..b8f51e5350e Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing.inflight new file mode 100644 index 00000000000..e69de29bb2d diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing.requested new file mode 100644 index 00000000000..16883161bf9 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/20221014130117650.indexing.requested differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/hoodie.properties b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/hoodie.properties new file mode 100644 index 00000000000..7577edbb65e --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/hoodie.properties @@ -0,0 +1,18 @@ +#Updated at 2022-10-14T13:01:24.683Z +#Fri Oct 14 13:01:24 UTC 2022 +hoodie.table.precombine.field=ts +hoodie.datasource.write.drop.partition.columns=false +hoodie.table.partition.fields=date +hoodie.table.type=COPY_ON_WRITE +hoodie.archivelog.folder=archived +hoodie.populate.meta.fields=true +hoodie.partition.metafile.use.base.format=false +hoodie.timeline.layout.version=1 +hoodie.table.version=4 +hoodie.table.metadata.partitions=column_stats,files +hoodie.table.recordkey.fields=key +hoodie.table.base.file.format=PARQUET +hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator +hoodie.table.name=stock_ticks_cow_column_stats +hoodie.table.metadata.partitions.inflight= +hoodie.table.checksum=1862538983 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit new file mode 
100644 index 00000000000..af3a67e8e88 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit @@ -0,0 +1,97 @@ +{ + "partitionToWriteStats" : { + "files" : [ { + "fileId" : "files-0000", + "path" : "files/.files-0000_00000000000000.log.1_0-8-8", + "prevCommit" : "00000000000000", + "numWrites" : 1, + "numDeletes" : 0, + "numUpdateWrites" : 1, + "numInserts" : 0, + "totalWriteBytes" : 10928, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "files", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 10928, + "minEventTime" : null, + "maxEventTime" : null, + "logVersion" : 1, + "logOffset" : 0, + "baseFile" : "", + "logFiles" : [ ".files-0000_00000000000000.log.1_0-8-8" ], + "recordsStats" : { + "val" : null, + "present" : false + }, + "columnStats" : { + "val" : null, + "present" : false + } + } ] + }, + "compacted" : false, + "extraMetadata" : { + "schema" : "{\"type\":\"record\",\"name\":\"HoodieMetadataRecord\",\"namespace\":\"org.apache.hudi.avro.model\",\"doc\":\"A record saved within the Metadata Table\",\"fields\":[{\"name\":\"key\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"type\",\"type\":\"int\",\"doc\":\"Type of the metadata record\"},{\"name\":\"filesystemMetadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"HoodieMetadataFileInfo\",\"fields\":[{\"name\":\"size\",\"type\":\"long\",\"doc\":\"Size of the file\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"True if this file has been deleted\"}]},\"avro.java.string\":\"String\"}],\"doc\":\"Contains information about partitions and files within the dataset\"},{\"name\":\"BloomFilterMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataBloomFilter\",\"doc\":\"Data file bloom filter details\",\"fields\":[{\"name\":\"type\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Bloom filter type code\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Instant timestamp when this metadata was created/updated\"},{\"name\":\"bloomFilter\",\"type\":\"bytes\",\"doc\":\"Bloom filter binary byte array\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Bloom filter entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of bloom filters for all data files in the user table\",\"default\":null},{\"name\":\"ColumnStatsMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataColumnStats\",\"doc\":\"Data file column statistics\",\"fields\":[{\"name\":\"fileName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"File name for which this column statistics applies\",\"default\":null},{\"name\":\"columnName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Column name for which this column statistics applies\",\"default\":null},{\"name\":\"minValue\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"BooleanWrapper\",\"doc\":\"A record wrapping boolean type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"boolean\"}]},{\"type\":\"record\",\"name\":\"IntWrapper\",\"doc\":\"A record wrapping int type to be able to be used it w/in Avro's 
Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"LongWrapper\",\"doc\":\"A record wrapping long type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"FloatWrapper\",\"doc\":\"A record wrapping float type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"float\"}]},{\"type\":\"record\",\"name\":\"DoubleWrapper\",\"doc\":\"A record wrapping double type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"double\"}]},{\"type\":\"record\",\"name\":\"BytesWrapper\",\"doc\":\"A record wrapping bytes type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"bytes\"}]},{\"type\":\"record\",\"name\":\"StringWrapper\",\"doc\":\"A record wrapping string type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"DateWrapper\",\"doc\":\"A record wrapping Date logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"DecimalWrapper\",\"doc\":\"A record wrapping Decimal logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":30,\"scale\":15}}]},{\"type\":\"record\",\"name\":\"TimeMicrosWrapper\",\"doc\":\"A record wrapping Time-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}}]},{\"type\":\"record\",\"name\":\"TimestampMicrosWrapper\",\"doc\":\"A record wrapping Timestamp-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}],\"doc\":\"Minimum value in the range. Based on user data table schema, we can convert this to appropriate type\",\"default\":null},{\"name\":\"maxValue\",\"type\":[\"null\",\"BooleanWrapper\",\"IntWrapper\",\"LongWrapper\",\"FloatWrapper\",\"DoubleWrapper\",\"BytesWrapper\",\"StringWrapper\",\"DateWrapper\",\"DecimalWrapper\",\"TimeMicrosWrapper\",\"TimestampMicrosWrapper\"],\"doc\":\"Maximum value in the range. 
Based on user data table schema, we can convert it to appropriate type\",\"default\":null},{\"name\":\"valueCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of values\",\"default\":null},{\"name\":\"nullCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of null values\",\"default\":null},{\"name\":\"totalSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total storage size on disk\",\"default\":null},{\"name\":\"totalUncompressedSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total uncompressed storage size on disk\",\"default\":null},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Column range entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of column statistics for all data files in the user table\",\"default\":null}]}" + }, + "operationType" : "UPSERT_PREPPED", + "writeStats" : [ { + "fileId" : "files-0000", + "path" : "files/.files-0000_00000000000000.log.1_0-8-8", + "prevCommit" : "00000000000000", + "numWrites" : 1, + "numDeletes" : 0, + "numUpdateWrites" : 1, + "numInserts" : 0, + "totalWriteBytes" : 10928, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "files", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 10928, + "minEventTime" : null, + "maxEventTime" : null, + "logVersion" : 1, + "logOffset" : 0, + "baseFile" : "", + "logFiles" : [ ".files-0000_00000000000000.log.1_0-8-8" ], + "recordsStats" : { + "val" : null, + "present" : false + }, + "columnStats" : { + "val" : null, + "present" : false + } + } ], + "fileIdAndRelativePaths" : { + "files-0000" : "files/.files-0000_00000000000000.log.1_0-8-8" + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 706, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "files" ] +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight new file mode 100644 index 00000000000..0c0cb7f843d --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight @@ -0,0 +1,116 @@ +{ + "partitionToWriteStats" : { + "files" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + }, { + "fileId" : "files-0000", + "path" : null, + "prevCommit" : "00000000000000", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 1, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + 
"totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { }, + "operationType" : "UPSERT_PREPPED", + "writeStats" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + }, { + "fileId" : "files-0000", + "path" : null, + "prevCommit" : "00000000000000", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 1, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ], + "fileIdAndRelativePaths" : { + "" : null, + "files-0000" : null + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "files" ] +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/00000000000000.deltacommit.requested new file mode 100644 index 00000000000..e69de29bb2d diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit new file mode 100644 index 00000000000..fb482954870 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit @@ -0,0 +1,72 @@ +{ + "partitionToWriteStats" : { + "column_stats" : [ { + "fileId" : "col-stats-0001", + "path" : "column_stats/.col-stats-0001_20221014130009989.log.1_0-8-11", + "cdcPath" : null, + "prevCommit" : "20221014130009989", + "numWrites" : 9, + "numDeletes" : 0, + "numUpdateWrites" : 9, + "numInserts" : 0, + "cdcWriteBytes" : 0, + "totalWriteBytes" : 11493, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "column_stats", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 11493, + "minEventTime" : null, + "maxEventTime" : null, 
+ "logVersion" : 1, + "logOffset" : 0, + "baseFile" : "", + "logFiles" : [ ".col-stats-0001_20221014130009989.log.1_0-8-11" ], + "recordsStats" : { + "val" : null + } + }, { + "fileId" : "col-stats-0000", + "path" : "column_stats/.col-stats-0000_20221014130009989.log.1_1-8-12", + "cdcPath" : null, + "prevCommit" : "20221014130009989", + "numWrites" : 8, + "numDeletes" : 0, + "numUpdateWrites" : 8, + "numInserts" : 0, + "cdcWriteBytes" : 0, + "totalWriteBytes" : 11431, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "column_stats", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 11431, + "minEventTime" : null, + "maxEventTime" : null, + "logVersion" : 1, + "logOffset" : 0, + "baseFile" : "", + "logFiles" : [ ".col-stats-0000_20221014130009989.log.1_1-8-12" ], + "recordsStats" : { + "val" : null + } + } ] + }, + "compacted" : false, + "extraMetadata" : { + "schema" : "{\"type\":\"record\",\"name\":\"HoodieMetadataRecord\",\"namespace\":\"org.apache.hudi.avro.model\",\"doc\":\"A record saved within the Metadata Table\",\"fields\":[{\"name\":\"key\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"type\",\"type\":\"int\",\"doc\":\"Type of the metadata record\"},{\"name\":\"filesystemMetadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"HoodieMetadataFileInfo\",\"fields\":[{\"name\":\"size\",\"type\":\"long\",\"doc\":\"Size of the file\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"True if this file has been deleted\"}]},\"avro.java.string\":\"String\"}],\"doc\":\"Contains information about partitions and files within the dataset\"},{\"name\":\"BloomFilterMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataBloomFilter\",\"doc\":\"Data file bloom filter details\",\"fields\":[{\"name\":\"type\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Bloom filter type code\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Instant timestamp when this metadata was created/updated\"},{\"name\":\"bloomFilter\",\"type\":\"bytes\",\"doc\":\"Bloom filter binary byte array\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Bloom filter entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of bloom filters for all data files in the user table\",\"default\":null},{\"name\":\"ColumnStatsMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataColumnStats\",\"doc\":\"Data file column statistics\",\"fields\":[{\"name\":\"fileName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"File name for which this column statistics applies\",\"default\":null},{\"name\":\"columnName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Column name for which this column statistics applies\",\"default\":null},{\"name\":\"minValue\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"BooleanWrapper\",\"doc\":\"A record wrapping boolean type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"boolean\"}]},{\"type\":\"record\",\"name\":\"IntWrapper\",\"doc\":\"A record wrapping int type to be able to be used it w/in Avro's 
Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"LongWrapper\",\"doc\":\"A record wrapping long type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"FloatWrapper\",\"doc\":\"A record wrapping float type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"float\"}]},{\"type\":\"record\",\"name\":\"DoubleWrapper\",\"doc\":\"A record wrapping double type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"double\"}]},{\"type\":\"record\",\"name\":\"BytesWrapper\",\"doc\":\"A record wrapping bytes type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"bytes\"}]},{\"type\":\"record\",\"name\":\"StringWrapper\",\"doc\":\"A record wrapping string type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"DateWrapper\",\"doc\":\"A record wrapping Date logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"DecimalWrapper\",\"doc\":\"A record wrapping Decimal logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":30,\"scale\":15}}]},{\"type\":\"record\",\"name\":\"TimeMicrosWrapper\",\"doc\":\"A record wrapping Time-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}}]},{\"type\":\"record\",\"name\":\"TimestampMicrosWrapper\",\"doc\":\"A record wrapping Timestamp-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}],\"doc\":\"Minimum value in the range. Based on user data table schema, we can convert this to appropriate type\",\"default\":null},{\"name\":\"maxValue\",\"type\":[\"null\",\"BooleanWrapper\",\"IntWrapper\",\"LongWrapper\",\"FloatWrapper\",\"DoubleWrapper\",\"BytesWrapper\",\"StringWrapper\",\"DateWrapper\",\"DecimalWrapper\",\"TimeMicrosWrapper\",\"TimestampMicrosWrapper\"],\"doc\":\"Maximum value in the range. 
Based on user data table schema, we can convert it to appropriate type\",\"default\":null},{\"name\":\"valueCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of values\",\"default\":null},{\"name\":\"nullCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of null values\",\"default\":null},{\"name\":\"totalSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total storage size on disk\",\"default\":null},{\"name\":\"totalUncompressedSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total uncompressed storage size on disk\",\"default\":null},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Column range entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of column statistics for all data files in the user table\",\"default\":null}]}" + }, + "operationType" : "UPSERT_PREPPED" +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit.inflight new file mode 100644 index 00000000000..6c3bba11e96 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit.inflight @@ -0,0 +1,80 @@ +{ + "partitionToWriteStats" : { + "column_stats" : [ { + "fileId" : "", + "path" : null, + "cdcPath" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 0, + "cdcWriteBytes" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + }, { + "fileId" : "col-stats-0001", + "path" : null, + "cdcPath" : null, + "prevCommit" : "20221014130009989", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 9, + "numInserts" : 0, + "cdcWriteBytes" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + }, { + "fileId" : "col-stats-0000", + "path" : null, + "cdcPath" : null, + "prevCommit" : "20221014130009989", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 8, + "numInserts" : 0, + "cdcWriteBytes" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { }, + "operationType" : "UPSERT_PREPPED" +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit.requested 
b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/20221014130009989.deltacommit.requested new file mode 100644 index 00000000000..e69de29bb2d diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/hoodie.properties b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/hoodie.properties new file mode 100644 index 00000000000..637c97ac49b --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/.hoodie/hoodie.properties @@ -0,0 +1,14 @@ +#Updated at 2022-10-14T13:01:20.950Z +#Fri Oct 14 13:01:20 UTC 2022 +hoodie.datasource.write.drop.partition.columns=false +hoodie.table.type=MERGE_ON_READ +hoodie.archivelog.folder=archived +hoodie.populate.meta.fields=false +hoodie.compaction.payload.class=org.apache.hudi.metadata.HoodieMetadataPayload +hoodie.timeline.layout.version=1 +hoodie.table.version=5 +hoodie.table.recordkey.fields=key +hoodie.table.base.file.format=HFILE +hoodie.table.keygenerator.class=org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator +hoodie.table.name=stock_ticks_cow_column_stats_metadata +hoodie.table.checksum=2293906534 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_0-0-0 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_0-0-0 new file mode 100644 index 00000000000..b993c2d0fed Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_0-0-0 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_1-8-12 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_1-8-12 new file mode 100644 index 00000000000..4e81920a9e8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0000_20221014130009989.log.1_1-8-12 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-0-0 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-0-0 new file mode 100644 index 00000000000..b993c2d0fed Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-0-0 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-8-11 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-8-11 new file mode 100644 index 00000000000..13103d704b2 Binary files /dev/null and 
b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.col-stats-0001_20221014130009989.log.1_0-8-11 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.hoodie_partition_metadata new file mode 100644 index 00000000000..24d26b944ac --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/column_stats/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Fri Oct 14 13:01:22 UTC 2022 +commitTime=20221014130009989 +partitionDepth=1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0 new file mode 100644 index 00000000000..9bf687c1a4b Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-8-8 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-8-8 new file mode 100644 index 00000000000..116bcd084d1 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-8-8 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.hoodie_partition_metadata new file mode 100644 index 00000000000..1c2b491e52c --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/.hoodie/metadata/files/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Fri Oct 14 13:00:12 UTC 2022 +commitTime=00000000000000 +partitionDepth=1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/.hoodie_partition_metadata new file mode 100644 index 00000000000..bb4d3eb6387 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Fri Oct 14 13:00:16 UTC 2022 +commitTime=20221014130009989 +partitionDepth=3 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet new file mode 100644 index 00000000000..606e147c6e2 Binary files /dev/null and 
b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_ticks_cow_column_stats/2018/08/31/7196b21b-34c1-4e78-bf20-72dceb524e9e-0_0-29-29_20221014130009989.parquet differ