Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jan 5, 2024
1 parent 149058c commit dc8380c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class TransformWithStateExec(
case ProcessingTime =>
val mi = statefulProcessor.handleProcessingTimeTimers(tsWithKey.key,
tsWithKey.expiryTimestampMs,
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map {
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)).map {
obj =>
getOutputRow(obj)
}
Expand All @@ -123,8 +123,7 @@ case class TransformWithStateExec(
case EventTime =>
val mi = statefulProcessor.handleEventTimeTimers(tsWithKey.key,
tsWithKey.expiryTimestampMs,
new TimerValuesImpl(eventTimeWatermarkForEviction,
eventTimeWatermarkForLateEvents)).map {
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)).map {
obj =>
getOutputRow(obj)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class StatefulProcessorHandleSuite extends SharedSparkSession
val schemaForValueRow: StructType = new StructType().add("value", BinaryType)

private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
RocksDBStateStoreProvider = {
newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
numColsPrefixKey = 0,
useColumnFamilies = useColumnFamilies)
RocksDBStateStoreProvider = {
newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
numColsPrefixKey = 0,
useColumnFamilies = useColumnFamilies)
}

private def newStoreProviderWithHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
TimeoutMode.EventTime(),
OutputMode.Update())


testStream(result, OutputMode.Update())(
StartStream(),
AddData(inputData, ("a", Timestamp.valueOf("2023-08-01 00:00:00"))),
Expand Down

0 comments on commit dc8380c

Please sign in to comment.