[HUDI-8609] always read partition column from file if it is the precombine (apache#12380)

* always read partition column from file if it is the precombine

* make code more readable and test all the branches added

* fix tests to include record updates so that MOR is exercised

---------

Co-authored-by: Jonathan Vexler <=>
jonvex authored Dec 1, 2024
1 parent 57c6a98 commit 8576f10
Showing 2 changed files with 139 additions and 7 deletions.
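
To make the change easier to follow, the sketch below restates the partition-column selection rule introduced in HoodieBaseHadoopFsRelationFactory: when the precombine field is also a partition column, it must be read from the data files, together with any timestamp/custom keygen partition columns. This is a standalone, illustrative Scala function; the object name, parameter names, and the main driver are invented for the example and are not part of Hudi's API.

// Minimal sketch of the new partitionColumnsToRead decision (illustrative only).
object PartitionColumnsToReadSketch {

  def partitionColumnsToRead(extractFromPartitionPath: Boolean,
                             preCombineFieldOpt: Option[String],
                             partitionColumns: Seq[String],
                             keygenHasVariablePartitionCols: Boolean,
                             variableKeygenPartitionCols: Seq[String]): Seq[String] = {
    val precombineIsPartitionCol = preCombineFieldOpt.exists(f => partitionColumns.contains(f))
    if (extractFromPartitionPath) {
      // partition values are synthesized from the partition path, so nothing extra is read
      Seq.empty
    } else if (precombineIsPartitionCol) {
      // the precombine column has to come from the file; keep any timestamp/custom keygen
      // partition columns and append the precombine field if it is not already among them
      val precombine = preCombineFieldOpt.get
      if (keygenHasVariablePartitionCols) {
        if (variableKeygenPartitionCols.contains(precombine)) variableKeygenPartitionCols
        else variableKeygenPartitionCols :+ precombine
      } else {
        Seq(precombine)
      }
    } else if (keygenHasVariablePartitionCols) {
      variableKeygenPartitionCols
    } else {
      Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    // precombine 'ts' is also a custom keygen timestamp partition column: it is read from the file
    println(partitionColumnsToRead(
      extractFromPartitionPath = false,
      preCombineFieldOpt = Some("ts"),
      partitionColumns = Seq("segment", "ts"),
      keygenHasVariablePartitionCols = true,
      variableKeygenPartitionCols = Seq("ts"))) // List(ts)
  }
}
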
@@ -39,7 +39,8 @@ import org.apache.hudi.storage.StoragePath
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources._
@@ -65,7 +66,7 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
val options: Map[String, String],
val schemaSpec: Option[StructType],
val isBootstrap: Boolean
) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory {
) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory with Logging {
protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
protected lazy val optParams: Map[String, String] = options
protected lazy val hadoopConfig: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -79,6 +80,9 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
protected lazy val basePath: StoragePath = metaClient.getBasePath
protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

// very much not recommended to use a partition column as the precombine
private lazy val partitionColumnsHasPrecombine = preCombineFieldOpt.isDefined && partitionColumns.contains(preCombineFieldOpt.get)

private lazy val keygenTypeHasVariablePartitionCols = isTimestampKeygen || isCustomKeygen

private lazy val isTimestampKeygen = !isNullOrEmpty(tableConfig.getKeyGeneratorClassName) &&
@@ -89,12 +93,9 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
(tableConfig.getKeyGeneratorClassName.equals(classOf[CustomKeyGenerator].getName) ||
tableConfig.getKeyGeneratorClassName.equals(classOf[CustomAvroKeyGenerator].getName))

protected lazy val partitionColumnsToRead: Seq[String] = if (shouldExtractPartitionValuesFromPartitionPath || !keygenTypeHasVariablePartitionCols) {
Seq.empty
} else if (isTimestampKeygen) {
private lazy val variableTimestampKeygenPartitionCols = if (isTimestampKeygen) {
tableConfig.getPartitionFields.orElse(Array.empty).toSeq
} else {
//it's custom keygen
} else if (isCustomKeygen) {
val timestampFieldsOpt = CustomAvroKeyGenerator.getTimestampFields(tableConfig)
if (timestampFieldsOpt.isPresent) {
timestampFieldsOpt.get().asScala.toSeq
@@ -103,6 +104,33 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
// For older tables the partition type may not be available so falling back to partition fields in those cases
tableConfig.getPartitionFields.orElse(Array.empty).toSeq
}
} else {
Seq.empty
}

protected lazy val partitionColumnsToRead: Seq[String] = {
if (shouldExtractPartitionValuesFromPartitionPath) {
Seq.empty
} else if (partitionColumnsHasPrecombine) {
logWarning(s"Not recommended for field '${preCombineFieldOpt.get}' to be both precombine and partition")
if (keygenTypeHasVariablePartitionCols) {
// still need to read any timestamp/custom keygen timestamp columns
if (variableTimestampKeygenPartitionCols.contains(preCombineFieldOpt.get)) {
// precombine is already included in the list
variableTimestampKeygenPartitionCols
} else {
// precombine is not included in the list so we append it
variableTimestampKeygenPartitionCols :+ preCombineFieldOpt.get
}
} else {
// not timestamp/custom keygen so just need to read precombine
Seq(preCombineFieldOpt.get)
}
} else if (keygenTypeHasVariablePartitionCols) {
variableTimestampKeygenPartitionCols
} else {
Seq.empty
}
}

protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
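
Before the test changes below, here is a hedged sketch of how a table can end up in this configuration from the Spark datasource write path. The option keys are Hudi's standard datasource write options; the table name, storage path, and sample rows are invented for the illustration.

import org.apache.spark.sql.{SaveMode, SparkSession}

object PrecombineAsPartitionWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("precombine-as-partition")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // sample rows where 'ts' serves as both the precombine field and the partition path field
    val df = Seq((1, "a1", 10.0, 1000L), (1, "a2", 10.0, 1000L))
      .toDF("id", "name", "price", "ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "precombine_partition_tbl")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      // not recommended: the same column doubles as the partition path field,
      // which is exactly the case the new partitionColumnsToRead logic handles
      .option("hoodie.datasource.write.partitionpath.field", "ts")
      .mode(SaveMode.Append)
      .save("/tmp/precombine_partition_tbl")

    spark.stop()
  }
}
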
@@ -1555,4 +1555,108 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
)("Failed to create catalog table in metastore")
}
}

test("Test Create Table with Same Value for Partition and Precombine") {
withSQLConf("hoodie.parquet.small.file.limit" -> "0") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
// simple partition path
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
checkAnswer(s"select id, name from $tableName")(
Seq(1, "a2")
)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a2", 10.0, 1000)
)

// complex keygen, variable timestamp partition is precombine
val tableName2 = generateTableName
val basePath2 = s"${tmp.getCanonicalPath}/$tableName2"
spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double,
| segment string,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts',
| 'hoodie.datasource.write.partitionpath.field' = 'segment:simple,ts:timestamp',
| 'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.CustomKeyGenerator',
| 'hoodie.keygen.timebased.timestamp.type' = 'SCALAR',
| 'hoodie.keygen.timebased.output.dateformat' = 'YYYY',
| 'hoodie.keygen.timebased.timestamp.scalar.time.unit' = 'seconds'
| )
| partitioned by(segment,ts)
| location '$basePath2'
""".stripMargin)
spark.sql(s"insert into $tableName2 values(1, 'a1', 10, 'seg1', 1000)")
spark.sql(s"insert into $tableName2 values(1, 'a2', 10, 'seg1', 1000)")
checkAnswer(s"select id, name from $tableName2")(
Seq(1, "a2")
)
checkAnswer(s"select id, name, price, segment, ts from $tableName2")(
Seq(1, "a2", 10.0, "seg1", 1000)
)

// complex keygen, simple partition is precombine
val tableName3 = generateTableName
val basePath3 = s"${tmp.getCanonicalPath}/$tableName3"
spark.sql(
s"""
|create table $tableName3 (
| id int,
| name string,
| price double,
| segment string,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'segment',
| 'hoodie.datasource.write.partitionpath.field' = 'segment:simple,ts:timestamp',
| 'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.CustomKeyGenerator',
| 'hoodie.keygen.timebased.timestamp.type' = 'SCALAR',
| 'hoodie.keygen.timebased.output.dateformat' = 'YYYY',
| 'hoodie.keygen.timebased.timestamp.scalar.time.unit' = 'seconds'
| )
| partitioned by(segment,ts)
| location '$basePath3'
""".stripMargin)
spark.sql(s"insert into $tableName3 values(1, 'a1', 10, 'seg1', 1000)")
spark.sql(s"insert into $tableName3 values(1, 'a2', 10, 'seg1', 1000)")
checkAnswer(s"select id, name from $tableName3")(
Seq(1, "a2")
)
checkAnswer(s"select id, name, price, segment, ts from $tableName3")(
Seq(1, "a2", 10.0, "seg1", 1000)
)
}
}
}
}
}
