Skip to content

Commit

Permalink
string, timestamp -> string, long
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jan 8, 2024
1 parent 6bf23f4 commit 27bd69a
Showing 1 changed file with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.spark.sql.streaming

import java.sql.Timestamp
import java.{util => ju}
import java.text.SimpleDateFormat

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider}
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock

Expand Down Expand Up @@ -143,7 +145,7 @@ class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer

// Class to verify stateful processor usage with adding event time timers
class RunningCountStatefulProcessorWithEventTimeTimer
extends StatefulProcessor[String, (String, Timestamp), (String, String)] {
extends StatefulProcessor[String, (String, Long), (String, String)] {

@transient var _countState: ValueState[Long] = _
@transient var _processorHandle: StatefulProcessorHandle = _
Expand All @@ -160,7 +162,7 @@ class RunningCountStatefulProcessorWithEventTimeTimer

override def handleInputRow(
key: String,
inputRow: (String, Timestamp),
inputRow: (String, Long),
timerValues: TimerValues): Iterator[(String, String)] = {
val currCount = _countState.getOption().getOrElse(0L)
if (currCount == 0 && (key == "a" || key == "c")) {
Expand Down Expand Up @@ -210,7 +212,7 @@ class RunningCountStatefulProcessorWithAddRemoveEventTimeTimer

override def handleInputRow(
key: String,
inputRow: (String, Timestamp),
inputRow: (String, Long),
timerValues: TimerValues): Iterator[(String, String)] = {
val currCount = _countState.getOption().getOrElse(0L)
val count = currCount + 1
Expand Down Expand Up @@ -385,36 +387,38 @@ class TransformWithStateSuite extends StateStoreMetricsTest
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {

val inputData = MemoryStream[(String, Timestamp)]
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDS()
.select($"_1".as("value"), $"_2".as("eventTime"))
.select($"_1".as("value"), $"_2")
.withColumn("eventTime", timestamp_seconds($"_2"))
.withWatermark("eventTime", "1 second")
.as[(String, Timestamp)]
.select("value", "eventTime")
.as[(String, Long)]
.groupByKey(x => x._1)
.transformWithState(new RunningCountStatefulProcessorWithEventTimeTimer(),
TimeoutMode.EventTime(),
OutputMode.Update())

testStream(result, OutputMode.Update())(
StartStream(),
AddData(inputData, ("a", Timestamp.valueOf("2023-08-01 00:00:00"))),
AddData(inputData, ("a", 10)),

AddData(inputData, ("b", Timestamp.valueOf("2023-08-02 00:00:00"))),
AddData(inputData, ("b", 15)),
CheckNewAnswer(("a", "1"), ("a", "-1"), ("b", "1")),
assertWatermark(Timestamp.valueOf("1970-01-01 00:00:00")),
assertWatermark(0),

AddData(inputData, ("b", Timestamp.valueOf("2023-08-03 00:00:00"))),
AddData(inputData, ("b", 17)),
CheckNewAnswer(("b", "2")), // watermark: 14 seconds, timer t1 should have fired
assertWatermark(Timestamp.valueOf("2023-08-02 06:59:59")),
assertWatermark(14),

AddData(inputData, ("b", Timestamp.valueOf("2023-08-04 00:00:00"))),
AddData(inputData, ("c", Timestamp.valueOf("2023-08-04 00:00:00"))),
CheckNewAnswer(("c", "-1"), ("c", "1")),
assertWatermark(Timestamp.valueOf("2023-08-03 06:59:59")),
AddData(inputData, ("b", 20)),
AddData(inputData, ("c", 20)),
CheckNewAnswer(("c", "1")),
assertWatermark(16),

AddData(inputData, ("d", Timestamp.valueOf("2023-08-06 00:00:00"))),
CheckNewAnswer(("d", "1")),
assertWatermark(Timestamp.valueOf("2023-08-04 06:59:59")),
AddData(inputData, ("d", 25)),
CheckNewAnswer(("c", "-1"), ("d", "1")),
assertWatermark(19),
StopStream
)
}
Expand All @@ -427,44 +431,51 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}

/** Assert the watermark in the event stats generated on the last batch with data in it */
def assertWatermark(wtrmark: Timestamp): AssertOnQuery = {
def assertWatermark(wtrmark: Long): AssertOnQuery = {
assertEventStats { e =>
assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch")
}
}

private def formatTimestamp(timestamp: Timestamp): String = {
timestampFormat.format(timestamp)
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(ju.TimeZone.getTimeZone(UTC))

private def formatTimestamp(sec: Long): String = {
timestampFormat.format(new ju.Date(sec * 1000))
}

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601

test("transformWithState - streaming with rocksdb and event time timer " +
"and add/remove timers should succeed") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {
val clock = new StreamManualClock

val inputData = MemoryStream[(String, Timestamp)]
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDS()
.select($"_1".as("value"), $"_2".as("eventTime"))
.select($"_1".as("value"), $"_2")
.withColumn("eventTime", timestamp_seconds($"_2"))
.withWatermark("eventTime", "1 second")
.as[(String, Timestamp)]
.select("value", "eventTime")
.as[(String, Long)]
.groupByKey(x => x._1)
.transformWithState(new RunningCountStatefulProcessorWithAddRemoveEventTimeTimer(),
TimeoutMode.EventTime(),
OutputMode.Update())

testStream(result, OutputMode.Update())(
StartStream(),
AddData(inputData, ("a", Timestamp.valueOf("2023-08-01 00:00:00"))),
AddData(inputData, ("a", 10)),
CheckNewAnswer(("a", "1"), ("a", "-1")),
assertWatermark(0),

AddData(inputData, ("a", Timestamp.valueOf("2023-08-02 00:00:00"))),
AddData(inputData, ("a", 20)),
CheckNewAnswer(("a", "2"), ("a", "-1")),
assertWatermark(9),

AddData(inputData, ("d", Timestamp.valueOf("2023-08-03 00:00:00"))),
AddData(inputData, ("d", 30)),
CheckNewAnswer(("d", "1")),
assertWatermark(19),
StopStream
)
}
Expand Down

0 comments on commit 27bd69a

Please sign in to comment.