Skip to content

Commit

Permalink
Upgrade schema-ddl to 0.25.0-M1
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Sep 25, 2024
1 parent 875ebbd commit cd79f64
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[parquet] object Codecs {
case FieldValue.DateValue(value) =>
as[Date](value)
case FieldValue.ArrayValue(values) =>
as[List[FieldValue]](values)
as[Vector[FieldValue]](values)
case FieldValue.StructValue(values) =>
values
.foldLeft[RowParquetRecord](RowParquetRecord()) { case (acc, NamedValue(name, value)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ private[parquet] object ParquetSchema {
val listElement = asParquetType(element).withRequired(elementNullability.required)
SchemaDef.list(listElement)
case Type.Struct(subFields) =>
SchemaDef.group(subFields.map(asParquetField): _*)
SchemaDef.group(subFields.map(asParquetField).toVector: _*)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ object NonAtomicFieldsProvider {
val schemasSorted = schemas.sorted

// Schemas need to be ordered by key to merge in correct order.
schemasSorted
schemasSorted.toList
// `types` are all of the same family, so it does not matter which element is passed to fieldFromSchema
.map(schemaWithKey => TypedField(fieldFromSchema(types.head)(schemaWithKey.schema), types.head, Set(schemaWithKey.schemaKey)))
.flatMap { schemaWithKey =>
fieldFromSchema(types.head)(schemaWithKey.schema).map(TypedField(_, types.head, Set(schemaWithKey.schemaKey)))
}
// Accumulating vector would contain base column as first element and broken migrations in others
.foldLeft(Vector.empty[TypedField])((endFields, typedField) =>
endFields.headOption match {
Expand Down Expand Up @@ -137,14 +139,14 @@ object NonAtomicFieldsProvider {
)
)

private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Field = {
private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Option[Field] = {
val fieldName = SnowplowEvent.transformSchema(`type`.snowplowEntity.toSdkProperty, `type`.schemaKey)

Field.normalize(`type`.snowplowEntity match {
(`type`.snowplowEntity match {
case LoaderMessage.SnowplowEntity.SelfDescribingEvent =>
Field.build(fieldName, schema, enforceValuePresence = false)
case LoaderMessage.SnowplowEntity.Context =>
Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
})
}).map(Field.normalize)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "com.snowplowanalytics.snowplow",
"name": "digit_schema",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"1field": {"type": "string"}
},
"required": ["1field"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.Id
import cats.data.NonEmptyVector
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
Expand All @@ -31,6 +32,35 @@ class ParquetFieldsProviderSpec extends Specification with Tables {
private val resolver = embeddedIgluClient.resolver

"Parquet non-atomic fields provider" should {
"prefix field with underscore if it starts with a number" >> {
"for contexts" in {
val ctx = WideRow.Type(
SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)),
Context
)
val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get

result.value.size mustEqual 1
result.value.head.field mustEqual nullableArrayWithRequiredElement(
name = "contexts_com_snowplowanalytics_snowplow_digit_schema_1",
elementType = DdlTypes.digitSchema100
)
}
"for unstruct" in {
val ctx = WideRow.Type(
SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)),
SelfDescribingEvent
)
val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get

result.value.size mustEqual 1
result.value.head.field mustEqual Field(
name = "unstruct_event_com_snowplowanalytics_snowplow_digit_schema_1",
fieldType = DdlTypes.digitSchema100,
nullability = Nullable
)
}
}
"produce only one field from latest type when versions are compatible" >> {
"for contexts" in {

Expand Down Expand Up @@ -218,17 +248,23 @@ object ParquetFieldsProviderSpec {

object DdlTypes {

val digitSchema100 = Type.Struct(
fields = NonEmptyVector.of(
Field("_1field", Type.String, Required).copy(accessors = Set("1field"))
)
)

val schema100 = Type.Struct(
fields = List(
fields = NonEmptyVector.of(
Field(
"a_field",
Type.Struct(
List(
NonEmptyVector.of(
Field("b_field", Type.String, Nullable),
Field(
"c_field",
Type.Struct(
List(
NonEmptyVector.one(
Field("d_field", Type.String, Nullable)
)
),
Expand All @@ -247,7 +283,7 @@ object ParquetFieldsProviderSpec {
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
Expand All @@ -259,16 +295,16 @@ object ParquetFieldsProviderSpec {
)
)
val schema101 = Type.Struct(
fields = List(
fields = NonEmptyVector.of(
Field(
"a_field",
Type.Struct(
List(
NonEmptyVector.of(
Field("b_field", Type.String, Nullable),
Field(
"c_field",
Type.Struct(
List(
NonEmptyVector.of(
Field("d_field", Type.String, Nullable),
Field("e_field", Type.String, Nullable)
)
Expand All @@ -289,7 +325,7 @@ object ParquetFieldsProviderSpec {
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
Expand All @@ -301,16 +337,16 @@ object ParquetFieldsProviderSpec {
)
)
val schema110 = Type.Struct(
fields = List(
fields = NonEmptyVector.of(
Field(
"a_field",
Type.Struct(
List(
NonEmptyVector.of(
Field("b_field", Type.String, Nullable),
Field(
"c_field",
Type.Struct(
List(
NonEmptyVector.of(
Field("d_field", Type.String, Nullable),
Field("e_field", Type.String, Nullable)
)
Expand All @@ -333,7 +369,7 @@ object ParquetFieldsProviderSpec {
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
Expand All @@ -346,22 +382,22 @@ object ParquetFieldsProviderSpec {
)

val schema200 = Type.Struct(
fields = List(
fields = NonEmptyVector.of(
Field("a_field", Type.String, Required),
Field("e_field", Type.String, Required),
Field("f_field", Type.Long, Required)
)
)

val brokenSchema100 = Type.Struct(fields = List(Field("b_field", Type.Long, Nullable)))
val brokenSchema101 = Type.Struct(fields = List(Field("b_field", Type.String, Nullable)))
val brokenSchema100 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.Long, Nullable)))
val brokenSchema101 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.String, Nullable)))
val brokenSchema110 = Type.Struct(fields =
List(
NonEmptyVector.of(
Field("a_field", Type.Long, Nullable),
Field("b_field", Type.Long, Nullable)
)
)
val brokenSchema111 = Type.Struct(fields = List(Field("a_field", Type.String, Nullable)))
val brokenSchema111 = Type.Struct(fields = NonEmptyVector.of(Field("a_field", Type.String, Nullable)))

val context100 = getBrokenType(SchemaVer.Full(1, 0, 0), Context)
val context101 = getBrokenType(SchemaVer.Full(1, 0, 1), Context) // breaking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._

object SparkSchema {

def build(allFields: AllFields): StructType =
Expand All @@ -37,7 +39,7 @@ object SparkSchema {
case Type.Decimal(precision, scale) => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
case Type.Date => DateType
case Type.Timestamp => TimestampType
case Type.Struct(fields) => StructType(fields.map(asSparkField))
case Type.Struct(fields) => StructType(fields.map(asSparkField).toVector.asJava)
case Type.Array(element, elNullability) => ArrayType(fieldType(element), elNullability.nullable)
case Type.Json => StringType // Spark does not support the `Json` parquet logical type.
}
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ object Dependencies {
val monocle = "2.0.3"
val catsRetry = "3.1.0"
val log4cats = "2.5.0"
val http4s = "0.23.17"
val http4sBlaze = "0.23.14" // this dep fell out of sync with http4s-core versioning - 0.23.14 is the last 0.X release.
val http4s = "0.23.18"
val http4sBlaze = "0.23.16" // this dep fell out of sync with http4s-core versioning - 0.23.16 is the last 0.X release.
val scalaTracker = "2.0.0"

val spark = "3.3.1"
val eventsManifest = "0.4.0"
val schemaDdl = "0.22.1"
val schemaDdl = "0.25.0-M1"
val jacksonModule = "2.17.2" // Override incompatible version in spark runtime
val jacksonDatabind = "2.17.2"
val jacksonMapper = "1.9.14-atlassian-6" // Fix CVE
Expand Down

0 comments on commit cd79f64

Please sign in to comment.