Skip to content

Commit

Permalink
adding to StatefulProcessorHandleSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jan 2, 2024
1 parent 7aa16d4 commit 26528e8
Showing 1 changed file with 83 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.UUID
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfter
import org.scalatest.{BeforeAndAfter, Tag}

import org.apache.spark.sql.execution.streaming.{ImplicitKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -46,6 +46,12 @@ class StatefulProcessorHandleSuite extends SharedSparkSession
StateStore.stop()
require(!StateStore.isMaintenanceRunning)
}
protected def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: Seq[A])(
testFun: A => Unit): Unit = {
for (param <- params) {
test(testNamePrefix + s" ($param)", testTags: _*)(testFun(param))
}
}

import StateStoreTestsHelper._

Expand All @@ -54,7 +60,7 @@ class StatefulProcessorHandleSuite extends SharedSparkSession
val schemaForValueRow: StructType = new StructType().add("value", BinaryType)

private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
RocksDBStateStoreProvider = {
RocksDBStateStoreProvider = {
newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
numColsPrefixKey = 0,
useColumnFamilies = useColumnFamilies)
Expand Down Expand Up @@ -94,11 +100,12 @@ class StatefulProcessorHandleSuite extends SharedSparkSession
}
}

test("value state creation with processing time timeout should succeed") {
gridTest("value state creation with timeout should succeed for timer mode")(
Seq(TimeoutMode.ProcessingTime(), TimeoutMode.EventTime())) { timeoutMode =>
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), TimeoutMode.ProcessingTime())
UUID.randomUUID(), timeoutMode)
assert(handle.getQueryInfo().getPartitionId === 0)
assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
handle.getValueState[Long]("testState")
Expand All @@ -120,17 +127,20 @@ class StatefulProcessorHandleSuite extends SharedSparkSession
private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
handle.getValueState[Long]("testState")
}

private def registerTimer(handle: StatefulProcessorHandleImpl): Unit = {
private def registerProcessingTimeTimer(handle: StatefulProcessorHandleImpl): Unit = {
handle.registerProcessingTimeTimer(1000L)
}

test("value state creation with processing time " +
"timeout and invalid state should fail") {
private def registerEventTimeTimer(handle: StatefulProcessorHandleImpl): Unit = {
handle.registerEventTimeTimer(1000L)
}

gridTest("value state creation with invalid state should fail for timeout mode: ")(
Seq(TimeoutMode.ProcessingTime(), TimeoutMode.EventTime())) { timeoutMode =>
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), TimeoutMode.ProcessingTime())
UUID.randomUUID(), timeoutMode)

verifyInvalidOperation(handle, StatefulProcessorHandleState.INITIALIZED,
"Cannot create state variable") { handle =>
Expand Down Expand Up @@ -200,17 +210,78 @@ class StatefulProcessorHandleSuite extends SharedSparkSession

verifyInvalidOperation(handle, StatefulProcessorHandleState.CREATED,
"Cannot register processing time timer") { handle =>
registerTimer(handle)
registerProcessingTimeTimer(handle)
}

verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED,
"Cannot register processing time timer") { handle =>
registerTimer(handle)
registerProcessingTimeTimer(handle)
}

verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED,
"Cannot register processing time timer") { handle =>
registerTimer(handle)
registerProcessingTimeTimer(handle)
}
}
}

test("registering event time timeouts with NoTimeout mode should fail") {
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), TimeoutMode.NoTimeouts())
assert(handle.getQueryInfo().getPartitionId === 0)
val ex = intercept[Exception] {
handle.registerEventTimeTimer(10000L)
}
assert(ex.getMessage.contains("Cannot register event time timers"))

val ex2 = intercept[Exception] {
handle.deleteEventTimeTimer(10000L)
}
assert(ex2.getMessage.contains("Cannot delete event time timers"))
}
}

test("registering event time timeouts with EventTime mode should succeed") {
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), TimeoutMode.EventTime())
assert(handle.getQueryInfo().getPartitionId === 0)
handle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
assert(handle.getHandleState === StatefulProcessorHandleState.INITIALIZED)

ImplicitKeyTracker.setImplicitKey("test_key")
assert(ImplicitKeyTracker.getImplicitKeyOption.isDefined)

handle.registerEventTimeTimer(10000L)
handle.deleteEventTimeTimer(10000L)

ImplicitKeyTracker.removeImplicitKey()
assert(ImplicitKeyTracker.getImplicitKeyOption.isEmpty)
}
}

test("registering event time timeouts with invalid state should fail") {
tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), TimeoutMode.EventTime())

verifyInvalidOperation(handle, StatefulProcessorHandleState.CREATED,
"Cannot register event time timer") { handle =>
registerEventTimeTimer(handle)
}

verifyInvalidOperation(handle, StatefulProcessorHandleState.TIMER_PROCESSED,
"Cannot register event time timer") { handle =>
registerEventTimeTimer(handle)
}

verifyInvalidOperation(handle, StatefulProcessorHandleState.CLOSED,
"Cannot register event time timer") { handle =>
registerEventTimeTimer(handle)
}
}
}
Expand Down

0 comments on commit 26528e8

Please sign in to comment.