Skip to content

Commit

Permalink
[SS][SPIP-IN-PROGRESS][DO-NOT-MERGE] Add support for event time timer…
Browse files Browse the repository at this point in the history
…s for new transformWithState operator
  • Loading branch information
ericm-db committed Jan 2, 2024
1 parent a021be8 commit 7aa16d4
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public static final TimeoutMode ProcessingTime() {
return ProcessingTime$.MODULE$;
}

/**
* Timeout mode for a stateful processor that only registers event time timers.
*
* @return the singleton {@code EventTime} timeout mode instance
*/
public static final TimeoutMode EventTime() {
return EventTime$.MODULE$;
}

/**
* Stateful processor that does not register timers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ import org.apache.spark.sql.streaming.TimeoutMode
/** Types of timeouts used in transformWithState operator */
// Mode for stateful processors that do not register timers
case object NoTimeouts extends TimeoutMode
// Mode required for registering processing time timers
case object ProcessingTime extends TimeoutMode
// Mode required for registering and deleting event time timers
case object EventTime extends TimeoutMode
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ trait StatefulProcessor[K, I, O] extends Serializable {
Iterator.empty
}

/**
 * Callback that lets users react to an expired event time timer for a given grouping key.
 * The default implementation emits no output rows.
 *
 * @param key - grouping key
 * @param expiryTimestampMs - expiry timestamp with which the timer was registered
 * @param timerValues - instance of TimerValues that provides access to current processing/event
 *                      time if available
 * @return - Zero or more output rows
 */
def handleEventTimeTimers(
    key: K,
    expiryTimestampMs: Long,
    timerValues: TimerValues): Iterator[O] = Iterator.empty

/**
* Function called as the last method that allows for users to perform
* any cleanup or teardown operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,19 @@ trait StatefulProcessorHandle extends Serializable {
* @param expiryTimestampMs - timer expiry timestamp in milliseconds
*/
def deleteProcessingTimeTimer(expiryTimestampMs: Long): Unit

/**
* Function to register an event time timer for given implicit key
*
* @param expiryTimestampMs - timer expiry timestamp in milliseconds
*/
def registerEventTimeTimer(expiryTimestampMs: Long): Unit

/**
* Function to delete an event time timer for implicit key and given
* timestamp
*
* @param expiryTimestampMs - timer expiry timestamp in milliseconds
*/
def deleteEventTimeTimer(expiryTimestampMs: Long): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class StatefulProcessorHandleImpl(
// Timer state for processing time timers, obtained lazily on first access
private lazy val procTimers =
getTimerState[Boolean](TimerStateUtils.PROC_TIMERS_STATE_NAME)

// Timer state for event time timers, obtained lazily on first access
private lazy val eventTimers =
getTimerState[Boolean](TimerStateUtils.EVENT_TIMERS_STATE_NAME)

override def registerProcessingTimeTimer(expiryTimestampMs: Long): Unit = {
verify(timeoutMode == ProcessingTime, s"Cannot register processing time " +
"timers with incorrect TimeoutMode")
Expand Down Expand Up @@ -166,4 +169,45 @@ class StatefulProcessorHandleImpl(
procTimers.remove(expiryTimestampMs)
}
}

/**
 * Registers an event time timer for the implicit key. The call is verified against the
 * timeout mode (must be EventTime) and the current handle state (must be INITIALIZED or
 * DATA_PROCESSED). If a timer with the same expiry timestamp already exists, a warning is
 * logged and no timer is added.
 *
 * @param expiryTimestampMs - timer expiry timestamp in milliseconds
 */
override def registerEventTimeTimer(expiryTimestampMs: Long): Unit = {
  verify(timeoutMode == EventTime,
    "Cannot register event time timers with incorrect TimeoutMode")

  val stateAllowsTimers = currState == INITIALIZED || currState == DATA_PROCESSED
  verify(stateAllowsTimers,
    s"Cannot register event time timer with expiryTimestampMs=$expiryTimestampMs " +
      s"in current state=$currState")

  val alreadyRegistered = eventTimers.exists(expiryTimestampMs)
  if (!alreadyRegistered) {
    logInfo(s"Registering timer with expiryTimestampMs=$expiryTimestampMs")
    eventTimers.add(expiryTimestampMs, true)
  } else {
    logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")
  }
}

/**
 * Deletes the event time timer registered for the implicit key with the given expiry
 * timestamp. The call is verified against the timeout mode (must be EventTime) and the
 * current handle state (must be INITIALIZED or DATA_PROCESSED). If no such timer exists,
 * this is only logged and nothing is removed.
 *
 * @param expiryTimestampMs - timer expiry timestamp in milliseconds
 */
override def deleteEventTimeTimer(expiryTimestampMs: Long): Unit = {
  verify(timeoutMode == EventTime,
    "Cannot delete event time timers with incorrect TimeoutMode")

  val stateAllowsTimers = currState == INITIALIZED || currState == DATA_PROCESSED
  verify(stateAllowsTimers,
    s"Cannot delete event time timer with expiryTimestampMs=$expiryTimestampMs " +
      s"in current state=$currState")

  val timerRegistered = eventTimers.exists(expiryTimestampMs)
  if (timerRegistered) {
    logInfo(s"Removing timer with expiryTimestampMs=$expiryTimestampMs")
    eventTimers.remove(expiryTimestampMs)
  } else {
    logInfo(s"Timer does not exist for expiryTimestampMs=$expiryTimestampMs")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object TimerStateUtils {
expiryTimestampMs: Long) extends Serializable

// Names of the internal state under which processing time and event time timers are kept
val PROC_TIMERS_STATE_NAME = "_procTimers"
val EVENT_TIMERS_STATE_NAME = "_eventTimers"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ case class TransformWithStateExec(
// TODO: check if we can return true only if actual timers are registered
case ProcessingTime =>
true

case EventTime =>
true
case _ =>
false
}
Expand Down Expand Up @@ -105,13 +106,30 @@ case class TransformWithStateExec(

if (tsWithKey.expiryTimestampMs < currTimestampMs) {
ImplicitKeyTracker.setImplicitKey(tsWithKey.key)
val mappedIterator = statefulProcessor.handleProcessingTimeTimers(tsWithKey.key,
tsWithKey.expiryTimestampMs,
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
getOutputRow(obj)
}
ImplicitKeyTracker.removeImplicitKey()
store.remove(keyRow, TimerStateUtils.PROC_TIMERS_STATE_NAME)

val mappedIterator =
timeoutMode match {
case ProcessingTime =>
val mi = statefulProcessor.handleProcessingTimeTimers(tsWithKey.key,
tsWithKey.expiryTimestampMs,
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map {
obj =>
getOutputRow(obj)
}
ImplicitKeyTracker.removeImplicitKey()
store.remove(keyRow, TimerStateUtils.PROC_TIMERS_STATE_NAME)
mi
case EventTime =>
val mi = statefulProcessor.handleEventTimeTimers(tsWithKey.key,
tsWithKey.expiryTimestampMs,
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map {
obj =>
getOutputRow(obj)
}
ImplicitKeyTracker.removeImplicitKey()
store.remove(keyRow, TimerStateUtils.EVENT_TIMERS_STATE_NAME)
mi
}
mappedIterator
} else {
Iterator.empty
Expand Down Expand Up @@ -161,7 +179,15 @@ case class TransformWithStateExec(
procTimeIter.flatMap { case rowPair =>
handleTimerRows(store, rowPair.key, batchTimestampMs.get)
}
case EventTime =>
assert(batchTimestampMs.isDefined)
store.createColFamilyIfAbsent(TimerStateUtils.EVENT_TIMERS_STATE_NAME, true)

val eventTimeIter = store
.iterator(TimerStateUtils.EVENT_TIMERS_STATE_NAME)
eventTimeIter.flatMap { case rowPair =>
handleTimerRows(store, rowPair.key, batchTimestampMs.get)
}
case _ => Iterator.empty
}
}
Expand Down Expand Up @@ -237,6 +263,8 @@ case class TransformWithStateExec(
timeoutMode match {
case ProcessingTime =>
require(batchTimestampMs.nonEmpty)
case EventTime =>
require(batchTimestampMs.nonEmpty)

case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class RocksDB(
* @return - true if the column family is for internal use, false otherwise
*/
private def checkInternalColumnFamilies(cfName: String): Boolean = {
if (cfName == TimerStateUtils.PROC_TIMERS_STATE_NAME) {
if (cfName == TimerStateUtils.PROC_TIMERS_STATE_NAME
|| cfName == TimerStateUtils.EVENT_TIMERS_STATE_NAME) {
true
} else {
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,81 @@ class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer
}
}

// Class to verify stateful processor usage with adding event time timers
class RunningCountStatefulProcessorWithEventTimeTimer extends RunningCountStatefulProcessor {
  /**
   * Keeps a running count per key. While the stored count is absent or zero, input for key
   * "a" or "c" registers an event time timer. When the updated count is exactly 3 the stored
   * count is cleared and nothing is emitted; otherwise the count is stored and emitted.
   */
  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, String)] = {
    val previousCount = _countState.getOption().getOrElse(0L)
    val isTimerKey = key == "a" || key == "c"
    if (previousCount == 0 && isTimerKey) {
      // NOTE(review): the expiry is derived from processing time rather than the watermark -
      // confirm this is intended for an event time timer
      val expiryMs = timerValues.getCurrentProcessingTimeInMs() + 5000
      _processorHandle.registerEventTimeTimer(expiryMs)
    }

    val updatedCount = previousCount + inputRows.size
    if (updatedCount != 3) {
      _countState.update(updatedCount)
      Iterator((key, updatedCount.toString))
    } else {
      _countState.remove()
      Iterator.empty
    }
  }

  /** On timer expiry, clears the stored count and emits (key, "-1") as a marker row. */
  override def handleEventTimeTimers(
      key: String,
      expiryTimestampMs: Long,
      timerValues: TimerValues): Iterator[(String, String)] = {
    _countState.remove()
    Iterator((key, "-1"))
  }
}

// Class to verify stateful processor usage with adding/deleting event time timers
class RunningCountStatefulProcessorWithAddRemoveEventTimeTimer
  extends RunningCountStatefulProcessor {
  // Holds the expiry timestamp of the most recently registered event time timer
  @transient private var _timerState: ValueState[Long] = _

  /** Initializes the parent processor and then obtains the "timerState" value state. */
  override def init(
      handle: StatefulProcessorHandle,
      outputMode: OutputMode) : Unit = {
    super.init(handle, outputMode)
    _timerState = _processorHandle.getValueState[Long]("timerState")
  }

  /**
   * Updates and emits the running count for the key. For key "a" only: when the previous
   * count is 0 a timer is registered 5000 ms past the current watermark; when the previous
   * count is 1 the recorded timer is deleted and a new one is registered 7500 ms past the
   * current watermark.
   */
  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, String)] = {
    // Registers a timer `delayMs` past the current watermark and records its expiry
    def scheduleTimer(delayMs: Long): Unit = {
      val expiryMs = timerValues.getCurrentWatermarkInMs() + delayMs
      _processorHandle.registerEventTimeTimer(expiryMs)
      _timerState.update(expiryMs)
    }

    val previousCount = _countState.getOption().getOrElse(0L)
    val updatedCount = previousCount + inputRows.size
    _countState.update(updatedCount)

    if (key == "a") {
      previousCount match {
        case 0L =>
          scheduleTimer(5000L)
        case 1L =>
          _processorHandle.deleteEventTimeTimer(_timerState.get())
          scheduleTimer(7500L)
        case _ =>
      }
    }
    Iterator((key, updatedCount.toString))
  }

  /** On timer expiry, clears the recorded timer timestamp and emits (key, "-1"). */
  override def handleEventTimeTimers(
      key: String,
      expiryTimestampMs: Long,
      timerValues: TimerValues): Iterator[(String, String)] = {
    _timerState.remove()
    Iterator((key, "-1"))
  }
}

// Class to verify incorrect usage of stateful processor
class RunningCountStatefulProcessorWithError extends RunningCountStatefulProcessor {
@transient private var _tempState: ValueState[Long] = _
Expand Down Expand Up @@ -276,6 +351,81 @@ class TransformWithStateSuite extends StateStoreMetricsTest
)
}
}

// Verifies that timers registered by RunningCountStatefulProcessorWithEventTimeTimer
// produce the (key, "-1") marker row, both before and after a query restart
test("transformWithState - streaming with rocksdb and event time timer " +
"should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {
val clock = new StreamManualClock

val inputData = MemoryStream[String]
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessorWithEventTimeTimer(),
TimeoutMode.EventTime(),
OutputMode.Update())

testStream(result, OutputMode.Update())(
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
// first row for "a": count becomes 1 and the processor registers a timer for "a"
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),

// "b" is not one of the keys ("a"/"c") for which the processor registers a timer
AddData(inputData, "b"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("b", "1")),

// after the 10s clock advance the timer for "a" is expected to have fired
AddData(inputData, "b"),
AdvanceManualClock(10 * 1000),
CheckNewAnswer(("a", "-1"), ("b", "2")),

// restart the query, reusing the same manual clock
StopStream,
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
// third row for "b": the processor clears the count at 3 and emits nothing for "b"
AddData(inputData, "b"),
AddData(inputData, "c"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("c", "1")),
// the timer registered for "c" is expected to fire after the 10s clock advance
AddData(inputData, "d"),
AdvanceManualClock(10 * 1000),
CheckNewAnswer(("c", "-1"), ("d", "1")),
StopStream
)
}
}

// Verifies that an event time timer can be deleted and re-registered by
// RunningCountStatefulProcessorWithAddRemoveEventTimeTimer and still fires after a restart
test("transformWithState - streaming with rocksdb and event time timer " +
"and add/remove timers should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {
val clock = new StreamManualClock

val inputData = MemoryStream[String]
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(
new RunningCountStatefulProcessorWithAddRemoveEventTimeTimer(),
TimeoutMode.EventTime(),
OutputMode.Update())

testStream(result, OutputMode.Update())(
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
// first row for "a": previous count is 0, so the processor registers its first timer
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),

// second row for "a": previous count is 1, so the processor deletes the recorded timer
// and registers a new one
AddData(inputData, "a"),
AdvanceManualClock(2 * 1000),
CheckNewAnswer(("a", "2")),
StopStream,

// after restart and a 10s clock advance, the timer for "a" is expected to have fired
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, "d"),
AdvanceManualClock(10 * 1000),
CheckNewAnswer(("a", "-1"), ("d", "1")),
StopStream
)
}
}
}

class TransformWithStateValidationSuite extends StateStoreMetricsTest {
Expand Down

0 comments on commit 7aa16d4

Please sign in to comment.