diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 61cbb3285a4f0..d4eb526540053 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext @@ -236,7 +236,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // 1 from smallest, 1 from middle, 8 from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), @@ -247,7 +247,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 ), StopStream, - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // smallest now empty, 1 more from middle, 9 more from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, @@ -282,7 +282,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val mapped = kafka.map(kv => kv._2.toInt + 1) testStream(mapped)( - StartStream(trigger = ProcessingTime(1)), + StartStream(trigger = Trigger.ProcessingTime(1)), makeSureGetOffsetCalled, AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), @@ -605,7 +605,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } testStream(kafka)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // 5 from smaller topic, 5 from bigger one CheckLastBatch((0 to 4) ++ (100 to 104): _*), @@ -618,7 +618,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // smaller topic empty, 5 from bigger one CheckLastBatch(110 to 114: _*), StopStream, - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // smallest now empty, 5 from bigger one CheckLastBatch(115 to 119: _*), @@ -727,7 +727,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), WithOffsetSync(topicPartition, expectedOffset = 5) { () => @@ -850,7 +850,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, CheckNewAnswer(), WithOffsetSync(topicPartition, expectedOffset = 5) { () => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d4bd9c7987f2d..de664cafed3b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1360,7 +1360,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { options = srcOptions) val clock = new StreamManualClock() testStream(fileStream)( - StartStream(trigger = ProcessingTime(10), triggerClock = clock), + StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock), AssertOnQuery { _ => // Block until the first batch finishes. eventually(timeout(streamingTimeout)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f55ddb5419d20..55fdcee83f114 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -312,7 +312,7 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[Int] testStream(inputData.toDS())( - StartStream(ProcessingTime("10 seconds"), new StreamManualClock), + StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock), /* -- batch 0 ----------------------- */ // Add some data in batch 0 @@ -353,7 +353,7 @@ class StreamSuite extends StreamTest { /* Stop then restart the Stream */ StopStream, - StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), + StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), /* -- batch 1 no rerun ----------------- */ // batch 1 would not re-run because the latest batch id logged in commit log is 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index fe77a1b4469c5..d00f2e3bf4d1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -82,7 +82,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testStream(df, OutputMode.Append)( // Start event generated when query started - StartStream(ProcessingTime(100), triggerClock = clock), + StartStream(Trigger.ProcessingTime(100), triggerClock = clock), AssertOnQuery { query => assert(listener.startEvent !== null) assert(listener.startEvent.id === query.id) @@ -124,7 +124,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { }, // Termination event generated with exception message when stopped with error - StartStream(ProcessingTime(100), triggerClock = clock), + StartStream(Trigger.ProcessingTime(100), triggerClock = clock), AssertStreamExecThreadToWaitForClock(), AddData(inputData, 0), AdvanceManualClock(100), // process bad data @@ -306,7 +306,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } val clock = new StreamManualClock() val actions = mutable.ArrayBuffer[StreamAction]() - actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock) + actions += StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock) for (_ <- 1 to 100) { actions += AdvanceManualClock(10) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c170641372d61..29b816486a1fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -257,7 +257,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi var lastProgressBeforeStop: StreamingQueryProgress = null testStream(mapped, OutputMode.Complete)( - StartStream(ProcessingTime(1000), triggerClock = clock), + StartStream(Trigger.ProcessingTime(1000), triggerClock = clock), AssertStreamExecThreadIsWaitingForTime(1000), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), @@ -370,7 +370,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(_.status.message === "Stopped"), // Test status and progress after query terminated with error - StartStream(ProcessingTime(1000), triggerClock = clock), + StartStream(Trigger.ProcessingTime(1000), triggerClock = clock), AdvanceManualClock(1000), // ensure initial trigger completes before AddData AddData(inputData, 0), AdvanceManualClock(1000), // allow another trigger