Skip to content

Commit

Permalink
REFINE REST
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Jan 13, 2025
1 parent b5a5033 commit 8538b2f
Showing 1 changed file with 63 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,94 +73,78 @@ suite('adaptive_pipeline_task_serial_read_on_limit') {
insert into adaptive_pipeline_task_serial_read_on_limit values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

def uuidString = UUID.randomUUID().toString()

sql "set enable_profile=true"
// set parallel_pipeline_task_num to 1 so that only one scan node,
// set parallel_pipeline_task_num to 1 so that only one scan operator is created,
// and we can check MaxScannerThreadNum in profile.
sql "set parallel_pipeline_task_num=1;"
// no limit, MaxScannerThreadNum = TabletNum
sql """
select "no_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit;
"""
sql "set parallel_pipeline_task_num=0;"
// With Limit, MaxScannerThreadNum = 1
sql """
select "with_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10000;
"""
// With Limit, but bigger then adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
sql """
select "with_limit_2_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10001;
"""
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=false;
"""
sql "set parallel_pipeline_task_num=1;"
// Forbid the strategy, with limit, MaxScannerThreadNum = TabletNum
sql """
select "not_enable_limit_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 100;
"""

sql "set parallel_pipeline_task_num=0;"
// Create a set<String> queryShouldHasOnePeakRunningScanner
// to store the query that should have only one peak running scanner.
def queryShouldHaveOnePeakRunningScanner = new HashSet<String>()
queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10;")
queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;")

// Enable the strategy, with limit 20, MaxScannerThreadNum = 1
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=true;
"""
sql """
set adaptive_pipeline_task_serial_read_on_limit=20;
"""
sql """
select "modify_to_20_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 15;
"""
def queryShouldHasMoreThanOnePeakRunningScanner = new HashSet<String>()
queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;")
queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;")
queryShouldHasMoreThanOnePeakRunningScanner.add("select \"enable_adaptive_pipeline_task_serial_read_on_limit=false\", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;")

// With Limit, MaxScannerThreadNum = 1
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10;"
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;"
// With Limit, but bigger then adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
sql "set adaptive_pipeline_task_serial_read_on_limit=9998;"
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;"
// With limit, but with predicates too. MaxScannerThreadNum = TabletNum
sql "select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;"
// With large engough limit, but enable_adaptive_pipeline_task_serial_read_on_limit is false. MaxScannerThreadNum = TabletNum
sql "set enable_adaptive_pipeline_task_serial_read_on_limit=false;"
sql """select "enable_adaptive_pipeline_task_serial_read_on_limit=false", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;"""
// Sleep 500ms to wait for the profile collection
Thread.sleep(500)

// Get profile list by using show query profile command
// SHOW QUERY PROFILE returns profile meta as a table.
// The first column is profile id, last column is query stmt.
// Compare the query stmt, and get profile id for each query that we just emitted.

def queryProfiles = sql "show query profile limit 100;"
def profileList = queryProfiles.collect { row -> row.toList() }
List<String> profileShouldHaveOnePeakRunningScanner = new ArrayList<String>()
List<String> profileShouldHaveMoreThanOnePeakRunningScanner = new ArrayList<String>()

for (def profileItem in profileList) {
if (profileShouldHaveMoreThanOnePeakRunningScanner.size() + profileShouldHaveOnePeakRunningScanner.size() ==
queryShouldHasMoreThanOnePeakRunningScanner.size() + queryShouldHaveOnePeakRunningScanner.size()) {
break
}

sql "set enable_profile=false"
if (queryShouldHaveOnePeakRunningScanner.contains(profileItem[-1])) {
profileShouldHaveOnePeakRunningScanner.add(profileItem[0])
continue
}
if (queryShouldHasMoreThanOnePeakRunningScanner.contains(profileItem[-1])) {
profileShouldHaveMoreThanOnePeakRunningScanner.add(profileItem[0])
continue
}
}

Thread.sleep(5)
logger.info("profileShouldHaveOnePeakRunningScanner: {}", profileShouldHaveOnePeakRunningScanner)
logger.info("profileShouldHaveMoreThanOnePeakRunningScanner: {}", profileShouldHaveMoreThanOnePeakRunningScanner)

def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryIdNoLimit1 = "";
String queryIdWithLimit1 = "";
String queryIdWithLimit2 = "";
String queryIDNotEnableLimit = "";
String queryIdModifyTo20 = "";
assertTrue(profileShouldHaveOnePeakRunningScanner.size() == queryShouldHaveOnePeakRunningScanner.size())
assertTrue(profileShouldHaveMoreThanOnePeakRunningScanner.size() == queryShouldHasMoreThanOnePeakRunningScanner.size())

logger.info("{}", uuidString)
for (def profileId : profileShouldHaveOnePeakRunningScanner) {
def profile = getProfile(profileId).toString()
logger.info("Profile ${profile}")
assertTrue(profile.contains("- MaxScannerThreadNum: 1"))
}

for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains("no_limit_1_${uuidString}")) {
queryIdNoLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
queryIdWithLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_2_${uuidString}")) {
queryIdWithLimit2 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("not_enable_limit_${uuidString}")) {
queryIDNotEnableLimit = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("modify_to_20_${uuidString}")) {
queryIdModifyTo20 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
for (def profileId : profileShouldHaveMoreThanOnePeakRunningScanner) {
def profile = getProfile(profileId).toString()
logger.info("Profile ${profile}")
assertTrue(!profile.contains("- MaxScannerThreadNum: 1"))
}

logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
logger.info("queryIdModifyTo20_${uuidString}: {}", queryIdModifyTo20)

assertTrue(queryIdWithLimit1 != "")
assertTrue(queryIdModifyTo20 != "")

def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
def String profileModifyTo20 = getProfile(queryIdModifyTo20).toString()
logger.info("profileWithLimit1:\n${profileWithLimit1}")
assertTrue(profileWithLimit1.contains("- MaxScannerThreadNum: 1"))
logger.info("profileModifyTo20:\n${profileModifyTo20}")
assertTrue(profileModifyTo20.contains("- MaxScannerThreadNum: 1"))
}

0 comments on commit 8538b2f

Please sign in to comment.