Commit 141f780

initial comments

ericm-db committed Nov 15, 2024
1 parent a2cd6e6 commit 141f780

Showing 4 changed files with 82 additions and 29 deletions.
@@ -2210,9 +2210,10 @@ object SQLConf {
         "in the state store")
       .version("4.0.0")
       .stringConf
-      .checkValue(v => Set("UnsafeRow", "Avro").contains(v),
-        "Valid values are 'UnsafeRow' and 'Avro'")
-      .createWithDefault("UnsafeRow")
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValue(v => Set("unsaferow", "avro").contains(v),
+        "Valid values are 'unsaferow' and 'avro'")
+      .createWithDefault("unsaferow")
 
   // The feature is still in development, so it is still internal.
   val STATE_STORE_CHECKPOINT_FORMAT_VERSION =
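For context, the `.transform(_.toLowerCase(Locale.ROOT))` step normalizes the user-supplied value before validation, so the config becomes case-insensitive while storing a canonical lowercase value. A minimal sketch of that normalize-then-validate chain outside Spark's ConfigBuilder (the function name is illustrative):

```scala
import java.util.Locale

// Lowercase first, so "Avro", "AVRO", and "avro" all pass the same
// membership check and the stored value is canonical.
def parseEncodingFormat(raw: String): String = {
  val v = raw.toLowerCase(Locale.ROOT)
  require(Set("unsaferow", "avro").contains(v),
    s"Valid values are 'unsaferow' and 'avro', got '$raw'")
  v
}

assert(parseEncodingFormat("Avro") == "avro")
assert(parseEncodingFormat("UnsafeRow") == "unsaferow")
```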
@@ -259,6 +259,14 @@ class IncrementalExecution(
     }
   }
 
+  // This needs to run on the driver; it copies the operator with the
+  // column family schemas populated.
+  private def withColumnFamilySchemas(op: TransformWithStateExec): TransformWithStateExec = {
+    op.copy(
+      columnFamilySchemas = op.getColFamilySchemas()
+    )
+  }
+
   /**
    * This rule populates the column family schemas for the TransformWithStateExec
    * operator to ship them from the driver, where the schema and serializer objects
@@ -269,9 +277,7 @@
       case statefulOp: StatefulOperator =>
         statefulOp match {
           case op: TransformWithStateExec =>
-            op.copy(
-              columnFamilySchemas = op.getColFamilySchemas()
-            )
+            withColumnFamilySchemas(op)
           case _ => statefulOp
         }
     }
@@ -570,9 +576,9 @@
     // The rule below doesn't change the plan but can cause the side effect that
     // metadata/schema is written in the checkpoint directory of stateful operator.
     planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule
-    val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule
+    val planWithStateSchemas = planWithStateOpId.transform(StateStoreColumnFamilySchemasRule.rule)
     simulateWatermarkPropagation(planWithStateSchemas)
-    planWithStateSchemas transform WatermarkPropagationRule.rule
+    planWithStateSchemas.transform(WatermarkPropagationRule.rule)
   }
 }
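The extracted helper above is a standard populate-then-copy rule: schemas are derived on the driver (where the serializer objects live) and a copy of the operator carries them out to executors. A toy sketch of that shape, with `TwsNode` as a hypothetical stand-in for TransformWithStateExec:

```scala
// `TwsNode` is a made-up stand-in for TransformWithStateExec; real plan
// nodes are SparkPlans. The point is the shape: compute on the driver,
// then return a copy of the immutable node carrying the result.
case class TwsNode(columnFamilySchemas: Map[String, String] = Map.empty) {
  def getColFamilySchemas(): Map[String, String] =
    Map("default" -> "<schema computed on the driver>")
}

def withColumnFamilySchemas(op: TwsNode): TwsNode =
  op.copy(columnFamilySchemas = op.getColFamilySchemas())

assert(withColumnFamilySchemas(TwsNode()).columnFamilySchemas.nonEmpty)
```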
@@ -82,6 +82,11 @@ class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean)
     new AvroSerializer(schema, avroType, nullable = false)
   }
 
+  // For range scan, we use custom encoding within the RangeKeyScanStateEncoder.
+  // However, we use the AvroSerializer/AvroDeserializer methods for the
+  // remaining key fields.
+  private val numRemainingKeyFieldsForTtlRangeScan: Int = 2
+
   private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
     val avroType = SchemaConverters.toAvroType(schema)
     val avroOptions = AvroOptions(Map.empty)
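Since `StructType` is a `Seq[StructField]`, `drop(n)` peels off the leading fields, which is how this constant is used later in the file. A sketch with made-up field names (the real TTL key layout is defined by the encoder, not shown here):

```scala
import org.apache.spark.sql.types._

// Made-up TTL index key: two leading fields handled by the range-scan
// encoder, the remainder Avro-encoded.
val ttlKeySchema = StructType(Seq(
  StructField("expirationMs", LongType),
  StructField("ordinal", IntegerType),
  StructField("groupingKey", BinaryType)))

// Mirrors StructType(ttlKeySchema.drop(numRemainingKeyFieldsForTtlRangeScan)).
val avroEncodedPart = StructType(ttlKeySchema.drop(2))
assert(avroEncodedPart.fieldNames.sameElements(Array("groupingKey")))
```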
@@ -91,21 +96,31 @@
   }
 
   /**
-   * If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer
-   * for a particular key and value schema.
+   * Creates an AvroEncoder with serializers and deserializers for key and value schemas if
+   * initialization is enabled.
+   *
+   * This method handles both standard key-value serialization and cases where the key is split
+   * into prefix and suffix components. When a suffix key schema is provided, separate
+   * serialization is applied to the suffix portion of the key, while the prefix is handled by
+   * PrefixKeyScanStateEncoder.
+   *
+   * @param keySchema The schema for the complete key in the column family. For prefix-suffix
+   *                  configurations, this represents the complete key structure.
+   * @param valSchema The schema for values in the column family.
+   * @param suffixKeySchema Optional schema for the suffix portion of the key. When provided,
+   *                        indicates that the key should be processed as a prefix-suffix pair.
+   * @return Some(AvroEncoder) if initializeAvroSerde is true, containing configured serializers
+   *         and deserializers for both key and value components. Returns None if initialization
+   *         is disabled.
    */
   private[sql] def getAvroSerde(
       keySchema: StructType,
       valSchema: StructType,
       suffixKeySchema: Option[StructType] = None
   ): Option[AvroEncoder] = {
     if (initializeAvroSerde) {
-      val (suffixKeySer, suffixKeyDe) = if (suffixKeySchema.isDefined) {
-        (Some(getAvroSerializer(suffixKeySchema.get)),
-          Some(getAvroDeserializer(suffixKeySchema.get)))
-      } else {
-        (None, None)
-      }
+      val (suffixKeySer, suffixKeyDe) = (
+        suffixKeySchema.map(getAvroSerializer), suffixKeySchema.map(getAvroDeserializer))
       Some(AvroEncoder(
         getAvroSerializer(keySchema),
         getAvroDeserializer(keySchema),
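The change in this hunk replaces an explicit `isDefined`/`get` branch with `Option.map`, a standard equivalence; a self-contained check with toy values, nothing Spark-specific:

```scala
val suffix: Option[String] = Some("key-suffix")

// Before: explicit isDefined/get branching.
val before: (Option[Int], Option[Int]) =
  if (suffix.isDefined) (Some(suffix.get.length), Some(suffix.get.hashCode))
  else (None, None)

// After: Option.map applies each function only when a value is present.
val after: (Option[Int], Option[Int]) =
  (suffix.map(_.length), suffix.map(_.hashCode))

assert(before == after)
```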
@@ -167,11 +182,23 @@
     )
   }
 
-  // This function creates the StateStoreColFamilySchema for
-  // the TTL secondary index.
-  // Because we want to encode fixed-length types as binary types
-  // if we are using Avro, we need to do some schema conversion to ensure
-  // we can use range scan
+  /**
+   * Creates a StateStoreColFamilySchema for the TTL (time-to-live) secondary index,
+   * specifically for ValueState and ListState.
+   *
+   * This method handles the schema creation for TTL tracking, ensuring proper encoding of
+   * fixed-length types as binary when using Avro serialization. The schema is specially
+   * configured to support range scan operations on the TTL index.
+   *
+   * @param stateName The name identifier for the state store column family
+   * @param keyEncoder An expression encoder for the key type, used to derive the base schema
+   *                   for TTL tracking
+   * @return StateStoreColFamilySchema configured for TTL tracking with:
+   *         - Modified key schema optimized for range scans
+   *         - Dummy value schema (since the TTL index only needs key-based tracking)
+   *         - Range key scan encoder specification
+   *         - Appropriate Avro serialization configuration
+   */
   def getTtlStateSchema(
       stateName: String,
       keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = {
@@ -185,17 +212,31 @@
       ttlValSchema,
       Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
       avroEnc = getAvroSerde(
-        StructType(ttlKeySchema.drop(2)),
+        StructType(ttlKeySchema.drop(numRemainingKeyFieldsForTtlRangeScan)),
         ttlValSchema
       )
     )
   }
 
-  // This function creates the StateStoreColFamilySchema for
-  // the TTL secondary index.
-  // Because we want to encode fixed-length types as binary types
-  // if we are using Avro, we need to do some schema conversion to ensure
-  // we can use range scan
+  /**
+   * Creates a StateStoreColFamilySchema for the TTL (time-to-live) secondary index,
+   * specifically for MapState, which requires handling composite keys.
+   *
+   * This overloaded version handles schema creation for TTL tracking with composite keys,
+   * ensuring proper encoding of fixed-length types as binary when using Avro serialization.
+   * The schema is specially configured to support range scan operations on the TTL index.
+   *
+   * @param stateName The name identifier for the state store column family
+   * @param keyEncoder An expression encoder for the key type, used to derive the base schema
+   *                   for TTL tracking
+   * @param userKeySchema Additional schema information for the user-defined key portion of the
+   *                      composite key structure
+   * @return StateStoreColFamilySchema configured for TTL tracking with:
+   *         - Modified composite key schema optimized for range scans
+   *         - Dummy value schema (since the TTL index only needs key-based tracking)
+   *         - Range key scan encoder specification
+   *         - Appropriate Avro serialization configuration
+   */
   def getTtlStateSchema(
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
@@ -210,7 +251,7 @@
       ttlValSchema,
       Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
       avroEnc = getAvroSerde(
-        StructType(ttlKeySchema.drop(2)),
+        StructType(ttlKeySchema.drop(numRemainingKeyFieldsForTtlRangeScan)),
         ttlValSchema
       )
     )
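For intuition on why the leading range-scan fields bypass Avro: a range scan over a sorted key space needs byte order to agree with numeric order, and variable-length encodings like Avro's zig-zag integers do not guarantee that. A minimal sketch of the usual fixed-width trick (not Spark's actual RangeKeyScanStateEncoder):

```scala
import java.nio.ByteBuffer

// Flip the sign bit, then emit big-endian fixed-width bytes: unsigned
// lexicographic byte order now matches signed numeric order.
def orderPreservingBytes(ts: Long): Array[Byte] =
  ByteBuffer.allocate(java.lang.Long.BYTES).putLong(ts ^ Long.MinValue).array()

// Unsigned lexicographic comparison, as a byte-sorted store would do.
def unsignedLexCompare(a: Array[Byte], b: Array[Byte]): Int =
  a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }
    .find(_ != 0).getOrElse(a.length - b.length)

assert(unsignedLexCompare(orderPreservingBytes(-5L), orderPreservingBytes(3L)) < 0)
assert(unsignedLexCompare(orderPreservingBytes(2L), orderPreservingBytes(100L)) < 0)
```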
@@ -41,7 +41,12 @@ import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{NextIterator, ThreadUtils, Utils}
 
-sealed trait StateStoreEncoding
+sealed trait StateStoreEncoding {
+  override def toString: String = this match {
+    case StateStoreEncoding.UnsafeRow => "unsaferow"
+    case StateStoreEncoding.Avro => "avro"
+  }
+}
 
 object StateStoreEncoding {
   case object UnsafeRow extends StateStoreEncoding
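The `toString` override makes each case object render as exactly the lowercase token the (now case-normalized) SQL conf accepts, so encoding values can round-trip between the config string and the sealed trait. A self-contained mirror of the pattern, with a hypothetical `fromString` inverse added for illustration:

```scala
sealed trait Encoding {
  override def toString: String = this match {
    case Encoding.UnsafeRow => "unsaferow"
    case Encoding.Avro => "avro"
  }
}

object Encoding {
  case object UnsafeRow extends Encoding
  case object Avro extends Encoding

  // Hypothetical inverse of toString, matching the config's checkValue set.
  def fromString(s: String): Encoding = s match {
    case "unsaferow" => UnsafeRow
    case "avro" => Avro
    case other => throw new IllegalArgumentException(
      s"Valid values are 'unsaferow' and 'avro', got '$other'")
  }
}

assert(Encoding.fromString(Encoding.Avro.toString) == Encoding.Avro)
```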
