
Commit

[MINOR] Improving runtime of TestStructuredStreaming by 2 mins
vinothchandar committed Aug 2, 2021
1 parent fe50837 commit ce16fa7
Showing 1 changed file with 11 additions and 27 deletions.
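The two minutes saved here come from three changes visible in the diff below: waitTillAtleastNCommits no longer sleeps in its finally block once the expected commits have already appeared, the clustering checks poll waitTillHasCompletedReplaceInstant every 1 second instead of every 5, and testStructuredStreamingWithoutClustering, a negative test that deliberately waited out a full 120-second timeout, is removed. A minimal, self-contained sketch of the tightened polling pattern follows; the object and helper names (PollingSketch, pollUntil) are hypothetical and not the test's actual code:

import java.util.concurrent.TimeUnit

object PollingSketch {

  // Poll `condition` until it holds or `timeoutSecs` elapses. The detail this
  // commit fixes: the sleep runs only while the condition is still unmet, so a
  // successful check returns immediately instead of paying one last sleep.
  def pollUntil(timeoutSecs: Int, sleepSecsAfterEachRun: Int)(condition: => Boolean): Unit = {
    val beginTime = System.currentTimeMillis
    var currTime = beginTime
    var success = false
    while (!success && (currTime - beginTime) < TimeUnit.SECONDS.toMillis(timeoutSecs)) {
      success = condition
      if (!success) { // before this commit, the equivalent sleep ran unconditionally in a finally block
        Thread.sleep(sleepSecsAfterEachRun * 1000L)
        currTime = System.currentTimeMillis
      }
    }
    if (!success) throw new IllegalStateException(s"Timed-out after $timeoutSecs secs waiting for condition")
  }

  def main(args: Array[String]): Unit = {
    val deadline = System.currentTimeMillis + 3000
    // A 1-second poll interval (what the clustering checks now pass) means the
    // call returns within roughly a second of the condition turning true.
    pollUntil(timeoutSecs = 120, sleepSecsAfterEachRun = 1)(System.currentTimeMillis >= deadline)
    println("condition met; no trailing sleep on the successful attempt")
  }
}

Sleeping only on failure means a condition that is already true costs no extra wait at all, which is exactly what the finally-block change in the first hunk below achieves.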
@@ -30,8 +30,7 @@ import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
-import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.collection.JavaConversions._
@@ -183,8 +182,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       case te: TableNotFoundException =>
         log.info("Got table not found exception. Retrying")
     } finally {
-      Thread.sleep(sleepSecsAfterEachRun * 1000)
-      currTime = System.currentTimeMillis
+      if (!success) {
+        Thread.sleep(sleepSecsAfterEachRun * 1000)
+        currTime = System.currentTimeMillis
+      }
     }
     if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
     numInstants
@@ -207,7 +208,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
@@ -221,7 +222,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
@@ -235,31 +236,14 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
     structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }
 
-  @Test
-  def testStructuredStreamingWithoutClustering(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
-
-    def checkClusteringResult(destPath: String):Unit = {
-      val msg = "Should have replace commit completed"
-      assertThrows(classOf[IllegalStateException], new Executable {
-        override def execute(): Unit = {
-          waitTillHasCompletedReplaceInstant(destPath, 120, 5)
-        }
-      }, msg)
-      println(msg)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
-  }
-
   def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
                                                  isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
                                                  partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
@@ -285,17 +269,17 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     // wait for spark streaming to process second microbatch
     currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
     // for inline clustering, clustering may be complete along with 2nd commit
-    if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) {
+    if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) {
       assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
       // check have at least one file group
       this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-        .setLoadActiveTimelineOnLoad(true).build()
+       .setLoadActiveTimelineOnLoad(true).build()
       assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
     } else {
       assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
       // check have more than one file group
       this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-        .setLoadActiveTimelineOnLoad(true).build()
+       .setLoadActiveTimelineOnLoad(true).build()
       assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
     }
 
