diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 66cb1ca876365..483ed92c07d77 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -30,8 +30,7 @@ import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.sql.types.StructType -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} -import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.collection.JavaConversions._ @@ -183,8 +182,10 @@ class TestStructuredStreaming extends HoodieClientTestBase { case te: TableNotFoundException => log.info("Got table not found exception. Retrying") } finally { - Thread.sleep(sleepSecsAfterEachRun * 1000) - currTime = System.currentTimeMillis + if (!success) { + Thread.sleep(sleepSecsAfterEachRun * 1000) + currTime = System.currentTimeMillis + } } if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) numInstants @@ -207,7 +208,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { def checkClusteringResult(destPath: String):Unit = { // check have schedule clustering and clustering file group to one - waitTillHasCompletedReplaceInstant(destPath, 120, 5) + waitTillHasCompletedReplaceInstant(destPath, 120, 1) metaClient.reloadActiveTimeline() assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } @@ -221,7 +222,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { def checkClusteringResult(destPath: String):Unit = { // check have schedule clustering and clustering file group to one - waitTillHasCompletedReplaceInstant(destPath, 120, 5) + waitTillHasCompletedReplaceInstant(destPath, 120, 1) metaClient.reloadActiveTimeline() assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } @@ -235,7 +236,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { def checkClusteringResult(destPath: String):Unit = { // check have schedule clustering and clustering file group to one - waitTillHasCompletedReplaceInstant(destPath, 120, 5) + waitTillHasCompletedReplaceInstant(destPath, 120, 1) metaClient.reloadActiveTimeline() assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } @@ -243,23 +244,6 @@ class TestStructuredStreaming extends HoodieClientTestBase { HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } - @Test - def testStructuredStreamingWithoutClustering(): Unit = { - val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") - - def checkClusteringResult(destPath: String):Unit = { - val msg = "Should have replace commit completed" - assertThrows(classOf[IllegalStateException], new Executable { - override def execute(): Unit = { - waitTillHasCompletedReplaceInstant(destPath, 120, 5) - } - }, msg) - println(msg) - } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false, - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) - } - def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean, isAsyncClustering: Boolean, isAsyncCompaction: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { @@ -285,17 +269,17 @@ class TestStructuredStreaming extends HoodieClientTestBase { // wait for spark streaming to process second microbatch currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) // for inline clustering, clustering may be complete along with 2nd commit - if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) { + if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) { assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // check have at least one file group this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath) - .setLoadActiveTimelineOnLoad(true).build() + .setLoadActiveTimelineOnLoad(true).build() assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0) } else { assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // check have more than one file group this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath) - .setLoadActiveTimelineOnLoad(true).build() + .setLoadActiveTimelineOnLoad(true).build() assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1) }