have to version the checker I think
ericm-db committed Jan 6, 2025
1 parent 20c8c62 commit 8294000
Showing 8 changed files with 110 additions and 83 deletions.
@@ -217,7 +217,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging

// Read the schema file path from operator metadata version v2 onwards
// for the transformWithState operator
- val oldSchemaFilePath = if (storeMetadata.length > 0 && storeMetadata.head.version == 2
+ val oldSchemaFilePaths = if (storeMetadata.length > 0 && storeMetadata.head.version == 2
&& twsShortNameSeq.exists(storeMetadata.head.operatorName.contains)) {
val storeMetadataEntry = storeMetadata.head
val operatorProperties = TransformWithStateOperatorProperties.fromJson(
@@ -241,12 +241,10 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
schemaFilePaths
)
stateSchemaProvider = Some(new InMemoryStateSchemaProvider(stateSchemaMetadata))
- schemaFilePaths.lastOption.map { schemaFilePath =>
-   new Path(schemaFilePath)
- }
+ schemaFilePaths.map(new Path(_))
} else {
None
- }
+ }.toList

try {
// Read the actual state schema from the provided path for v2 or from the dedicated path
@@ -257,7 +255,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
partitionId, sourceOptions.storeName)
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
val manager = new StateSchemaCompatibilityChecker(providerId, hadoopConf,
- oldSchemaFilePath = oldSchemaFilePath)
+ oldSchemaFilePaths = oldSchemaFilePaths)
val stateSchema = manager.readSchemaFile()

// Based on the version and read schema, populate the keyStateEncoderSpec used for
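The rewritten branch above relies on Scala's Option-to-List normalization: the then-branch already yields a List[Path], and the trailing .toList turns the else-branch's None into Nil. A minimal self-contained sketch (paths invented for illustration):

    import org.apache.hadoop.fs.Path

    val schemaFilePaths = List("/ckpt/state/schema/0_a", "/ckpt/state/schema/1_b")
    // Shape of the then-branch: every recorded schema file becomes a Path.
    val fromV2: List[Path] = schemaFilePaths.map(new Path(_))
    // Shape of the else-branch: None.toList collapses to the empty list.
    val fromV1: List[Path] = None.toList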
@@ -222,24 +222,21 @@ trait TransformWithStateMetadataUtils extends Logging
None
}

- val oldStateSchemaFilePath: Option[Path] = operatorStateMetadata match {
+ val oldStateSchemaFilePaths: List[Path] = operatorStateMetadata match {
case Some(metadata) =>
metadata match {
case v2: OperatorStateMetadataV2 =>
- // We pick the last entry in the schema list because it contains the most recent
- // StateStoreColFamilySchemas
- val schemaPath = v2.stateStoreInfo.head.stateSchemaFilePaths.last
- Some(new Path(schemaPath))
+ v2.stateStoreInfo.head.stateSchemaFilePaths.map(new Path(_))
- case _ => None
+ case _ => List.empty
}
- case None => None
+ case None => List.empty
}
// The state schema file is written here, using the new schema list passed in
List(StateSchemaCompatibilityChecker.
validateAndMaybeEvolveStateSchema(info, hadoopConf,
newSchemas.values.toList, session.sessionState, stateSchemaVersion,
storeName = StateStoreId.DEFAULT_STORE_NAME,
- oldSchemaFilePath = oldStateSchemaFilePath,
+ oldSchemaFilePaths = oldStateSchemaFilePaths,
newSchemaFilePath = Some(newStateSchemaFilePath),
schemaEvolutionEnabled = stateStoreEncodingFormat == "avro"))
}
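The v2 branch now carries every schema file path forward instead of only the most recent one. The nested match could also be collapsed with a typed pattern; a hedged sketch with simplified stand-in types (not the real Spark definitions):

    import org.apache.hadoop.fs.Path

    // Stand-ins, simplified from the Spark classes in the diff above.
    sealed trait OperatorStateMetadata
    final case class StateStoreInfo(stateSchemaFilePaths: List[String])
    final case class OperatorStateMetadataV2(stateStoreInfo: List[StateStoreInfo])
      extends OperatorStateMetadata

    def oldSchemaFilePaths(meta: Option[OperatorStateMetadata]): List[Path] = meta match {
      case Some(v2: OperatorStateMetadataV2) =>
        v2.stateStoreInfo.head.stateSchemaFilePaths.map(new Path(_))
      case _ => List.empty // v1 metadata, or no metadata at all
    }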
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, Stateful
import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
import org.apache.spark.sql.internal.SessionState
- import org.apache.spark.sql.types.{DataType, StructType}
+ import org.apache.spark.sql.types._

// Result returned after validating the schema of the state store for schema changes
case class StateSchemaValidationResult(
@@ -78,37 +78,39 @@ case class StateStoreColFamilySchema(
class StateSchemaCompatibilityChecker(
providerId: StateStoreProviderId,
hadoopConf: Configuration,
- oldSchemaFilePath: Option[Path] = None,
+ oldSchemaFilePaths: List[Path] = List.empty,
newSchemaFilePath: Option[Path] = None) extends Logging {

- private val schemaFileLocation = if (oldSchemaFilePath.isEmpty) {
+ // For OperatorStateMetadataV1: only one schema file is present per operator
+ // per query.
+ // For OperatorStateMetadataV2: multiple schema files are present per operator
+ // per query; this variable points to the latest one.
+ private val schemaFileLocation = if (oldSchemaFilePaths.isEmpty) {
val storeCpLocation = providerId.storeId.storeCheckpointLocation()
schemaFile(storeCpLocation)
} else {
- oldSchemaFilePath.get
+ oldSchemaFilePaths.last
}

private val fm = CheckpointFileManager.create(schemaFileLocation, hadoopConf)

fm.mkdirs(schemaFileLocation.getParent)

// Read most recent schema file
def readSchemaFile(): List[StateStoreColFamilySchema] = {
val inStream = fm.open(schemaFileLocation)
StateSchemaCompatibilityChecker.readSchemaFile(inStream)
}

- /**
-  * Function to read and return the list of existing state store column family schemas from the
-  * schema file, if it exists
-  * @return - List of state store column family schemas if the schema file exists and an empty
-  *           list otherwise
-  */
- private def getExistingKeyAndValueSchema(): List[StateStoreColFamilySchema] = {
-   if (fm.exists(schemaFileLocation)) {
-     readSchemaFile()
-   } else {
-     List.empty
-   }
- }
+ // Read all old schema files and group the schemas by column family name.
+ // This method is used for OperatorStateMetadataV2 when schema evolution
+ // is supported, to read all active schemas in the StateStore for this operator.
+ def readSchemaFiles(): Map[String, List[StateStoreColFamilySchema]] = {
+   oldSchemaFilePaths.flatMap { schemaFile =>
+     val inStream = fm.open(schemaFile)
+     StateSchemaCompatibilityChecker.readSchemaFile(inStream)
+   }
+     .groupBy(_.colFamilyName)
+ }

private def createSchemaFile(
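To see what readSchemaFiles() produces, a self-contained toy (invented data, simplified schema type) showing the flatMap-then-groupBy shape — every historical schema version survives, keyed by column family:

    final case class ColFamilySchema(colFamilyName: String, valueSchemaId: Int)

    val fileContents = List(
      List(ColFamilySchema("default", 0), ColFamilySchema("timers", 0)), // oldest file
      List(ColFamilySchema("default", 1)))                               // newest file

    val byFamily: Map[String, List[ColFamilySchema]] =
      fileContents.flatten.groupBy(_.colFamilyName)
    // byFamily("default") == List(ColFamilySchema("default", 0), ColFamilySchema("default", 1))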
Expand Down Expand Up @@ -155,54 +157,59 @@ class StateSchemaCompatibilityChecker(
* @param ignoreValueSchema - whether to ignore value schema or not
*/
private def check(
- oldSchema: StateStoreColFamilySchema,
+ oldSchemas: List[StateStoreColFamilySchema],
newSchema: StateStoreColFamilySchema,
ignoreValueSchema: Boolean,
- schemaEvolutionEnabled: Boolean): StateStoreColFamilySchema = {
+ schemaEvolutionEnabled: Boolean): (StateStoreColFamilySchema, Boolean) = {

def incrementSchemaId(id: Short): Short = (id + 1).toShort

+ val mostRecentSchema = oldSchemas.last
// Initialize with old schema IDs
- var resultSchema = newSchema.copy(
-   keySchemaId = oldSchema.keySchemaId,
-   valueSchemaId = oldSchema.valueSchemaId
+ val resultSchema = newSchema.copy(
+   keySchemaId = mostRecentSchema.keySchemaId,
+   valueSchemaId = mostRecentSchema.valueSchemaId
)
- val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema,
-   oldSchema.valueSchema)
+ val (storedKeySchema, storedValueSchema) = (mostRecentSchema.keySchema,
+   mostRecentSchema.valueSchema)
val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema)

if (storedKeySchema.equals(keySchema) &&
(ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
// schema is exactly same
- oldSchema
+ (mostRecentSchema, false)
} else if (!schemasCompatible(storedKeySchema, keySchema)) {
throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
keySchema.toString)
} else if (!ignoreValueSchema && schemaEvolutionEnabled) {

// Check value schema evolution
- val oldAvroSchema = SchemaConverters.toAvroTypeWithDefaults(storedValueSchema)
+ // Sort schemas by most recent to least recent
+ val oldAvroSchemas = oldSchemas.sortBy(_.valueSchemaId).reverse.map { oldSchema =>
+   SchemaConverters.toAvroTypeWithDefaults(oldSchema.valueSchema)
+ }.asJava
val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)

val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()
try {
- validator.validate(newAvroSchema, Iterable(oldAvroSchema).asJava)
+ validator.validate(newAvroSchema, oldAvroSchemas)
} catch {
case _: SchemaValidationException =>
- StateStoreErrors.stateStoreInvalidValueSchemaEvolution(
+ throw StateStoreErrors.stateStoreInvalidValueSchemaEvolution(
storedValueSchema.toString,
valueSchema.toString)
case e: Throwable =>
throw e
}

// Schema evolved - increment value schema ID
- resultSchema.copy(valueSchemaId = incrementSchemaId(oldSchema.valueSchemaId))
+ (resultSchema.copy(valueSchemaId = incrementSchemaId(mostRecentSchema.valueSchemaId)), true)
} else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
valueSchema.toString)
} else {
logInfo("Detected schema change which is compatible. Allowing to put rows.")
- oldSchema
+ (mostRecentSchema, true)
}
}
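The evolution branch hands compatibility off to Avro's SchemaValidatorBuilder: with canReadStrategy plus validateAll, the new value schema must be able to read data written under every prior schema. A standalone sketch with made-up record schemas (not the schemas Spark actually generates):

    import scala.jdk.CollectionConverters._
    import org.apache.avro.{Schema, SchemaValidationException, SchemaValidatorBuilder}

    val v0 = new Schema.Parser().parse(
      """{"type":"record","name":"Value","fields":[
        |  {"name":"count","type":"long"}]}""".stripMargin)
    // v1 adds a field with a default: a readable, legal evolution.
    val v1 = new Schema.Parser().parse(
      """{"type":"record","name":"Value","fields":[
        |  {"name":"count","type":"long"},
        |  {"name":"label","type":["null","string"],"default":null}]}""".stripMargin)

    val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()
    try validator.validate(v1, Iterable[Schema](v0).asJava) // passes
    catch { case _: SchemaValidationException => () }       // would mirror the throw above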

@@ -218,29 +225,25 @@
ignoreValueSchema: Boolean,
stateSchemaVersion: Int,
schemaEvolutionEnabled: Boolean): Boolean = {
- val existingStateSchemaList = getExistingKeyAndValueSchema()
+ val existingStateSchemaMap = readSchemaFiles()

- if (existingStateSchemaList.isEmpty) {
+ if (existingStateSchemaMap.isEmpty) {
// Initialize schemas with ID 0 when no existing schema
val initializedSchemas = newStateSchema.map(schema =>
schema.copy(keySchemaId = 0, valueSchemaId = 0)
)
createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), stateSchemaVersion)
true
} else {
- val existingSchemaMap = existingStateSchemaList.map(schema =>
-   schema.colFamilyName -> schema
- ).toMap

// Process each new schema and track if any have evolved
val (evolvedSchemas, hasEvolutions) = newStateSchema.foldLeft(
(List.empty[StateStoreColFamilySchema], false)) {
case ((schemas, evolved), newSchema) =>
- existingSchemaMap.get(newSchema.colFamilyName) match {
-   case Some(existingSchema) =>
-     val updatedSchema = check(
-       existingSchema, newSchema, ignoreValueSchema, schemaEvolutionEnabled)
-     val hasEvolved = !updatedSchema.equals(existingSchema)
+ existingStateSchemaMap.get(newSchema.colFamilyName) match {
+   case Some(existingSchemas) =>
+     val (updatedSchema, hasEvolved) = check(
+       existingSchemas, newSchema, ignoreValueSchema, schemaEvolutionEnabled)
(updatedSchema :: schemas, evolved || hasEvolved)
case None =>
// New column family - initialize with schema ID 0
Expand All @@ -250,7 +253,7 @@ class StateSchemaCompatibilityChecker(
}

val colFamiliesAddedOrRemoved =
- (newStateSchema.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
+ (newStateSchema.map(_.colFamilyName).toSet != existingStateSchemaMap.keySet)
val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved

if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFileWritten) {
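The foldLeft above threads two accumulators through a single pass: the list of checked schemas and a flag recording whether any column family evolved. A minimal self-contained model of that accumulator shape (names invented):

    // Each pair stands for (column family name, did its schema evolve?).
    val checked = List(("default", false), ("timers", true), ("ttl", false))

    val (names, anyEvolved) =
      checked.foldLeft((List.empty[String], false)) {
        case ((acc, evolved), (name, hasEvolved)) =>
          (name :: acc, evolved || hasEvolved)
      }
    // names == List("ttl", "timers", "default"); anyEvolved == true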
@@ -319,7 +322,7 @@ object StateSchemaCompatibilityChecker extends Logging {
stateSchemaVersion: Int,
extraOptions: Map[String, String] = Map.empty,
storeName: String = StateStoreId.DEFAULT_STORE_NAME,
- oldSchemaFilePath: Option[Path] = None,
+ oldSchemaFilePaths: List[Path] = List.empty,
newSchemaFilePath: Option[Path] = None,
schemaEvolutionEnabled: Boolean = false): StateSchemaValidationResult = {
// SPARK-47776: collation introduces the concept of binary (in)equality, which means
@@ -339,7 +342,7 @@ object StateSchemaCompatibilityChecker extends Logging {
val providerId = StateStoreProviderId(StateStoreId(stateInfo.checkpointLocation,
stateInfo.operatorId, 0, storeName), stateInfo.queryRunId)
val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf,
- oldSchemaFilePath = oldSchemaFilePath, newSchemaFilePath = newSchemaFilePath)
+ oldSchemaFilePaths = oldSchemaFilePaths, newSchemaFilePath = newSchemaFilePath)
// regardless of configuration, we check compatibility to at least write schema file
// if necessary
// if the format validation for value schema is disabled, we also disable the schema
@@ -380,7 +383,7 @@ object StateSchemaCompatibilityChecker extends Logging {
// so we would just populate the next run's metadata file with this
// file path
if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
- oldSchemaFilePath.get.toString
+ oldSchemaFilePaths.last.toString
} else {
// if we are using any version less than v3, we have written
// the schema to this static location, which we will return
@@ -320,16 +320,14 @@ class StateStoreKeySchemaNotCompatible(
"storedKeySchema" -> storedKeySchema,
"newKeySchema" -> newKeySchema))

- trait StateStoreInvalidValueSchema extends Throwable
-
class StateStoreValueSchemaNotCompatible(
storedValueSchema: String,
newValueSchema: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE",
messageParameters = Map(
"storedValueSchema" -> storedValueSchema,
- "newValueSchema" -> newValueSchema)) with StateStoreInvalidValueSchema
+ "newValueSchema" -> newValueSchema))

class StateStoreInvalidValueSchemaEvolution(
storedValueSchema: String,
Expand All @@ -338,7 +336,7 @@ class StateStoreInvalidValueSchemaEvolution(
errorClass = "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION",
messageParameters = Map(
"storedValueSchema" -> storedValueSchema,
- "newValueSchema" -> newValueSchema)) with StateStoreInvalidValueSchema
+ "newValueSchema" -> newValueSchema))

class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
extends SparkRuntimeException(
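With the StateStoreInvalidValueSchema marker trait gone, a caller that used to match on the trait would match the concrete classes instead. A hypothetical call site (runStateUpdate and handleSchemaError are invented for illustration):

    try runStateUpdate()
    catch {
      case e @ (_: StateStoreValueSchemaNotCompatible |
                _: StateStoreInvalidValueSchemaEvolution) =>
        handleSchemaError(e) // invented handler; both classes remain Spark exceptions
    }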
@@ -123,6 +123,20 @@ trait AlsoTestWithEncodingTypes extends SQLTestUtils {
}
}
}

+ // New method for Avro-only tests
+ protected def testWithAvroOnly(testName: String, testTags: Tag*)(testBody: => Any)
+     (implicit pos: Position): Unit = {
+   super.test(s"$testName (encoding = avro)", testTags: _*) {
+     withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+       testBody
+     }
+   }
+ }
+
+ protected def getCurrentEncoding(): String = {
+   spark.conf.get(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key)
+ }
}
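A hypothetical use of the new helper from a suite that mixes in this trait (test name and body invented):

    testWithAvroOnly("value schema evolution adds a defaulted field") {
      // Pinned to avro via withSQLConf inside the helper.
      assert(getCurrentEncoding() == "avro")
      // ... run a transformWithState query against an evolved value schema ...
    }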

trait AlsoTestWithRocksDBFeatures
@@ -275,7 +275,7 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
val schemaFilePath = Some(new Path(stateSchemaDir,
s"${batchId}_${UUID.randomUUID().toString}"))
val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf,
- oldSchemaFilePath = schemaFilePath,
+ oldSchemaFilePaths = schemaFilePath.toList,
newSchemaFilePath = schemaFilePath)
checker.createSchemaFile(storeColFamilySchema,
SchemaHelper.SchemaWriter.createSchemaWriter(stateSchemaVersion))
@@ -397,7 +397,7 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
val result = Try(
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, hadoopConf,
oldStateSchema, spark.sessionState, stateSchemaVersion = stateSchemaVersion,
- oldSchemaFilePath = schemaFilePath,
+ oldSchemaFilePaths = schemaFilePath.toList,
newSchemaFilePath = newSchemaFilePath,
extraOptions = extraOptions)
).toEither.fold(Some(_), _ => None)
@@ -412,9 +412,9 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, hadoopConf,
newStateSchema, spark.sessionState, stateSchemaVersion = stateSchemaVersion,
extraOptions = extraOptions,
- oldSchemaFilePath = stateSchemaVersion match {
-   case 3 => newSchemaFilePath
-   case _ => None
+ oldSchemaFilePaths = stateSchemaVersion match {
+   case 3 => newSchemaFilePath.toList
+   case _ => List.empty
},
newSchemaFilePath = getNewSchemaPath(stateSchemaDir, stateSchemaVersion))
}
@@ -463,7 +463,7 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
keyStateEncoderSpec = getKeyStateEncoderSpec(stateSchemaVersion, oldKeySchema)))
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, hadoopConf,
oldStateSchema, spark.sessionState, stateSchemaVersion = stateSchemaVersion,
- oldSchemaFilePath = schemaFilePath,
+ oldSchemaFilePaths = schemaFilePath.toList,
newSchemaFilePath = getNewSchemaPath(stateSchemaDir, stateSchemaVersion),
extraOptions = extraOptions)

@@ -472,7 +472,7 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
keyStateEncoderSpec = getKeyStateEncoderSpec(stateSchemaVersion, newKeySchema)))
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, hadoopConf,
newStateSchema, spark.sessionState, stateSchemaVersion = stateSchemaVersion,
- oldSchemaFilePath = schemaFilePath,
+ oldSchemaFilePaths = schemaFilePath.toList,
newSchemaFilePath = getNewSchemaPath(stateSchemaDir, stateSchemaVersion),
extraOptions = extraOptions)
}