Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jan 6, 2025
1 parent 91cc364 commit e95ef00
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ trait AlsoTestWithEncodingTypes extends SQLTestUtils {
}
}

// New method for Avro-only tests
protected def testWithAvroOnly(testName: String, testTags: Tag*)(testBody: => Any)
(implicit pos: Position): Unit = {
super.test(s"$testName (encoding = avro)", testTags: _*) {
withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
// Helper method to test with specific encoding
protected def testWithEncoding(encoding: String)(testName: String)(testBody: => Any)
(implicit pos: Position): Unit = {
super.test(s"$testName (encoding = $encoding)", Seq.empty: _*) {
withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> encoding) {
testBody
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - upcasting should succeed") {
testWithEncoding("avro")("transformWithState - upcasting should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down Expand Up @@ -911,7 +911,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - reordering fields should succeed") {
testWithEncoding("avro")("transformWithState - reordering fields should succeed") {
withSQLConf(
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
Expand Down Expand Up @@ -952,7 +952,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - adding field should succeed") {
testWithEncoding("avro")("transformWithState - adding field should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down Expand Up @@ -1007,7 +1007,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - add and remove field between runs") {
testWithEncoding("avro")("transformWithState - add and remove field between runs") {
withSQLConf(
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
Expand Down Expand Up @@ -1049,7 +1049,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - verify default values during schema evolution") {
testWithEncoding("avro")("transformWithState - verify default values during schema evolution") {
withSQLConf(
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
Expand Down Expand Up @@ -1109,7 +1109,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithAvroOnly("transformWithState - removing field should succeed") {
testWithEncoding("avro")("transformWithState - removing field should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down Expand Up @@ -1713,7 +1713,8 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

test("test that invalid schema evolution fails query for column family") {
testWithEncoding("unsaferow")("test that invalid schema evolution " +
"fails query for column family") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down Expand Up @@ -1741,7 +1742,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
ExpectFailure[StateStoreInvalidValueSchemaEvolution] {
ExpectFailure[StateStoreValueSchemaNotCompatible] {
(t: Throwable) => {
assert(t.getMessage.contains("Please check number and type of fields."))
}
Expand All @@ -1751,6 +1752,46 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithEncoding("avro")("test that invalid schema evolution " +
"fails query for column family") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
withTempDir { checkpointDir =>
val inputData = MemoryStream[String]
val result1 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
TimeMode.None(),
OutputMode.Update())

testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
CheckNewAnswer(("a", "1")),
StopStream
)
val result2 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessorInt(),
TimeMode.None(),
OutputMode.Update())

testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
ExpectFailure[StateStoreInvalidValueSchemaEvolution] {
(t: Throwable) => {
assert(t.getMessage.contains(
"Schema evolution is not possible with existing value_schema"))
}
}
)
}
}
}

test("test that different outputMode after query restart fails") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
Expand Down Expand Up @@ -1883,7 +1924,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

test("test that introducing TTL after restart fails query") {
testWithEncoding("unsaferow")("test that introducing TTL after restart fails query") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down Expand Up @@ -1937,6 +1978,60 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

testWithEncoding("avro")("test that introducing TTL after restart fails query") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
withTempDir { checkpointDir =>
val inputData = MemoryStream[String]
val clock = new StreamManualClock
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
TimeMode.ProcessingTime(),
OutputMode.Update())

testStream(result, OutputMode.Update())(
StartStream(
trigger = Trigger.ProcessingTime("1 second"),
checkpointLocation = checkpointDir.getCanonicalPath,
triggerClock = clock),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),
AdvanceManualClock(1 * 1000),
StopStream
)
val result2 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessorWithTTL(),
TimeMode.ProcessingTime(),
OutputMode.Update())
testStream(result2, OutputMode.Update())(
StartStream(
trigger = Trigger.ProcessingTime("1 second"),
checkpointLocation = checkpointDir.getCanonicalPath,
triggerClock = clock),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
ExpectFailure[StateStoreInvalidValueSchemaEvolution] { t =>
checkError(
t.asInstanceOf[SparkUnsupportedOperationException],
condition = "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION",
parameters = Map(
"storedValueSchema" -> "StructType(StructField(value,LongType,false))",
"newValueSchema" ->
("StructType(StructField(value,StructType(StructField(value,LongType,false))," +
"true),StructField(ttlExpirationMs,LongType,true))")
)
)
}
)
}
}
}

test("test query restart with new state variable succeeds") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
Expand Down Expand Up @@ -2068,7 +2163,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
new Path(stateCheckpointPath, "_stateSchema/default/")
}

test("transformWithState - verify that metadata and schema logs are purged") {
ignore("transformWithState - verify that metadata and schema logs are purged") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
Expand Down

0 comments on commit e95ef00

Please sign in to comment.