[SPARK-26428][SS][TEST] Minimize deprecated ProcessingTime usage
## What changes were proposed in this pull request?

Use of the `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2, and [SPARK-21464](https://issues.apache.org/jira/browse/SPARK-21464) minimized its usage in 2.2.1. Recently, the deprecated usage has grown again in the test suites. This PR aims to clean up the newly introduced deprecation warnings for Spark 3.0.
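
For context, the substitution applied throughout is sketched below. This is an illustration rather than code from the patch; `df` stands in for any streaming Dataset.

```scala
import org.apache.spark.sql.streaming.Trigger

// Before (deprecated since Spark 2.2):
//   df.writeStream.trigger(ProcessingTime("10 seconds")).start()

// After: Trigger.ProcessingTime accepts an interval string or a
// duration in milliseconds and produces the same trigger.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```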

## How was this patch tested?

Pass Jenkins with the existing tests and manually check the deprecation warnings.

Closes apache#23367 from dongjoon-hyun/SPARK-26428.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Dec 22, 2018
1 parent 81addaa commit ceff0c8
Showing 5 changed files with 16 additions and 16 deletions.
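
Every hunk below makes the same mechanical substitution inside the `StreamTest` DSL. The recurring pattern pairs a processing-time trigger with a manual clock so that micro-batches fire only when the test advances time. A minimal sketch of that pattern, assuming a suite extending `StreamTest` with `SharedSQLContext` (which supplies the implicit `SQLContext` that `MemoryStream` requires); it is illustrative, not code from this commit:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.util.StreamManualClock

val inputData = MemoryStream[Int]
val clock = new StreamManualClock

testStream(inputData.toDS())(
  // A Long interval is in milliseconds; injecting the manual clock
  // lets the test decide exactly when each trigger fires.
  StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
  AddData(inputData, 1, 2, 3),
  AdvanceManualClock(100), // release exactly one micro-batch
  CheckAnswer(1, 2, 3),
  StopStream
)
```

With the manual clock, wall-clock time never passes on its own: each `AdvanceManualClock(100)` releases one trigger, which is why the suites below can assert answers batch by batch.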
@@ -42,7 +42,7 @@ import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext

@@ -236,7 +236,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}

testStream(mapped)(
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 1 from smallest, 1 from middle, 8 from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
@@ -247,7 +247,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
11, 108, 109, 110, 111, 112, 113, 114, 115, 116
),
StopStream,
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
@@ -282,7 +282,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
-StartStream(trigger = ProcessingTime(1)),
+StartStream(trigger = Trigger.ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
@@ -605,7 +605,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}

testStream(kafka)(
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((0 to 4) ++ (100 to 104): _*),
@@ -618,7 +618,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
// smaller topic empty, 5 from bigger one
CheckLastBatch(110 to 114: _*),
StopStream,
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(115 to 119: _*),
@@ -727,7 +727,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
// The message values are the same as their offsets to make the test easy to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckAnswer(),
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
@@ -850,7 +850,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
// The message values are the same as their offsets to make the test easy to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
-StartStream(ProcessingTime(100), clock),
+StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckNewAnswer(),
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
@@ -1360,7 +1360,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
options = srcOptions)
val clock = new StreamManualClock()
testStream(fileStream)(
-StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
AssertOnQuery { _ =>
// Block until the first batch finishes.
eventually(timeout(streamingTimeout)) {
@@ -312,7 +312,7 @@ class StreamSuite extends StreamTest {

val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock),

/* -- batch 0 ----------------------- */
// Add some data in batch 0
@@ -353,7 +353,7 @@

/* Stop then restart the Stream */
StopStream,
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),

/* -- batch 1 no rerun ----------------- */
// batch 1 would not re-run because the latest batch id logged in commit log is 1
@@ -82,7 +82,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
testStream(df, OutputMode.Append)(

// Start event generated when query started
-StartStream(ProcessingTime(100), triggerClock = clock),
+StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
AssertOnQuery { query =>
assert(listener.startEvent !== null)
assert(listener.startEvent.id === query.id)
@@ -124,7 +124,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
},

// Termination event generated with exception message when stopped with error
-StartStream(ProcessingTime(100), triggerClock = clock),
+StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
AddData(inputData, 0),
AdvanceManualClock(100), // process bad data
@@ -306,7 +306,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
val clock = new StreamManualClock()
val actions = mutable.ArrayBuffer[StreamAction]()
-actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+actions += StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock)
for (_ <- 1 to 100) {
actions += AdvanceManualClock(10)
}
@@ -257,7 +257,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
var lastProgressBeforeStop: StreamingQueryProgress = null

testStream(mapped, OutputMode.Complete)(
-StartStream(ProcessingTime(1000), triggerClock = clock),
+StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
AssertStreamExecThreadIsWaitingForTime(1000),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
@@ -370,7 +370,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AssertOnQuery(_.status.message === "Stopped"),

// Test status and progress after query terminated with error
-StartStream(ProcessingTime(1000), triggerClock = clock),
+StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
AdvanceManualClock(1000), // ensure initial trigger completes before AddData
AddData(inputData, 0),
AdvanceManualClock(1000), // allow another trigger
