Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Dec 31, 2024
1 parent cd4d1ff commit 4826d12
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ object SchemaConverters extends Logging {
}
}

def getDefaultValue(dataType: DataType): Any = {
/**
* Creates default values for Spark SQL data types when converting to Avro.
* This ensures fields have appropriate defaults during schema evolution.
*/
private def getDefaultValue(dataType: DataType): Any = {
def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
val defaultMap = new java.util.HashMap[String, Any]()
st.fields.foreach { field =>
Expand All @@ -310,22 +314,27 @@ object SchemaConverters extends Logging {
}

dataType match {
// Basic types
case BooleanType => false
case ByteType | ShortType | IntegerType => 0
case LongType => 0L
case FloatType => 0.0f
case DoubleType => 0.0
case StringType => ""
case BinaryType => java.nio.ByteBuffer.allocate(0)

// Complex types
case ArrayType(elementType, _) =>
val defaultArray = new java.util.ArrayList[Any]()
defaultArray.add(getDefaultValue(elementType)) // Add one default element
defaultArray.add(getDefaultValue(elementType))
defaultArray
case MapType(StringType, valueType, _) =>
val defaultMap = new java.util.HashMap[String, Any]()
defaultMap.put("defaultKey", getDefaultValue(valueType)) // Add one default entry
defaultMap.put("defaultKey", getDefaultValue(valueType))
defaultMap
case st: StructType => createNestedDefault(st) // Handle nested structs recursively
case st: StructType => createNestedDefault(st)

// Special types
case _: DecimalType => java.nio.ByteBuffer.allocate(0)
case DateType => 0
case TimestampType => 0L
Expand All @@ -335,6 +344,10 @@ object SchemaConverters extends Logging {
}
}

/**
* Converts a Spark SQL schema to a corresponding Avro schema.
* Handles nested types and adds support for schema evolution.
*/
def toAvroType(
catalystType: DataType,
nullable: Boolean = false,
Expand Down Expand Up @@ -377,7 +390,7 @@ object SchemaConverters extends Logging {
}

val schema = catalystType match {
// Basic types remain the same
// Basic types
case BooleanType => builder.booleanType()
case ByteType | ShortType | IntegerType => builder.intType()
case LongType => builder.longType()
Expand All @@ -386,7 +399,7 @@ object SchemaConverters extends Logging {
case StringType => builder.stringType()
case NullType => builder.nullType()

// Date and Timestamp types
// Date/Timestamp types
case DateType =>
LogicalTypes.date().addToSchema(builder.intType())
case TimestampType =>
Expand All @@ -406,7 +419,7 @@ object SchemaConverters extends Logging {

case BinaryType => builder.bytesType()

// Complex types with improved nesting handling
// Complex types
case ArrayType(elementType, containsNull) =>
builder.array()
.items(toAvroType(elementType, containsNull, recordName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,13 @@ case class TransformWithStateExec(
* after init is called.
*/
override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
val keySchema = keyExpressions.toStructType
val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas ++
Map(
StateStore.DEFAULT_COL_FAMILY_NAME ->
StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA,
Some(NoPrefixKeyStateEncoderSpec(keySchema))))
closeProcessorHandle()
columnFamilySchemas
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,12 @@ class AvroStateEncoder(


// schema information
private val currentKeySchemaId: Short = getStateSchemaBroadcast.getCurrentStateSchemaId(
private lazy val currentKeySchemaId: Short = getStateSchemaBroadcast.getCurrentStateSchemaId(
getColFamilyName,
isKey = true
)

private val currentValSchemaId: Short = getStateSchemaBroadcast.getCurrentStateSchemaId(
private lazy val currentValSchemaId: Short = getStateSchemaBroadcast.getCurrentStateSchemaId(
getColFamilyName,
isKey = false
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ private[sql] class RocksDBStateStoreProvider
}

val dataEncoder = getDataEncoder(
"unsaferow",
stateStoreEncoding,
dataEncoderCacheKey,
keyStateEncoderSpec,
valueSchema,
Expand Down

0 comments on commit 4826d12

Please sign in to comment.