[HUDI-8588] Col stats pruning is ignoring to leverage stats from log files (apache#12374)
lokeshj1703 authored Nov 30, 2024
1 parent 56159e2 commit 57c6a98
Showing 6 changed files with 121 additions and 11 deletions.
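
For orientation, the core of the change is the signature of getPrunedPartitionsAndFileNames in SparkBaseIndexSupport, condensed below from the hunks that follow (line breaks adjusted). Before the patch, the index-support callers (BucketIndexSupport, ColumnStatsIndexSupport, ExpressionIndexSupport) invoked the helper without the optional flag, so its false default left log-file names out of the pruned-file sets used for data skipping; after the patch, the HoodieFileIndex is passed in and its includeLogFiles flag drives the behavior.

// Before: an optional flag, defaulting to false, that the index-support callers never set.
def getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                    includeLogFiles: Boolean = false): (Set[String], Set[String])

// After: the file index itself decides whether log-file names are collected alongside base files.
def getPrunedPartitionsAndFileNames(fileIndex: HoodieFileIndex,
                                    prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): (Set[String], Set[String])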
@@ -81,7 +81,7 @@ class BucketIndexSupport(spark: SparkSession,
     val bucketIdsBitMapByFilter = filterQueriesWithBucketHashField(queryFilters)
 
     if (bucketIdsBitMapByFilter.isDefined && bucketIdsBitMapByFilter.get.cardinality() > 0) {
-      val allFilesName = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)._2
+      val allFilesName = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)._2
       Option.apply(getCandidateFiles(allFilesName, bucketIdsBitMapByFilter.get))
     } else {
       Option.empty
@@ -80,7 +80,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
   ): Option[Set[String]] = {
     if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
       val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
-      val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+      val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
       // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
       //       when loading the Column Statistics Index
       val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
@@ -64,7 +64,7 @@ class ExpressionIndexSupport(spark: SparkSession,
       val indexDefinition = metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
       if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
         val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
-        val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+        val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
         val indexDf = loadExpressionIndexDataFrame(indexPartition, prunedPartitions, readInMemory)
         Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
       } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
@@ -79,14 +79,14 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
 
   def invalidateCaches(): Unit
 
-  def getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
-                                      includeLogFiles: Boolean = false): (Set[String], Set[String]) = {
+  def getPrunedPartitionsAndFileNames(fileIndex: HoodieFileIndex, prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath],
+                                        Seq[FileSlice])]): (Set[String], Set[String]) = {
     val (prunedPartitions, prunedFiles) = prunedPartitionsAndFileSlices.foldLeft((Set.empty[String], Set.empty[String])) {
       case ((partitionSet, fileSet), (partitionPathOpt, fileSlices)) =>
         val updatedPartitionSet = partitionPathOpt.map(_.path).map(partitionSet + _).getOrElse(partitionSet)
         val updatedFileSet = fileSlices.foldLeft(fileSet) { (fileAcc, fileSlice) =>
           val baseFile = Option(fileSlice.getBaseFile.orElse(null)).map(_.getFileName)
-          val logFiles = if (includeLogFiles) {
+          val logFiles = if (fileIndex.includeLogFiles) {
             fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toSet
           } else Set.empty[String]
 
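To make the effect of the flag concrete, here is a minimal, self-contained Scala sketch of the fold above. FileSliceLike and FileIndexLike are illustrative stand-ins for Hudi's FileSlice and HoodieFileIndex (not real Hudi classes), and the helper below is a simplification of getPrunedPartitionsAndFileNames, shown only to illustrate what flipping includeLogFiles changes.

object ColStatsPruningSketch extends App {
  // Stand-in types, hypothetical and for illustration only.
  final case class FileSliceLike(partition: Option[String], baseFile: Option[String], logFiles: Seq[String])
  final case class FileIndexLike(includeLogFiles: Boolean)

  // Collect pruned partition paths and file names; log-file names are added
  // only when the file index asks for them, mirroring the updated helper.
  def prunedPartitionsAndFileNames(fileIndex: FileIndexLike,
                                   slices: Seq[FileSliceLike]): (Set[String], Set[String]) =
    slices.foldLeft((Set.empty[String], Set.empty[String])) {
      case ((partitions, files), slice) =>
        val logFiles = if (fileIndex.includeLogFiles) slice.logFiles.toSet else Set.empty[String]
        (partitions ++ slice.partition, files ++ slice.baseFile ++ logFiles)
    }

  val slice = FileSliceLike(Some("p=1"), Some("f1.parquet"), Seq("f1.log.1"))
  // With log files included, both the base file and its log file become candidates
  // for the column-stats lookup (the MOR case the commit targets).
  assert(prunedPartitionsAndFileNames(FileIndexLike(includeLogFiles = true), Seq(slice))._2 ==
    Set("f1.parquet", "f1.log.1"))
  // With the old default, only the base parquet file would have been considered.
  assert(prunedPartitionsAndFileNames(FileIndexLike(includeLogFiles = false), Seq(slice))._2 ==
    Set("f1.parquet"))
}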
@@ -18,28 +18,30 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, MetadataConversionUtils}
+import org.apache.hudi.common.util.FileIOUtils
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams}
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JavaConversions
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, Literal}
 import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
+import java.io.File
 import scala.collection.JavaConverters._
 
 class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
@@ -227,6 +229,114 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifyFileIndexAndSQLQueries(commonOpts)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR"))
+  def testGetPrunedPartitionsAndFileNames(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20"
+    ) ++ metadataOpts
+    setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false)
+
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update-input-table-json",
+      expectedColStatsSourcePath = "",
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      shouldValidate = false))
+    verifyFileIndexAndSQLQueries(commonOpts)
+
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true)
+    val metadataConfig = HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build
+    val cis = new ColumnStatsIndexSupport(spark, fileIndex.schema, metadataConfig, metaClient)
+    // unpartitioned table - get all file slices
+    val fileSlices = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq())
+    var files = cis.getPrunedPartitionsAndFileNames(fileIndex, fileSlices._2)._2
+    // Number of files obtained if file index has include log files as true is double of number of parquet files
+    val numberOfParquetFiles = 9
+    assertEquals(numberOfParquetFiles * 2, files.size)
+    assertEquals(numberOfParquetFiles, files.count(f => f.endsWith("parquet")))
+
+    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = false)
+    files = cis.getPrunedPartitionsAndFileNames(fileIndex, fileSlices._2)._2
+    assertEquals(numberOfParquetFiles, files.size)
+    assertEquals(numberOfParquetFiles, files.count(f => f.endsWith("parquet")))
+  }
+
+  @Test
+  def testUpdateAndSkippingWithColumnStatIndex() {
+    val tableName = "testUpdateAndSkippingWithColumnStatIndex"
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      DataSourceWriteOptions.TABLE_TYPE.key -> "mor",
+      RECORDKEY_FIELD.key -> "id",
+      PRECOMBINE_FIELD.key -> "ts",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+    ) ++ metadataOpts
+
+    FileIOUtils.deleteDirectory(new File(basePath))
+    spark.sql(
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | price double,
+         | part long
+         |) using hudi
+         | options (
+         | primaryKey ='id',
+         | type = 'mor',
+         | preCombineField = 'name',
+         | hoodie.metadata.enable = 'true',
+         | hoodie.metadata.index.column.stats.enable = 'true'
+         | )
+         | partitioned by(part)
+         | location '$basePath'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000), (2, 'a2', 10, 1001), (3, 'a3', 10, 1002)")
+    spark.sql(s"update $tableName set price = 20 where id = 1")
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true)
+    val dataFilter = EqualTo(attribute("price"), literal("20"))
+    var filteredPartitionDirectories = fileIndex.listFiles(Seq.empty, Seq(dataFilter))
+    var filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
+    assertEquals(2, filteredFilesCount)
+    assertEquals(1, spark.sql(s"select * from $tableName where price = 20").count())
+
+    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = false)
+    filteredPartitionDirectories = fileIndex.listFiles(Seq.empty, Seq(dataFilter))
+    filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
+    assertEquals(0, filteredFilesCount)
+    val df = spark.read.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+      .filter("price = 20")
+    assertEquals(0, df.count())
+  }
+
   private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String],
                          shouldValidate: Boolean, useShortSchema: Boolean = false): Unit = {
     val filePostfix = if (useShortSchema) {
@@ -1148,7 +1148,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
     val partitionFilter: Expression = EqualTo(AttributeReference("c8", IntegerType)(), Literal(9))
     val (isPruned, prunedPaths) = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
     assertTrue(isPruned)
-    val prunedPartitionAndFileNames = expressionIndexSupport.getPrunedPartitionsAndFileNames(prunedPaths, includeLogFiles = true)
+    val prunedPartitionAndFileNames = expressionIndexSupport.getPrunedPartitionsAndFileNames(fileIndex, prunedPaths)
     assertTrue(prunedPartitionAndFileNames._1.size == 1) // partition
     assertTrue(prunedPartitionAndFileNames._2.size == 1) // log file
     assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNames._2.head))