From 02f7ca2b0518b0a61d539e9f0781b70835aea287 Mon Sep 17 00:00:00 2001
From: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Date: Mon, 22 Nov 2021 02:16:45 -0800
Subject: [PATCH] [HUDI-1870] Add more Spark CI build tasks (#4022)

* [HUDI-1870] Add more Spark CI build tasks

- build for spark3.0.x
- build for spark-shade-unbundle-avro
- fix build failures
- delete unnecessary assertion for spark 3.0.x
- use AvroConversionUtils#convertAvroSchemaToStructType instead of calling
  SchemaConverters#toSqlType directly to solve the compilation failures with
  spark-shade-unbundle-avro (#5)

Co-authored-by: Yann
---
 .github/workflows/bot.yml                              |  8 ++++++++
 .../main/scala/org/apache/hudi/HoodieSparkUtils.scala  |  8 +++++---
 .../hudi/integ/testsuite/utils/SparkSqlUtils.scala     | 11 ++++++-----
 .../main/scala/org/apache/hudi/DefaultSource.scala     |  7 ++++---
 .../main/scala/org/apache/hudi/HoodieFileIndex.scala   |  7 ++++---
 .../org/apache/spark/sql/hudi/HoodieSqlUtils.scala     | 10 +++++-----
 .../sql/hudi/command/payload/ExpressionPayload.scala   |  9 +++++++--
 .../sql/hudi/command/payload/SqlTypedRecord.scala      |  8 +++++---
 .../spark/sql/hudi/streaming/HoodieStreamSource.scala  |  8 ++++----
 .../apache/hudi/spark3/internal/TestReflectUtil.java   |  4 ----
 10 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 0652de980e16..f1f178e1716b 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -18,8 +18,16 @@ jobs:
         include:
           - scala: "scala-2.11"
             spark: "spark2"
+          - scala: "scala-2.11"
+            spark: "spark2,spark-shade-unbundle-avro"
+          - scala: "scala-2.12"
+            spark: "spark3,spark3.0.x"
+          - scala: "scala-2.12"
+            spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
           - scala: "scala-2.12"
             spark: "spark3"
+          - scala: "scala-2.12"
+            spark: "spark3,spark-shade-unbundle-avro"
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK 8
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index ce3984327581..f0217f800471 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -22,7 +22,9 @@ import java.util.Properties
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+
 import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
@@ -30,9 +32,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
+
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
@@ -137,13 +139,13 @@ object HoodieSparkUtils extends SparkAdapterSupport {
   def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
   : RDD[GenericRecord] = {
     // Use the write avro schema to derive the StructType which has the correct nullability information
-    val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
+    val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
     val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
     val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
     // if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
     // latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
     val reconciledDataType =
-      if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType
+      if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
     // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
     // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
     // Hence we always need to deserialize in the same schema as serialized schema.
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
index fa16eae06b17..ca7ca6f26a4e 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
@@ -21,18 +21,19 @@ package org.apache.hudi.integ.testsuite.utils
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.HoodieSparkUtils
+
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
 import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
-import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils.getFieldNamesAndTypes
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
+
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel
+
 import org.slf4j.Logger
 
 import scala.math.BigDecimal.RoundingMode.RoundingMode
@@ -139,7 +140,7 @@ object SparkSqlUtils {
    */
   def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = {
     val schema = new Schema.Parser().parse(avroSchemaString)
-    val structType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
     structType.fields.map(field => (field.name, field.dataType.simpleString))
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index a9d85af2ee47..9b437f5451b1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,6 +18,7 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
@@ -26,8 +27,9 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
+
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.avro.SchemaConverters
+
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -217,8 +219,7 @@ class DefaultSource extends RelationProvider
       // the table schema evolution.
       val tableSchemaResolver = new TableSchemaResolver(metaClient)
       try {
-        Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
-          .dataType.asInstanceOf[StructType])
+        Some(AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaResolver.getTableAvroSchema))
       } catch {
         case _: Throwable => None // If there is no commit in the table, we can not get the schema
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index c4e9bafb3fc6..bc6867453686 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,6 +18,7 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
+
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -26,10 +27,10 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
 import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, SparkSession}
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
@@ -38,6 +39,7 @@ import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
+
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -96,8 +98,7 @@ case class HoodieFileIndex(
    */
   lazy val schema: StructType = schemaSpec.getOrElse({
     val schemaUtil = new TableSchemaResolver(metaClient)
-    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
-      .dataType.asInstanceOf[StructType]
+    AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
   })
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index 50c9e539ccb2..c8ac6e318c8d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -23,7 +23,8 @@ import java.util.{Date, Locale, Properties}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.SparkAdapterSupport
+
+import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -31,9 +32,8 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+
 import org.apache.spark.SPARK_VERSION
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -46,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
 
 import java.text.SimpleDateFormat
+
 import scala.collection.immutable.Map
 
 object HoodieSqlUtils extends SparkAdapterSupport {
@@ -83,8 +84,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     catch {
       case _: Throwable => None
     }
-    avroSchema.map(SchemaConverters.toSqlType(_).dataType
-      .asInstanceOf[StructType]).map(removeMetaFields)
+    avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType).map(removeMetaFields)
   }
 
   def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index b025cf3efa44..e660fe870b80 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi.command.payload
 
 import java.util.{Base64, Properties}
 import java.util.concurrent.Callable
-import scala.collection.JavaConverters._
+
 import com.google.common.cache.CacheBuilder
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+
+import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
@@ -31,12 +34,14 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.io.HoodieWriteHandle
 import org.apache.hudi.sql.IExpressionEvaluator
+
 import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.hudi.SerDeUtils
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
 import org.apache.spark.sql.types.{StructField, StructType}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -309,7 +314,7 @@ object ExpressionPayload {
         SchemaConverters.toAvroType(conditionType), false)
       val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)
 
-      val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
+      val assignSqlType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
       val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false)
       val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
       conditionEvaluator -> assignmentEvaluator
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
index 3fb48f430221..76f5caf2203d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command.payload
 
 import org.apache.avro.generic.IndexedRecord
 import org.apache.avro.Schema
-import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters}
+
+import org.apache.hudi.AvroConversionUtils
+
+import org.apache.spark.sql.avro.HooodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
 
 /**
  * A sql typed record which will convert the avro field to sql typed value.
  */
 class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
 
-  private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType]
+  private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
   private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
   private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index a60a63b7a7d7..ccb3191e187e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -22,16 +22,17 @@ import java.nio.charset.StandardCharsets
 import java.util.Date
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}
+
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
+
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
 import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
@@ -118,8 +119,7 @@ class HoodieStreamSource(
   override def schema: StructType = {
     schemaOption.getOrElse {
       val schemaUtil = new TableSchemaResolver(metaClient)
-      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
-        .dataType.asInstanceOf[StructType]
+      AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java
index 284b2aaf1f81..f8b18289433a 100644
--- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java
+++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java
@@ -50,9 +50,5 @@ public void testDataSourceWriterExtraCommitMetadata() throws Exception {
 
     Assertions.assertTrue(
         ((UnresolvedRelation)newStatment.table()).multipartIdentifier().contains("test_reflect_util"));
-
-    if (!spark.version().startsWith("3.0")) {
-      Assertions.assertTrue(newStatment.userSpecifiedCols().isEmpty());
-    }
   }
 }
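
The recurring source change in this patch swaps direct calls of the spark-avro API, SchemaConverters#toSqlType(schema).dataType.asInstanceOf[StructType], for Hudi's own AvroConversionUtils#convertAvroSchemaToStructType. Per the commit message, the direct calls failed to compile under the spark-shade-unbundle-avro profile, so routing every Avro-schema-to-StructType conversion through a single Hudi-owned helper keeps the spark-avro dependency behind one call site. The sketch below is only a minimal illustration of that wrapper pattern, not Hudi's actual AvroConversionUtils implementation; it assumes Avro and spark-avro's org.apache.spark.sql.avro.SchemaConverters are on the compile classpath.

import org.apache.avro.Schema

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Minimal sketch of the wrapper pattern (not Hudi's actual AvroConversionUtils):
// keep the spark-avro SchemaConverters reference behind one project-owned method,
// so build profiles that shade or unbundle Avro have a single call site to adapt.
object AvroSchemaConversionSketch {

  // Convert an Avro record schema into the equivalent Spark SQL StructType.
  def convertAvroSchemaToStructType(avroSchema: Schema): StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

  def main(args: Array[String]): Unit = {
    // Hypothetical schema, for demonstration only.
    val avroJson =
      """{"type": "record", "name": "Person", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "age", "type": "int"}
        |]}""".stripMargin
    val schema = new Schema.Parser().parse(avroJson)
    // For a record schema the resulting DataType is a StructType, e.g.
    // StructType(StructField(name,StringType,false), StructField(age,IntegerType,false)).
    println(convertAvroSchemaToStructType(schema))
  }
}

Call sites in the patch then pass whatever Avro schema they already have, for example the table schema from TableSchemaResolver#getTableAvroSchema, and use the returned StructType to build encoders, the file index schema, and the streaming source schema.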