
Commit

exception

ericm-db committed Jan 6, 2025
1 parent e95ef00 commit ef279f3
Showing 2 changed files with 131 additions and 22 deletions.
StateSchemaCompatibilityChecker.scala

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import scala.jdk.CollectionConverters.IterableHasAsJava
import scala.util.Try

-import org.apache.avro.{SchemaValidationException, SchemaValidatorBuilder}
+import org.apache.avro.SchemaValidatorBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}

@@ -205,16 +205,7 @@ class StateSchemaCompatibilityChecker(
val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)

val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()
-    try {
-      validator.validate(newAvroSchema, oldAvroSchemas)
-    } catch {
-      case _: SchemaValidationException =>
-        throw StateStoreErrors.stateStoreInvalidValueSchemaEvolution(
-          storedValueSchema.toString,
-          valueSchema.toString)
-      case e: Throwable =>
-        throw e
-    }
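+    // An incompatible evolution here raises Avro's SchemaValidationException
+    // directly, rather than a wrapped StateStore-specific error.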
+    validator.validate(newAvroSchema, oldAvroSchemas)

// Schema evolved - increment value schema ID
(resultSchema.copy(valueSchemaId = incrementSchemaId(mostRecentSchema.valueSchemaId)), true)
TransformWithStateSuite.scala

@@ -21,6 +21,7 @@ import java.io.File
import java.time.Duration
import java.util.UUID

+import org.apache.avro.SchemaValidationException
import org.apache.hadoop.fs.{FileStatus, Path}
import org.scalatest.matchers.must.Matchers.be
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -33,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.{StateStoreInvalidValueSchemaEvolution, _}
+import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
@@ -129,6 +130,75 @@ class DefaultValueEvolvedProcessor
}
}

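+// Three processors that bind the same state variable name ("testState") to
+// different value schemas; the evolution test below runs them in sequence.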
+// First schema - String field
+case class StateV1(value1: String)
+class ProcessorV1 extends StatefulProcessor[String, String, String] {
+  @transient var state: ValueState[StateV1] = _
+
+  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
+    state = getHandle.getValueState[StateV1](
+      "testState",
+      Encoders.product[StateV1],
+      TTLConfig.NONE)
+  }
+
+  override def handleInputRows(
+      key: String,
+      rows: Iterator[String],
+      timerValues: TimerValues): Iterator[String] = {
+    rows.map { value =>
+      state.update(StateV1(value))
+      value
+    }
+  }
+}
+
+// Second schema - Long field
+case class StateV2(value2: Long)
+class ProcessorV2 extends StatefulProcessor[String, String, String] {
+  @transient var state: ValueState[StateV2] = _
+
+  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
+    state = getHandle.getValueState[StateV2](
+      "testState",
+      Encoders.product[StateV2],
+      TTLConfig.NONE)
+  }
+
+  override def handleInputRows(
+      key: String,
+      rows: Iterator[String],
+      timerValues: TimerValues): Iterator[String] = {
+    rows.map { value =>
+      state.update(StateV2(value.length))
+      value
+    }
+  }
+}
+
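+// Avro permits widening promotions (int -> long) but not narrowing ones
+// (long -> int), so the schema below cannot read the previously written state.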
+// Third schema - Int field (incompatible with previous Long)
+case class StateV3(value1: Int)
+class ProcessorV3 extends StatefulProcessor[String, String, String] {
+  @transient var state: ValueState[StateV3] = _
+
+  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
+    state = getHandle.getValueState[StateV3](
+      "testState",
+      Encoders.product[StateV3],
+      TTLConfig.NONE)
+  }
+
+  override def handleInputRows(
+      key: String,
+      rows: Iterator[String],
+      timerValues: TimerValues): Iterator[String] = {
+    rows.map { value =>
+      state.update(StateV3(value.length))
+      value
+    }
+  }
+}
+
class RunningCountStatefulProcessorInitialOrder
extends StatefulProcessor[String, String, (String, String)] with Logging {
@transient protected var _countState: ValueState[TwoLongs] = _
@@ -1781,10 +1851,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest
testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
-      ExpectFailure[StateStoreInvalidValueSchemaEvolution] {
+      ExpectFailure[SchemaValidationException] {
        (t: Throwable) => {
          assert(t.getMessage.contains(
-            "Schema evolution is not possible with existing value_schema"))
+            "Unable to read schema:"))
}
}
)
@@ -2015,16 +2085,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest
triggerClock = clock),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
-      ExpectFailure[StateStoreInvalidValueSchemaEvolution] { t =>
+      ExpectFailure[SchemaValidationException] { t =>
        checkError(
          t.asInstanceOf[SparkUnsupportedOperationException],
-          condition = "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION",
-          parameters = Map(
-            "storedValueSchema" -> "StructType(StructField(value,LongType,false))",
-            "newValueSchema" ->
-              ("StructType(StructField(value,StructType(StructField(value,LongType,false))," +
-              "true),StructField(ttlExpirationMs,LongType,true))")
-          )
+          condition = "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION"
)
}
)
@@ -2642,6 +2706,60 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}

+  testWithEncoding("avro")("transformWithState - incompatible schema evolution should fail") {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      withTempDir { dir =>
+        val inputData = MemoryStream[String]
+
+        // First run with String field
+        val result1 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new ProcessorV1(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = dir.getCanonicalPath),
+          AddData(inputData, "test1"),
+          CheckNewAnswer("test1"),
+          StopStream
+        )
+
+        // Second run with Long field
+        val result2 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new ProcessorV2(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = dir.getCanonicalPath),
+          AddData(inputData, "test2"),
+          CheckNewAnswer("test2"),
+          StopStream
+        )
+
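+        // The Int-field schema is incompatible with the value schemas written
+        // by the two runs above, so this restart should fail schema validation.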
+        // Third run with Int field - should fail
+        val result3 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new ProcessorV3(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = dir.getCanonicalPath),
+          AddData(inputData, "test3"),
+          ExpectFailure[SchemaValidationException] { error =>
+            assert(error.getMessage.contains("Unable to read schema:"))
+          }
+        )
+      }
+    }
+  }
+}

class TransformWithStateValidationSuite extends StateStoreMetricsTest {
