From 57c6a98462af4ff7996c873cc3c5c8f444be7e22 Mon Sep 17 00:00:00 2001
From: Lokesh Jain
Date: Sat, 30 Nov 2024 23:20:05 +0530
Subject: [PATCH] [HUDI-8588] Col stats pruning fails to leverage stats from log files (#12374)

---
 .../org/apache/hudi/BucketIndexSupport.scala  |   2 +-
 .../apache/hudi/ColumnStatsIndexSupport.scala |   2 +-
 .../apache/hudi/ExpressionIndexSupport.scala  |   2 +-
 .../apache/hudi/SparkBaseIndexSupport.scala   |   6 +-
 .../TestColumnStatsIndexWithSQL.scala         | 118 +++++++++++++++++-
 .../command/index/TestExpressionIndex.scala   |   2 +-
 6 files changed, 121 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index b141897cdec0..52d80254d9e7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -81,7 +81,7 @@ class BucketIndexSupport(spark: SparkSession,
     val bucketIdsBitMapByFilter = filterQueriesWithBucketHashField(queryFilters)
 
     if (bucketIdsBitMapByFilter.isDefined && bucketIdsBitMapByFilter.get.cardinality() > 0) {
-      val allFilesName = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)._2
+      val allFilesName = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)._2
       Option.apply(getCandidateFiles(allFilesName, bucketIdsBitMapByFilter.get))
     } else {
       Option.empty

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 3eaa59d6b807..fc1f15ad4093 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -80,7 +80,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                             ): Option[Set[String]] = {
     if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
       val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
-      val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+      val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
       // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
       //       when loading the Column Statistics Index
       val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 2fb56ab3282a..37e27ccd1a1a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -64,7 +64,7 @@ class ExpressionIndexSupport(spark: SparkSession,
       val indexDefinition = metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
       if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
         val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
-        val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+        val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
         val indexDf = loadExpressionIndexDataFrame(indexPartition, prunedPartitions, readInMemory)
         Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
       } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 41eb0978e70b..ad0b7bcd6b64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -79,14 +79,14 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
   def invalidateCaches(): Unit
 
-  def getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
-                                      includeLogFiles: Boolean = false): (Set[String], Set[String]) = {
+  def getPrunedPartitionsAndFileNames(fileIndex: HoodieFileIndex, prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath],
+    Seq[FileSlice])]): (Set[String], Set[String]) = {
     val (prunedPartitions, prunedFiles) = prunedPartitionsAndFileSlices.foldLeft((Set.empty[String], Set.empty[String])) {
       case ((partitionSet, fileSet), (partitionPathOpt, fileSlices)) =>
         val updatedPartitionSet = partitionPathOpt.map(_.path).map(partitionSet + _).getOrElse(partitionSet)
         val updatedFileSet = fileSlices.foldLeft(fileSet) { (fileAcc, fileSlice) =>
           val baseFile = Option(fileSlice.getBaseFile.orElse(null)).map(_.getFileName)
-          val logFiles = if (includeLogFiles) {
+          val logFiles = if (fileIndex.includeLogFiles) {
             fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toSet
           } else Set.empty[String]
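
The hunk above is the core of the fix: getPrunedPartitionsAndFileNames previously took includeLogFiles: Boolean = false, so every index-support caller that omitted the argument silently dropped log-file names, and column-stats pruning on MOR tables never consulted stats from log files. The decision now follows the HoodieFileIndex that produced the file slices. A minimal, self-contained sketch of the behavior (plain Scala; FileSlice and LogFile below are simplified stand-ins for illustration, not Hudi's real classes):

    // Simplified stand-ins for Hudi's file-slice types (illustration only).
    final case class LogFile(getFileName: String)
    final case class FileSlice(baseFile: Option[String], logFiles: Seq[LogFile])

    // Mirrors the fold in SparkBaseIndexSupport#getPrunedPartitionsAndFileNames:
    // whether log files contribute names is driven by the file index's flag,
    // not by a Boolean parameter that silently defaulted to false.
    def collectFileNames(slices: Seq[FileSlice], includeLogFiles: Boolean): Set[String] =
      slices.foldLeft(Set.empty[String]) { (acc, slice) =>
        val logNames =
          if (includeLogFiles) slice.logFiles.map(_.getFileName).toSet
          else Set.empty[String]
        acc ++ slice.baseFile ++ logNames
      }

    // A MOR file slice with one base file and one log file:
    val slice = FileSlice(Some("f1.parquet"), Seq(LogFile(".f1.log.1")))
    assert(collectFileNames(Seq(slice), includeLogFiles = true).size == 2)  // base + log
    assert(collectFileNames(Seq(slice), includeLogFiles = false).size == 1) // base only
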
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 595587968889..293208d8ca6a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -18,28 +18,30 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, MetadataConversionUtils}
+import org.apache.hudi.common.util.FileIOUtils
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams}
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JavaConversions
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, Literal}
 import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
+import java.io.File
 import scala.collection.JavaConverters._
 
 class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
@@ -227,6 +229,114 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifyFileIndexAndSQLQueries(commonOpts)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR"))
+  def testGetPrunedPartitionsAndFileNames(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20"
+    ) ++ metadataOpts
+    setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false)
+
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update-input-table-json",
+      expectedColStatsSourcePath = "",
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      shouldValidate = false))
+    verifyFileIndexAndSQLQueries(commonOpts)
+
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true)
+    val metadataConfig = HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build
+    val cis = new ColumnStatsIndexSupport(spark, fileIndex.schema, metadataConfig, metaClient)
+    // unpartitioned table - get all file slices
+    val fileSlices = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq())
+    var files = cis.getPrunedPartitionsAndFileNames(fileIndex, fileSlices._2)._2
+    // Number of files obtained if file index has include log files as true is double of number of parquet files
+    val numberOfParquetFiles = 9
+    assertEquals(numberOfParquetFiles * 2, files.size)
+    assertEquals(numberOfParquetFiles, files.count(f => f.endsWith("parquet")))
+
+    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = false)
+    files = cis.getPrunedPartitionsAndFileNames(fileIndex, fileSlices._2)._2
+    assertEquals(numberOfParquetFiles, files.size)
+    assertEquals(numberOfParquetFiles, files.count(f => f.endsWith("parquet")))
+  }
+
+  @Test
+  def testUpdateAndSkippingWithColumnStatIndex(): Unit = {
+    val tableName = "testUpdateAndSkippingWithColumnStatIndex"
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.TABLE_TYPE.key -> "mor",
+      RECORDKEY_FIELD.key -> "id",
+      PRECOMBINE_FIELD.key -> "ts",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+    ) ++ metadataOpts
+
+    FileIOUtils.deleteDirectory(new File(basePath))
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  part long
+         |) using hudi
+         | options (
+         |  primaryKey = 'id',
+         |  type = 'mor',
+         |  preCombineField = 'name',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.index.column.stats.enable = 'true'
+         | )
+         | partitioned by(part)
+         | location '$basePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000), (2, 'a2', 10, 1001), (3, 'a3', 10, 1002)")
+    spark.sql(s"update $tableName set price = 20 where id = 1")
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true)
+    val dataFilter = EqualTo(attribute("price"), literal("20"))
+    var filteredPartitionDirectories = fileIndex.listFiles(Seq.empty, Seq(dataFilter))
+    var filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
+    assertEquals(2, filteredFilesCount)
+    assertEquals(1, spark.sql(s"select * from $tableName where price = 20").count())
+
+    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = false)
+    filteredPartitionDirectories = fileIndex.listFiles(Seq.empty, Seq(dataFilter))
+    filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
+    assertEquals(0, filteredFilesCount)
+    val df = spark.read.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+      .filter("price = 20")
+    assertEquals(0, df.count())
+  }
+
   private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String],
                          shouldValidate: Boolean, useShortSchema: Boolean = false): Unit = {
     val filePostfix = if (useShortSchema) {
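
A note on the second test's assertions: the SQL update on the MOR table writes the new value of price into a log file of the updated file slice. When the file index includes log files, column-stats pruning sees the log file's statistics, so the slice matching price = 20 survives and contributes two files (its base file plus the log file). Without log files, only base-file statistics exist and no parquet file yet contains price = 20, so pruning eliminates every file and the read-optimized query over the same predicate returns zero rows.
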
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index f09ae398e0e0..e1604c988925 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -1148,7 +1148,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       val partitionFilter: Expression = EqualTo(AttributeReference("c8", IntegerType)(), Literal(9))
       val (isPruned, prunedPaths) = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
       assertTrue(isPruned)
-      val prunedPartitionAndFileNames = expressionIndexSupport.getPrunedPartitionsAndFileNames(prunedPaths, includeLogFiles = true)
+      val prunedPartitionAndFileNames = expressionIndexSupport.getPrunedPartitionsAndFileNames(fileIndex, prunedPaths)
       assertTrue(prunedPartitionAndFileNames._1.size == 1) // partition
       assertTrue(prunedPartitionAndFileNames._2.size == 1) // log file
       assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNames._2.head))
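
Putting the pieces together, the call shape after this patch, assembled from the tests above (the option map opts and basePath are the tests' own; this is a usage sketch, not part of the change):

    // The caller now hands over the HoodieFileIndex itself; log-file handling
    // follows fileIndex.includeLogFiles instead of a defaulted Boolean argument.
    val fileIndex = HoodieFileIndex(spark, metaClient, None, opts + ("path" -> basePath), includeLogFiles = true)
    val metadataConfig = HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build
    val columnStats = new ColumnStatsIndexSupport(spark, fileIndex.schema, metadataConfig, metaClient)
    val (_, fileSlices) = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq.empty)
    val (partitions, fileNames) = columnStats.getPrunedPartitionsAndFileNames(fileIndex, fileSlices)
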