[WIP] Avro encoding only in StateStore code
ericm-db committed Nov 16, 2024 · 1 parent 281a8e1 · commit 5bc6578
Showing 20 changed files with 790 additions and 148 deletions.
@@ -2230,6 +2230,17 @@ object SQLConf {
      .intConf
      .createWithDefault(1)

  val STREAMING_STATE_STORE_ENCODING_FORMAT =
    buildConf("spark.sql.streaming.stateStore.encodingFormat")
      .doc("The encoding format used for stateful operators to store information " +
        "in the state store")
      .version("4.0.0")
      .stringConf
      .transform(_.toLowerCase(Locale.ROOT))
      .checkValue(v => Set("unsaferow", "avro").contains(v),
        "Valid values are 'unsaferow' and 'avro'")
      .createWithDefault("unsaferow")

  val STATE_STORE_COMPRESSION_CODEC =
    buildConf("spark.sql.streaming.stateStore.compression.codec")
      .internal()
@@ -5598,6 +5609,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

  def stateStoreCheckpointFormatVersion: Int = getConf(STATE_STORE_CHECKPOINT_FORMAT_VERSION)

  def stateStoreEncodingFormat: String = getConf(STREAMING_STATE_STORE_ENCODING_FORMAT)

  def checkpointRenamedFileCheck: Boolean = getConf(CHECKPOINT_RENAMEDFILE_CHECK_ENABLED)

  def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
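For context, a minimal usage sketch (not part of this commit) of how a job could opt into the new encoding once this change lands. The session setup below is illustrative; only the conf key, its valid values, and the default come from the diff above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("avro-state-encoding-demo")
  // Per the checkValue above: "unsaferow" (the default) or "avro".
  .config("spark.sql.streaming.stateStore.encodingFormat", "avro")
  .getOrCreate()

// Equivalently, on an existing session:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")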
@@ -20,10 +20,49 @@ import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

object StateStoreColumnFamilySchemaUtils {

  /**
   * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. For range scans
   * we want to use big-endian encoding, so we need to convert the source schema to replace these
   * types with BinaryType.
   *
   * @param schema The schema to convert
   * @param ordinals If non-empty, only convert fields at these ordinals.
   *                 If empty, convert all fields.
   */
  def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = {
    val ordinalSet = ordinals.toSet

    StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) =>
      if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) {
        // For each fixed-size field, create two fields:
        // 1. A marker byte indicating whether the value is null, positive, or negative
        // 2. The original value, encoded big-endian
        // The marker is BinaryType rather than ByteType because Avro converts Byte to
        // Int, and Avro's zig-zag Int encoding does not give big-endian byte ordering.
        Seq(
          StructField(s"${field.name}_marker", BinaryType, nullable = false),
          field.copy(name = s"${field.name}_value", BinaryType)
        )
      } else {
        Seq(field)
      }
    })
  }

  private def isFixedSize(dataType: DataType): Boolean = dataType match {
    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
         _: FloatType | _: DoubleType => true
    case _ => false
  }

  def getTtlColFamilyName(stateName: String): String = {
    "$ttl_" + stateName
  }

  def getValueStateSchema[T](
      stateName: String,
      keyEncoder: ExpressionEncoder[Any],
(Diff truncated: 18 of the 20 changed files are not shown.)
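To make convertForRangeScan concrete, here is a hypothetical example of what it produces. The key schema, field names, and the object's package path are invented for the demo; only the conversion behavior comes from the code above.

// Package path assumed from the surrounding imports in this diff.
import org.apache.spark.sql.execution.streaming.StateStoreColumnFamilySchemaUtils
import org.apache.spark.sql.types._

// A key schema in which only ordinal 1 (expiresAt) participates in range scans.
val keySchema = StructType(Seq(
  StructField("groupKey", StringType),
  StructField("expiresAt", LongType)))

val converted = StateStoreColumnFamilySchemaUtils.convertForRangeScan(keySchema, Seq(1))

// converted now contains:
//   groupKey           string  (ordinal 0 was not requested, so it is untouched)
//   expiresAt_marker   binary, non-nullable  (null / positive / negative marker)
//   expiresAt_value    binary                (big-endian bytes of the long)
//
// Big-endian matters because Avro's zig-zag varint interleaves signs
// (0, -1, 1, -2, 2 encode as 0, 1, 2, 3, 4), so bytewise comparison of the
// encoded values does not follow numeric order; fixed-width big-endian bytes,
// with the sign handled by the marker byte, do.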
