diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index 38f0acd261ae6..0dcd744e5d0be 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -20,6 +20,7 @@
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
@@ -29,7 +30,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.avro.SchemaConverters;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
@@ -63,6 +63,6 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List
-        val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+        val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
         val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 88101265de297..5b87fee14a1e2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
+import org.apache.avro.JsonProperties
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.spark.rdd.RDD
@@ -27,6 +28,7 @@
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 object AvroConversionUtils {
@@ -46,10 +48,67 @@ object AvroConversionUtils {
     }
   }
 
+  /**
+    *
+    * Returns avro schema from spark StructType.
+    *
+    * @param structType Dataframe Struct Type.
+    * @param structName Avro record name.
+    * @param recordNamespace Avro record namespace.
+    * @return Avro schema corresponding to given struct type.
+    */
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
+  }
+
+  /**
+    *
+    * Method to add default value of null to nullable fields in given avro schema
+    *
+    * @param schema input avro schema
+    * @return Avro schema with null default set to nullable fields
+    */
+  def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+
+    schema.getType match {
+      case Schema.Type.RECORD => {
+
+        val modifiedFields = schema.getFields.map(field => {
+          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          field.schema().getType match {
+            case Schema.Type.UNION => {
+              val innerFields = newSchema.getTypes
+              val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
+              if(containsNullSchema) {
+                // Reorder the union members: to set null as the default, the null schema must be the first entry in the union
+                val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+                new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
+              } else {
+                new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+              }
+            }
+            case _ => new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+          }
+        }).toList
+        Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
+      }
+
+      case Schema.Type.UNION => {
+        Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema)))
+      }
+
+      case Schema.Type.MAP => {
+        Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
+      }
+
+      case Schema.Type.ARRAY => {
+        Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
+      }
+
+      case _ => schema
+    }
   }
 
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index b0bb509962333..ebf23242a5279 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -45,6 +45,10 @@ public static Schema getStructTypeExampleSchema() throws IOException {
     return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
   }
 
+  public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
+    return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
+  }
+
   public static List<Row> generateRandomRows(int count) {
     Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public static List<Row> generateRandomRows(int count) {
     }
     return toReturn;
   }
+
+  public static List<Row> generateUpdates(List<Row> records, int count) {
+    List<Row> toReturn = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[3];
+      values[0] = records.get(i).getString(0);
+      values[1] = records.get(i).getAs(1);
+      values[2] = new Date().getTime();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
+
+  public static List<Row> generateRandomRowsEvolvedSchema(int count) {
+    Random random = new Random();
+    List<Row> toReturn = new ArrayList<>();
+    List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[4];
+      values[0] = UUID.randomUUID().toString();
+      values[1] = partitions.get(random.nextInt(3));
+      values[2] = new Date().getTime();
+      values[3] = UUID.randomUUID().toString();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
new file mode 100644
index 0000000000000..5fcddac6a84ff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "example.schema",
+  "type": "record",
+  "name": "trip",
+  "fields": [
+    {
+      "name": "_row_key",
+      "type": "string"
+    },
+    {
+      "name": "partition",
+      "type": "string"
+    },
+    {
+      "name": "ts",
+      "type": ["long", "null"]
+    },
+    {
+      "name": "new_field",
+      "type": ["string","null"]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
new file mode 100644
index 0000000000000..50137c9a580a3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.spark.sql.types.{DataTypes, StructType, StringType, ArrayType}
+import org.scalatest.{FunSuite, Matchers}
+
+class TestAvroConversionUtils extends FunSuite with Matchers {
+
+
+  test("test convertStructTypeToAvroSchema") {
+    val mapType = DataTypes.createMapType(StringType, new StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
+    val arrayType = ArrayType(new StructType().add("arrayKey", "string", false).add("arrayVal", "integer", true))
+    val innerStruct = new StructType().add("innerKey","string",false).add("value", "long", true)
+
+    val struct = new StructType().add("key", "string", false).add("version", "string", true)
+      .add("data1",innerStruct,false).add("data2",innerStruct,true)
+      .add("nullableMap", mapType, true).add("map",mapType,false)
+      .add("nullableArray", arrayType, true).add("array",arrayType,false)
+
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+
+    val expectedSchemaStr = s"""
+    {
+      "type" : "record",
+      "name" : "SchemaName",
+      "namespace" : "SchemaNS",
+      "fields" : [ {
+        "name" : "key",
+        "type" : "string"
+      }, {
+        "name" : "version",
+        "type" : [ "null", "string" ],
+        "default" : null
+      }, {
+        "name" : "data1",
+        "type" : {
+          "type" : "record",
+          "name" : "data1",
+          "namespace" : "SchemaNS.SchemaName",
+          "fields" : [ {
+            "name" : "innerKey",
+            "type" : "string"
+          }, {
+            "name" : "value",
+            "type" : [ "null", "long" ],
+            "default" : null
+          } ]
+        }
+      }, {
+        "name" : "data2",
+        "type" : [ "null", {
+          "type" : "record",
+          "name" : "data2",
+          "namespace" : "SchemaNS.SchemaName",
+          "fields" : [ {
+            "name" : "innerKey",
+            "type" : "string"
+          }, {
+            "name" : "value",
+            "type" : [ "null", "long" ],
+            "default" : null
+          } ]
+        } ],
+        "default" : null
+      }, {
+        "name" : "nullableMap",
+        "type" : [ "null", {
+          "type" : "map",
+          "values" : [ {
+            "type" : "record",
+            "name" : "nullableMap",
+            "namespace" : "SchemaNS.SchemaName",
+            "fields" : [ {
+              "name" : "mapKey",
+              "type" : "string"
+            }, {
+              "name" : "mapVal",
+              "type" : [ "null", "int" ],
+              "default" : null
+            } ]
+          }, "null" ]
+        } ],
+        "default" : null
+      }, {
+        "name" : "map",
+        "type" : {
+          "type" : "map",
+          "values" : [ {
+            "type" : "record",
+            "name" : "map",
+            "namespace" : "SchemaNS.SchemaName",
+            "fields" : [ {
+              "name" : "mapKey",
+              "type" : "string"
+            }, {
+              "name" : "mapVal",
+              "type" : [ "null", "int" ],
+              "default" : null
+            } ]
+          }, "null" ]
+        }
+      }, {
+        "name" : "nullableArray",
+        "type" : [ "null", {
+          "type" : "array",
+          "items" : [ {
+            "type" : "record",
+            "name" : "nullableArray",
+            "namespace" : "SchemaNS.SchemaName",
+            "fields" : [ {
+              "name" : "arrayKey",
+              "type" : "string"
+            }, {
+              "name" : "arrayVal",
+              "type" : [ "null", "int" ],
+              "default" : null
+            } ]
+          }, "null" ]
+        } ],
+        "default" : null
+      }, {
+        "name" : "array",
+        "type" : {
+          "type" : "array",
+          "items" : [ {
+            "type" : "record",
+            "name" : "array",
+            "namespace" : "SchemaNS.SchemaName",
+            "fields" : [ {
+              "name" : "arrayKey",
+              "type" : "string"
+            }, {
+              "name" : "arrayVal",
+              "type" : [ "null", "int" ],
+              "default" : null
+            } ]
+          }, "null" ]
+        }
+      } ]
+    }
+    """
+    val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
+
+    assert(avroSchema.equals(expectedAvroSchema))
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index bbaeea1086e25..e9f375ee25efe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -39,6 +39,7 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.collection.JavaConversions._
+import org.junit.jupiter.api.Assertions.assertEquals
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
@@ -300,13 +301,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema")
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
       val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
 
       val client = spy(DataSourceUtils.createHoodieClient(
         new JavaSparkContext(sc),
-        schema.toString,
+        modifiedSchema.toString,
         path.toAbsolutePath.toString,
         hoodieFooTableName,
         mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -399,6 +401,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
     })
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test schema evolution for " + tableType) {
+        initSparkContext("test_schema_evolution")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl"
+          // create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            "hoodie.insert.shuffle.parallelism" -> "1",
+            "hoodie.upsert.shuffle.parallelism" -> "1",
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          var schema = DataSourceTestUtils.getStructTypeExampleSchema
+          var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          var records = DataSourceTestUtils.generateRandomRows(10)
+          var recordsSeq = convertRowListToSeq(records)
+          var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df1.except(trimmedDf1).count() == 0)
+
+          // issue updates so that log files are created for MOR table
+          var updates = DataSourceTestUtils.generateUpdates(records, 5)
+          var updatesSeq = convertRowListToSeq(updates)
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf2 = snapshotDF2.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+          // getting new schema with new column
+          schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+          structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
+          recordsSeq = convertRowListToSeq(records)
+          val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi with new column
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(15, snapshotDF3.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          // ensure 3rd batch of records written with the evolved schema matches.
+          assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
+
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 725743dd234b4..b9e9282923853 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -47,8 +47,8 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
   private static String ORIGINAL_SCHEMA = "{\"name\":\"t3_biz_operation_t_driver\",\"type\":\"record\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],\"default\":null},"
       + "{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
 
-  private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"ums_ts_\",\"type\":[\"string\",\"null\"]}]}";
+  private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],"
+      + "\"default\":null},{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
 
   @Test
   public void testPostProcessor() throws IOException {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
index cb8697da52c4f..e7943b0abdc94 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
@@ -26,34 +26,42 @@
     },
     {
       "name": "TIMESTAMP",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
     },
     {
       "name": "RIDER",
-      "type": ["string", "null"]
+      "type": ["null", "string"],
+      "default": null
    },
    {
      "name": "DRIVER",
-      "type": ["string", "null"]
+      "type": ["null" ,"string"],
+      "default": null
    },
    {
      "name": "BEGIN_LAT",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
    },
    {
      "name": "BEGIN_LON",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
    },
    {
      "name": "END_LAT",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
    },
    {
      "name": "END_LON",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
    },
    {
      "name": "FARE",
-      "type": ["double", "null"]
+      "type": ["null", "double"],
+      "default": null
    }
  ]
}
\ No newline at end of file
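
For reference, a minimal sketch (not part of the patch) of the behaviour this change introduces, assuming the patched AvroConversionUtils is on the classpath; the object name SchemaDefaultsSketch and the column names are illustrative only. Nullable StructType columns now map to a null-first Avro union with an explicit null default:

import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types.{StringType, StructType}

object SchemaDefaultsSketch {
  def main(args: Array[String]): Unit = {
    // One required and one nullable column
    val struct = new StructType()
      .add("key", StringType, nullable = false)
      .add("note", StringType, nullable = true)

    val avro: Schema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "trip", "example.schema")

    val note = avro.getField("note")
    // The nullable column becomes a ["null", "string"] union ...
    assert(note.schema().getTypes.get(0).getType == Schema.Type.NULL)
    // ... with a null default, so records written before the column existed remain readable
    assert(note.defaultVal() == JsonProperties.NULL_VALUE)
  }
}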