Skip to content

Commit

Permalink
[HUDI-1716]: Resolving default values for schema from dataframe (apac…
Browse files Browse the repository at this point in the history
…he#2765)

- Adding default values and setting null as first entry in UNION data types in avro schema. 

Co-authored-by: Aditya Tiwari <[email protected]>
  • Loading branch information
aditiwari01 and Aditya Tiwari authored Apr 19, 2021
1 parent dab5114 commit ec2334c
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
Expand All @@ -29,7 +30,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
Expand Down Expand Up @@ -63,6 +63,6 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;

return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.hudi.AvroConversionUtils._

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -340,7 +341,7 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.JsonProperties
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
Expand All @@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

object AvroConversionUtils {

Expand All @@ -46,10 +48,67 @@ object AvroConversionUtils {
}
}

/**
*
* Returns avro schema from spark StructType.
*
* @param structType Dataframe Struct Type.
* @param structName Avro record name.
* @param recordNamespace Avro record namespace.
* @return Avro schema corresponding to given struct type.
*/
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
}

/**
*
* Method to add default value of null to nullable fields in given avro schema
*
* @param schema input avro schema
* @return Avro schema with null default set to nullable fields
*/
def getAvroSchemaWithDefaults(schema: Schema): Schema = {

schema.getType match {
case Schema.Type.RECORD => {

val modifiedFields = schema.getFields.map(field => {
val newSchema = getAvroSchemaWithDefaults(field.schema())
field.schema().getType match {
case Schema.Type.UNION => {
val innerFields = newSchema.getTypes
val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
if(containsNullSchema) {
// Need to re shuffle the fields in list because to set null as default, null schema must be head in union schema
val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
} else {
new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
}
}
case _ => new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
}
}).toList
Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
}

case Schema.Type.UNION => {
Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema)))
}

case Schema.Type.MAP => {
Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
}

case Schema.Type.ARRAY => {
Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
}

case _ => schema
}
}

def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public static Schema getStructTypeExampleSchema() throws IOException {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
}

public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
}

public static List<Row> generateRandomRows(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
Expand All @@ -58,4 +62,31 @@ public static List<Row> generateRandomRows(int count) {
}
return toReturn;
}

public static List<Row> generateUpdates(List<Row> records, int count) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
Object[] values = new Object[3];
values[0] = records.get(i).getString(0);
values[1] = records.get(i).getAs(1);
values[2] = new Date().getTime();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}

public static List<Row> generateRandomRowsEvolvedSchema(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
for (int i = 0; i < count; i++) {
Object[] values = new Object[4];
values[0] = UUID.randomUUID().toString();
values[1] = partitions.get(random.nextInt(3));
values[2] = new Date().getTime();
values[3] = UUID.randomUUID().toString();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "example.schema",
"type": "record",
"name": "trip",
"fields": [
{
"name": "_row_key",
"type": "string"
},
{
"name": "partition",
"type": "string"
},
{
"name": "ts",
"type": ["long", "null"]
},
{
"name": "new_field",
"type": ["string","null"]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataTypes, StructType, StringType, ArrayType}
import org.scalatest.{FunSuite, Matchers}

class TestAvroConversionUtils extends FunSuite with Matchers {


test("test convertStructTypeToAvroSchema") {
val mapType = DataTypes.createMapType(StringType, new StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
val arrayType = ArrayType(new StructType().add("arrayKey", "string", false).add("arrayVal", "integer", true))
val innerStruct = new StructType().add("innerKey","string",false).add("value", "long", true)

val struct = new StructType().add("key", "string", false).add("version", "string", true)
.add("data1",innerStruct,false).add("data2",innerStruct,true)
.add("nullableMap", mapType, true).add("map",mapType,false)
.add("nullableArray", arrayType, true).add("array",arrayType,false)

val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")

val expectedSchemaStr = s"""
{
"type" : "record",
"name" : "SchemaName",
"namespace" : "SchemaNS",
"fields" : [ {
"name" : "key",
"type" : "string"
}, {
"name" : "version",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "data1",
"type" : {
"type" : "record",
"name" : "data1",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "innerKey",
"type" : "string"
}, {
"name" : "value",
"type" : [ "null", "long" ],
"default" : null
} ]
}
}, {
"name" : "data2",
"type" : [ "null", {
"type" : "record",
"name" : "data2",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "innerKey",
"type" : "string"
}, {
"name" : "value",
"type" : [ "null", "long" ],
"default" : null
} ]
} ],
"default" : null
}, {
"name" : "nullableMap",
"type" : [ "null", {
"type" : "map",
"values" : [ {
"type" : "record",
"name" : "nullableMap",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "mapKey",
"type" : "string"
}, {
"name" : "mapVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
} ],
"default" : null
}, {
"name" : "map",
"type" : {
"type" : "map",
"values" : [ {
"type" : "record",
"name" : "map",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "mapKey",
"type" : "string"
}, {
"name" : "mapVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
}
}, {
"name" : "nullableArray",
"type" : [ "null", {
"type" : "array",
"items" : [ {
"type" : "record",
"name" : "nullableArray",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "arrayKey",
"type" : "string"
}, {
"name" : "arrayVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
} ],
"default" : null
}, {
"name" : "array",
"type" : {
"type" : "array",
"items" : [ {
"type" : "record",
"name" : "array",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "arrayKey",
"type" : "string"
}, {
"name" : "arrayVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
}
} ]
}
"""
val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)

assert(avroSchema.equals(expectedAvroSchema))
}
}
Loading

0 comments on commit ec2334c

Please sign in to comment.