diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index b11429363c099..bd887db24b2e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.streaming -import java.sql.Timestamp +import java.{util => ju} import java.text.SimpleDateFormat import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} +import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock @@ -143,7 +145,7 @@ class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer // Class to verify stateful processor usage with adding event time timers class RunningCountStatefulProcessorWithEventTimeTimer - extends StatefulProcessor[String, (String, Timestamp), (String, String)] { + extends StatefulProcessor[String, (String, Long), (String, String)] { @transient var _countState: ValueState[Long] = _ @transient var _processorHandle: StatefulProcessorHandle = _ @@ -160,7 +162,7 @@ class RunningCountStatefulProcessorWithEventTimeTimer override def handleInputRow( key: String, - inputRow: (String, Timestamp), + inputRow: (String, Long), timerValues: TimerValues): Iterator[(String, String)] = { val currCount = _countState.getOption().getOrElse(0L) if (currCount == 0 && (key == "a" || key == "c")) { @@ -210,7 +212,7 @@ class RunningCountStatefulProcessorWithAddRemoveEventTimeTimer override def handleInputRow( key: String, - inputRow: (String, Timestamp), + inputRow: (String, Long), timerValues: TimerValues): Iterator[(String, String)] = { val currCount = _countState.getOption().getOrElse(0L) val count = currCount + 1 @@ -385,11 +387,13 @@ class TransformWithStateSuite extends StateStoreMetricsTest withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { - val inputData = MemoryStream[(String, Timestamp)] + val inputData = MemoryStream[(String, Int)] val result = inputData.toDS() - .select($"_1".as("value"), $"_2".as("eventTime")) + .select($"_1".as("value"), $"_2") + .withColumn("eventTime", timestamp_seconds($"_2")) .withWatermark("eventTime", "1 second") - .as[(String, Timestamp)] + .select("value", "eventTime") + .as[(String, Long)] .groupByKey(x => x._1) .transformWithState(new RunningCountStatefulProcessorWithEventTimeTimer(), TimeoutMode.EventTime(), @@ -397,24 +401,24 @@ class TransformWithStateSuite extends StateStoreMetricsTest testStream(result, OutputMode.Update())( StartStream(), - AddData(inputData, ("a", Timestamp.valueOf("2023-08-01 00:00:00"))), + AddData(inputData, ("a", 10)), - AddData(inputData, ("b", Timestamp.valueOf("2023-08-02 00:00:00"))), + AddData(inputData, ("b", 15)), CheckNewAnswer(("a", "1"), ("a", "-1"), ("b", "1")), - assertWatermark(Timestamp.valueOf("1970-01-01 00:00:00")), + assertWatermark(0), - AddData(inputData, ("b", Timestamp.valueOf("2023-08-03 00:00:00"))), + AddData(inputData, ("b", 17)), CheckNewAnswer(("b", "2")), // watermark: 3rd august, timer t1 should have fired, - assertWatermark(Timestamp.valueOf("2023-08-02 06:59:59")), + assertWatermark(14), - AddData(inputData, ("b", Timestamp.valueOf("2023-08-04 00:00:00"))), - AddData(inputData, ("c", Timestamp.valueOf("2023-08-04 00:00:00"))), - CheckNewAnswer(("c", "-1"), ("c", "1")), - assertWatermark(Timestamp.valueOf("2023-08-03 06:59:59")), + AddData(inputData, ("b", 20)), + AddData(inputData, ("c", 20)), + CheckNewAnswer(("c", "1")), + assertWatermark(16), - AddData(inputData, ("d", Timestamp.valueOf("2023-08-06 00:00:00"))), - CheckNewAnswer(("d", "1")), - assertWatermark(Timestamp.valueOf("2023-08-04 06:59:59")), + AddData(inputData, ("d", 25)), + CheckNewAnswer(("c", "-1"), ("d", "1")), + assertWatermark(19), StopStream ) } @@ -427,17 +431,19 @@ class TransformWithStateSuite extends StateStoreMetricsTest } /** Assert event stats generated on that last batch with data in it */ - def assertWatermark(wtrmark: Timestamp): AssertOnQuery = { + def assertWatermark(wtrmark: Long): AssertOnQuery = { assertEventStats { e => assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch") } } - private def formatTimestamp(timestamp: Timestamp): String = { - timestampFormat.format(timestamp) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone(UTC)) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) } - private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 test("transformWithState - streaming with rocksdb and event time timer " + "and add/remove timers should succeed") { @@ -445,11 +451,13 @@ class TransformWithStateSuite extends StateStoreMetricsTest classOf[RocksDBStateStoreProvider].getName) { val clock = new StreamManualClock - val inputData = MemoryStream[(String, Timestamp)] + val inputData = MemoryStream[(String, Int)] val result = inputData.toDS() - .select($"_1".as("value"), $"_2".as("eventTime")) + .select($"_1".as("value"), $"_2") + .withColumn("eventTime", timestamp_seconds($"_2")) .withWatermark("eventTime", "1 second") - .as[(String, Timestamp)] + .select("value", "eventTime") + .as[(String, Long)] .groupByKey(x => x._1) .transformWithState(new RunningCountStatefulProcessorWithAddRemoveEventTimeTimer(), TimeoutMode.EventTime(), @@ -457,14 +465,17 @@ class TransformWithStateSuite extends StateStoreMetricsTest testStream(result, OutputMode.Update())( StartStream(), - AddData(inputData, ("a", Timestamp.valueOf("2023-08-01 00:00:00"))), + AddData(inputData, ("a", 10)), CheckNewAnswer(("a", "1"), ("a", "-1")), + assertWatermark(0), - AddData(inputData, ("a", Timestamp.valueOf("2023-08-02 00:00:00"))), + AddData(inputData, ("a", 20)), CheckNewAnswer(("a", "2"), ("a", "-1")), + assertWatermark(9), - AddData(inputData, ("d", Timestamp.valueOf("2023-08-03 00:00:00"))), + AddData(inputData, ("d", 30)), CheckNewAnswer(("d", "1")), + assertWatermark(19), StopStream ) }