[HUDI-6941] Fix partition pruning for multiple partition fields (apac…
codope authored Oct 14, 2023
1 parent e6be396 commit 34852b7
Showing 5 changed files with 29 additions and 31 deletions.
@@ -81,7 +81,7 @@ case class HoodieFileIndex(spark: SparkSession,
    spark = spark,
    metaClient = metaClient,
    schemaSpec = schemaSpec,
-   configProperties = getConfigProperties(spark, options, metaClient),
+   configProperties = getConfigProperties(spark, options),
    queryPaths = HoodieFileIndex.getQueryPaths(options),
    specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
    fileStatusCache = fileStatusCache
@@ -445,7 +445,7 @@ object HoodieFileIndex extends Logging {
    schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
  }

- def getConfigProperties(spark: SparkSession, options: Map[String, String], metaClient: HoodieTableMetaClient) = {
+ def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
    val sqlConf: SQLConf = spark.sessionState.conf
    val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava)

@@ -463,16 +463,6 @@
    if (listingModeOverride != null) {
      properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
    }
-   val partitionColumns = metaClient.getTableConfig.getPartitionFields
-   if (partitionColumns.isPresent) {
-     // NOTE: Multiple partition fields could have non-encoded slashes in the partition value.
-     //       We might not be able to properly parse partition-values from the listed partition-paths.
-     //       Fallback to eager listing in this case.
-     if (partitionColumns.get().length > 1
-       && (listingModeOverride == null || DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY.equals(listingModeOverride))) {
-       properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
-     }
-   }

    properties
  }
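Note: with the automatic multi-partition-field fallback above removed, a reader that still wants the old eager behavior has to ask for it explicitly. A minimal sketch, not part of this commit, using only the option keys from the removed block and the new two-argument getConfigProperties signature (spark is assumed to be a SparkSession in scope):

import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}

// Explicitly request eager listing instead of relying on the removed fallback.
val readerOpts = Map(
  DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key ->
    DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
// New two-argument signature introduced by this commit.
val props = HoodieFileIndex.getConfigProperties(spark, readerOpts)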
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -112,9 +112,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    // Note that key generator class name could be null
    val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
    if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-     || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-     || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-     || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
+     || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
      val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
      StructType(partitionFields)
    } else {
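Note: the narrowed condition above means only the timestamp-based key generators still force StringType partition columns; tables using the custom key generators now fall through to the branch that derives partition fields from the table schema, which is what keeps multi-field partition predicates usable for pruning. A hypothetical helper illustrating the resulting decision, not the file index's actual code path:

import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Returns a StringType-coerced schema only for timestamp-based key generators;
// None means "derive the partition schema from the table's data schema instead".
def coercePartitionSchema(keyGeneratorClassName: String,
                          partitionColumns: Array[String]): Option[StructType] = {
  val timestampBased = Seq(
    classOf[TimestampBasedKeyGenerator].getName,
    classOf[TimestampBasedAvroKeyGenerator].getName)
  if (timestampBased.exists(_.equalsIgnoreCase(keyGeneratorClassName))) {
    Some(StructType(partitionColumns.map(StructField(_, StringType))))
  } else {
    None
  }
}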
@@ -86,7 +86,7 @@ class HoodieCDCRDD(

  private val cdcSupplementalLoggingMode = metaClient.getTableConfig.cdcSupplementalLoggingMode

- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, metaClient)
+ private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)

  protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField)
    .map { preCombineField =>
@@ -38,6 +38,7 @@ import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -325,21 +326,29 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
        EqualTo(attribute("dt"), literal("2021/03/01")),
        EqualTo(attribute("hh"), literal("10"))
      )
-     val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+     // NOTE: That if file-index is in lazy-listing mode and we can't parse partition values, there's no way
+     //       to recover from this since Spark by default have to inject partition values parsed from the partition paths.
+     if (listingModeOverride == DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY) {
+       assertThrows(classOf[HoodieException]) {
+         fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+       }
+     } else {
+       val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)

-     assertEquals(1, partitionAndFilesNoPruning.size)
-     // The partition prune would not work for this case, so the partition value it
-     // returns is a InternalRow.empty.
-     assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
-     // The returned file size should equal to the whole file size in all the partition paths.
-     assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
-       partitionAndFilesNoPruning.flatMap(_.files).length)
+       assertEquals(1, partitionAndFilesNoPruning.size)
+       // The partition prune would not work for this case, so the partition value it
+       // returns is a InternalRow.empty.
+       assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
+       // The returned file size should equal to the whole file size in all the partition paths.
+       assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
+         partitionAndFilesNoPruning.flatMap(_.files).length)

-     val readDF = spark.read.format("hudi").options(readerOpts).load()
+       val readDF = spark.read.format("hudi").options(readerOpts).load()

-     assertEquals(10, readDF.count())
-     // There are 5 rows in the dt = 2021/03/01 and hh = 10
-     assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+       assertEquals(10, readDF.count())
+       // There are 5 rows in the dt = 2021/03/01 and hh = 10
+       assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+     }
    }

    {
@@ -422,7 +431,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
    val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
    assertEquals(1, partitionAndFilesAfterPrune.size)

-   assertTrue(fileIndex.areAllPartitionPathsCached())
+   assertEquals(fileIndex.areAllPartitionPathsCached(), !complexExpressionPushDown)

    val PartitionDirectory(partitionActualValues, filesAfterPrune) = partitionAndFilesAfterPrune.head
    val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
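Note: the reworked test above pins down the new behavior: with lazy listing and partition values containing unencoded slashes (e.g. dt=2021/03/01 under partition columns dt and hh), listFiles now fails with HoodieException instead of silently degrading to eager listing. One way to keep lazy listing usable for such tables is to URL-encode partition values at write time; a hypothetical write-side sketch, assuming the standard hoodie.datasource.write.partitionpath.urlencode option and that writeOpts already carries the table's usual write configs (record key, precombine field, table name, key generator):

inputDF.write.format("hudi")
  .options(writeOpts)
  // Encode slashes in partition values so partition paths stay parseable for lazy listing.
  .option("hoodie.datasource.write.partitionpath.urlencode", "true")
  .mode("append")
  .save(basePath)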
@@ -54,7 +54,7 @@ import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}

@@ -1006,6 +1006,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
    }
  }

+ @Disabled("HUDI-6320")
  @ParameterizedTest
  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
  def testSparkPartitionByWithCustomKeyGenerator(recordType: HoodieRecordType): Unit = {
