Skip to content

Commit

Permalink
[test](regression-test) use unified trigger_and_wait_compaction method (
Browse files Browse the repository at this point in the history
#45761) (#45906)

picks #45761 to branch-3.0

Issue Number: close #45591
Related PR: #45761
  • Loading branch information
py023 authored Dec 25, 2024
1 parent 93f86ce commit 487e17a
Show file tree
Hide file tree
Showing 86 changed files with 483 additions and 3,130 deletions.
156 changes: 156 additions & 0 deletions regression-test/plugins/plugin_compaction.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.Suite
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility;

Suite.metaClass.be_get_compaction_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status?tablet_id=%s", ip, port, tablet_id))
}

Suite.metaClass.be_get_overall_compaction_status{ String ip, String port /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status", ip, port))
}

Suite.metaClass.be_show_tablet_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/show?tablet_id=%s", ip, port, tablet_id))
}

Suite.metaClass._be_run_compaction = { String ip, String port, String tablet_id, String compact_type ->
return curl("POST", String.format("http://%s:%s/api/compaction/run?tablet_id=%s&compact_type=%s",
ip, port, tablet_id, compact_type))
}

Suite.metaClass.be_run_base_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "base")
}

logger.info("Added 'be_run_base_compaction' function to Suite")

Suite.metaClass.be_run_cumulative_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "cumulative")
}

logger.info("Added 'be_run_cumulative_compaction' function to Suite")

Suite.metaClass.be_run_full_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "full")
}

Suite.metaClass.be_run_full_compaction_by_table_id = { String ip, String port, String table_id /* param */->
return curl("POST", String.format("http://%s:%s/api/compaction/run?table_id=%s&compact_type=full", ip, port, table_id))
}

logger.info("Added 'be_run_full_compaction' function to Suite")

Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300 ->
if (!(compaction_type in ["cumulative", "base", "full"])) {
throw new IllegalArgumentException("invalid compaction type: ${compaction_type}, supported types: cumulative, base, full")
}

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def tablets = sql_return_maparray """show tablets from ${table_name}"""
def exit_code, stdout, stderr

def auto_compaction_disabled = sql("show create table ${table_name}")[0][1].contains('"disable_auto_compaction" = "true"')
def is_time_series_compaction = sql("show create table ${table_name}")[0][1].contains('"compaction_policy" = "time_series"')

// 1. cache compaction status
def be_tablet_compaction_status = [:]
for (tablet in tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
(exit_code, stdout, stderr) = be_show_tablet_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get tablet status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"

def tabletStatus = parseJson(stdout.trim())
be_tablet_compaction_status.put("${be_host}-${tablet.TabletId}", tabletStatus)
}
// 2. trigger compaction
def triggered_tablets = []
for (tablet in tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
switch (compaction_type) {
case "cumulative":
(exit_code, stdout, stderr) = be_run_cumulative_compaction(be_host, be_port, tablet.TabletId)
break
case "base":
(exit_code, stdout, stderr) = be_run_base_compaction(be_host, be_port, tablet.TabletId)
break
case "full":
(exit_code, stdout, stderr) = be_run_full_compaction(be_host, be_port, tablet.TabletId)
break
}
assert exit_code == 0: "trigger compaction failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def trigger_status = parseJson(stdout.trim())
if (trigger_status.status.toLowerCase() != "success") {
if (trigger_status.status.toLowerCase() == "already_exist") {
triggered_tablets.add(tablet) // compaction already in queue, treat it as successfully triggered
} else if (!auto_compaction_disabled) {
// ignore the error if auto compaction enabled
} else {
throw new Exception("trigger compaction failed, be host: ${be_host}, tablet id: ${tablet.TabletId}, status: ${trigger_status.status}")
}
} else {
triggered_tablets.add(tablet)
}
}

// 3. wait all compaction finished
def running = triggered_tablets.size() > 0
Awaitility.await().atMost(timeout_seconds, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
for (tablet in triggered_tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]

(exit_code, stdout, stderr) = be_get_compaction_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get compaction status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def compactionStatus = parseJson(stdout.trim())
assert compactionStatus.status.toLowerCase() == "success": "compaction failed, be host: ${be_host}, tablet id: ${tablet.TabletId}, status: ${compactionStatus.status}"
// running is true means compaction is still running
running = compactionStatus.run_status

if (!isCloudMode() && !is_time_series_compaction) {
(exit_code, stdout, stderr) = be_show_tablet_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get tablet status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def tabletStatus = parseJson(stdout.trim())
def oldStatus = be_tablet_compaction_status.get("${be_host}-${tablet.TabletId}")
// last compaction success time isn't updated, indicates compaction is not started(so we treat it as running and wait)
running = running || (oldStatus["last ${compaction_type} success time"] == tabletStatus["last ${compaction_type} success time"])
if (running) {
logger.info("compaction is still running, be host: ${be_host}, tablet id: ${tablet.TabletId}, run status: ${compactionStatus.run_status}, old status: ${oldStatus}, new status: ${tabletStatus}")
return false
}
} else {
// 1. cloud mode doesn't show compaction success time in tablet status for the time being,
// 2. time series compaction sometimes doesn't update compaction success time
// so we solely check run_status for these two cases
if (running) {
logger.info("compaction is still running, be host: ${be_host}, tablet id: ${tablet.TabletId}")
return false
}
}
}
return true
})

assert !running: "wait compaction timeout, be host: ${be_host}"
}
56 changes: 7 additions & 49 deletions regression-test/plugins/plugin_curl_requester.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.http.conn.ConnectTimeoutException
import org.apache.http.conn.HttpHostConnectException
import org.codehaus.groovy.runtime.IOGroovyMethods


Suite.metaClass.http_client = { String method, String url /* param */ ->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
Expand All @@ -35,7 +36,7 @@ Suite.metaClass.http_client = { String method, String url /* param */ ->
if (!url || !(url =~ /^https?:\/\/.+/)) {
throw new Exception("Invalid url: ${url}")
}

Integer timeout = 300 // seconds
Integer maxRetries = 10
Integer retryCount = 0
Expand Down Expand Up @@ -111,15 +112,15 @@ Suite.metaClass.http_client = { String method, String url /* param */ ->

logger.info("Added 'http_client' function to Suite")

Suite.metaClass.curl = { String method, String url, String body = null /* param */->
Suite.metaClass.curl = { String method, String url, String body = null /* param */->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
throw new Exception(String.format("invalid curl method: %s", method))
}
if (url.isBlank()) {
throw new Exception("invalid curl url, blank")
}

Integer timeout = 10; // 10 seconds;
Integer maxRetries = 10; // Maximum number of retries
Integer retryCount = 0; // Current retry count
Expand All @@ -131,7 +132,7 @@ Suite.metaClass.curl = { String method, String url, String body = null /* param
} else {
cmd = String.format("curl --max-time %d -X %s %s", timeout, method, url).toString()
}

logger.info("curl cmd: " + cmd)
def process
int code
Expand Down Expand Up @@ -161,57 +162,14 @@ Suite.metaClass.curl = { String method, String url, String body = null /* param

return [code, out, err]
}

logger.info("Added 'curl' function to Suite")


Suite.metaClass.show_be_config = { String ip, String port /*param */ ->
return curl("GET", String.format("http://%s:%s/api/show_config", ip, port))
}

logger.info("Added 'show_be_config' function to Suite")

Suite.metaClass.be_get_compaction_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status?tablet_id=%s", ip, port, tablet_id))
}

Suite.metaClass.be_get_overall_compaction_status{ String ip, String port /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status", ip, port))
}

Suite.metaClass.be_show_tablet_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/show?tablet_id=%s", ip, port, tablet_id))
}

logger.info("Added 'be_get_compaction_status' function to Suite")

Suite.metaClass._be_run_compaction = { String ip, String port, String tablet_id, String compact_type ->
return curl("POST", String.format("http://%s:%s/api/compaction/run?tablet_id=%s&compact_type=%s",
ip, port, tablet_id, compact_type))
}

Suite.metaClass.be_run_base_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "base")
}

logger.info("Added 'be_run_base_compaction' function to Suite")

Suite.metaClass.be_run_cumulative_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "cumulative")
}

logger.info("Added 'be_run_cumulative_compaction' function to Suite")

Suite.metaClass.be_run_full_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "full")
}

Suite.metaClass.be_run_full_compaction_by_table_id = { String ip, String port, String table_id /* param */->
return curl("POST", String.format("http://%s:%s/api/compaction/run?table_id=%s&compact_type=full", ip, port, table_id))
}

logger.info("Added 'be_run_full_compaction' function to Suite")

Suite.metaClass.update_be_config = { String ip, String port, String key, String value /*param */ ->
return curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", ip, port, key, value))
}
Expand All @@ -233,7 +191,6 @@ Suite.metaClass.update_all_be_config = { String key, Object value ->

logger.info("Added 'update_all_be_config' function to Suite")


Suite.metaClass._be_report = { String ip, int port, String reportName ->
def url = "http://${ip}:${port}/api/report/${reportName}"
def result = Http.GET(url, true)
Expand Down Expand Up @@ -299,4 +256,5 @@ Suite.metaClass.check_nested_index_file = { ip, port, tablet_id, expected_rowset
}
}

logger.info("Added 'check_nested_index_file' function to Suite")
logger.info("Added 'check_nested_index_file' function to Suite")

Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ suite("test_stale_rowset") {
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()



def table = "nation"
sql new File("""${context.file.parent}/../ddl/${table}_delete.sql""").text
// create table if not exists
sql new File("""${context.file.parent}/../ddl/${table}.sql""").text

def load_nation_once = {
def load_nation_once = {
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def loadLabel = table + "_" + uniqueID
// load data from cos
Expand Down Expand Up @@ -160,63 +158,7 @@ suite("test_stale_rowset") {
String[][] tablets = sql """ show tablets from ${tableName}; """

// trigger compactions for all tablets in ${tableName}
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")

String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
backend_id = tablet[2]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)

String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")

sql """
select count(*) from ${tableName};
Expand Down
Loading

0 comments on commit 487e17a

Please sign in to comment.