[enhancement](test) Add show data regression test (#45501)
Yukang-Lian authored and Your Name committed Jan 3, 2025
1 parent 7d6ba1d commit 01b82ce
Showing 24 changed files with 2,809 additions and 141 deletions.
@@ -1,5 +1,4 @@


// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -49,7 +48,7 @@ Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, Strin
String nextMarker = null;
final int maxKeys = 500;
List<OSSObjectSummary> sums = null;

if (!client.doesBucketExist(bucketName)) {
logger.info("no bucket named ${bucketName} in ${endpoint}")
return
@@ -61,14 +60,14 @@ Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, Strin
do {
objectListing = client.listObjects(new ListObjectsRequest(bucketName).
withPrefix(prefix).withMarker(nextMarker).withMaxKeys(maxKeys));

sums = objectListing.getObjectSummaries();
for (OSSObjectSummary s : sums) {
logger.info("\t" + s.getKey());
}

nextMarker = objectListing.getNextMarker();

} while (objectListing.isTruncated());
} catch (OSSException oe) {
logger.error("Caught an OSSException, which means your request made it to OSS, "
@@ -107,7 +106,7 @@ Suite.metaClass.calculateFolderLength = { OSS client, String bucketName, String
for (OSSObjectSummary s : sums) {
size += s.getSize();
}
} while (objectListing.isTruncated());
} while (objectListing.isTruncated());
return size;
}

@@ -143,7 +142,7 @@ Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, St
logger.info(s.getKey() + " : " + (s.getSize() / (1024 * 1024 * 1024)) + "GB");
}
} while (objectListing.isTruncated());

} catch (OSSException oe) {
logger.error("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
@@ -164,6 +163,3 @@ Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, St
logger.info("Done!")
}
}
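
For reference, a minimal usage sketch of the OSS helpers above, as they are later called from the show-data suite (the config keys and the tabletId value are placeholders; initOssClient and shutDownOssClient are assumed to be the existing client helpers used in that suite):

// sketch: measure how much backend storage one tablet occupies on OSS
def ak = context.config.otherConfigs.get("cbsS3Ak")
def sk = context.config.otherConfigs.get("cbsS3Sk")
def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
def bucketName = context.config.otherConfigs.get("cbsS3Bucket")
def storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")

def client = initOssClient(ak, sk, endpoint)
def tabletId = "10001"    // placeholder tablet id
def bytes = calculateFolderLength(client, bucketName, storagePrefix + "/data/" + tabletId)
logger.info("tablet ${tabletId} uses ${bytes / (1024 * 1024)} MB on OSS")
shutDownOssClient(client)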



@@ -14,27 +14,45 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// The cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
// and modified by Doris.
import groovy.json.JsonOutput
import org.apache.doris.regression.suite.Suite
import org.codehaus.groovy.runtime.IOGroovyMethods

// load the same data 10 times; expect the data size not to rise
suite("test_mow_show_data_in_cloud","p2") {
//cloud-mode
if (!isCloudMode()) {
logger.info("not cloud mode, not run")
return
Suite.metaClass.repeate_stream_load_same_data = { String tableName, int loadTimes, String filePath->
for (int i = 0; i < loadTimes; i++) {
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
file """${getS3Url()}/${filePath}"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}

def repeate_stream_load_same_data = { String tableName, int loadTimes ->
for (int i = 0; i < loadTimes; i++) {
Suite.metaClass.stream_load_partial_update_data = { String tableName->
for (int i = 0; i < 20; i++) {
int start = i * 10 + 1
int end = (i + 1) * 10
def elements = (start..end).collect { "a$it" }
String columns = "id," + elements.join(',')
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
set 'columns', columns
set 'partial_columns', 'true'
file """${getS3Url()}/regression/show_data/fullData.1.part${i+1}.gz"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
@@ -50,27 +68,27 @@ suite("test_mow_show_data_in_cloud","p2") {
}
}

def get_tablets_from_table = { String table ->
Suite.metaClass.get_tablets_from_table = { String table ->
def res = sql_return_maparray """show tablets from ${table}"""
return res
}

def show_tablet_compaction = { HashMap tablet ->
Suite.metaClass.show_tablet_compaction = { HashMap tablet ->
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET ")
sb.append(tablet["CompactionStatus"])
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
return parseJson(out.trim())
}

def trigger_tablet_compaction = { HashMap tablet, String compact_type ->
Suite.metaClass.trigger_tablet_compaction = { HashMap tablet, String compact_type ->
//support trigger base/cumulative/full compaction
def tabletStatusBeforeCompaction = show_tablet_compaction(tablet)

@@ -82,10 +100,10 @@ suite("test_mow_show_data_in_cloud","p2") {
sb.append(triggerCompactionUrl)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
def outJson = parseJson(out)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
// if code = 0, it means compaction happened, need to check
@@ -112,16 +130,16 @@ suite("test_mow_show_data_in_cloud","p2") {
}
}

def trigger_compaction = { List<List<Object>> tablets ->
Suite.metaClass.trigger_compaction = { List<List<Object>> tablets ->
for(def tablet: tablets) {
trigger_tablet_compaction(tablet, "cumulative")
trigger_tablet_compaction(tablet, "base")
trigger_tablet_compaction(tablet, "full")
}
}

def caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
storageType = context.config.otherConfigs.get("storageProvider")
Suite.metaClass.caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
def storageType = context.config.otherConfigs.get("storageProvider")
Double storageSize = 0

List<String> tabletIds = []
@@ -131,30 +149,30 @@ suite("test_mow_show_data_in_cloud","p2") {

if (storageType.toLowerCase() == "oss") {
//cbs means cluster backend storage
ak = context.config.otherConfigs.get("cbsS3Ak")
sk = context.config.otherConfigs.get("cbsS3Sk")
endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
bucketName = context.config.otherConfigs.get("cbsS3Bucket")
storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")
def ak = context.config.otherConfigs.get("cbsS3Ak")
def sk = context.config.otherConfigs.get("cbsS3Sk")
def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
def bucketName = context.config.otherConfigs.get("cbsS3Bucket")
def storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")

client = initOssClient(ak, sk, endpoint)
def client = initOssClient(ak, sk, endpoint)
for(String tabletId: tabletIds) {
storageSize += calculateFolderLength(client, bucketName, storagePrefix + "/data/" + tabletId)
}
shutDownOssClient(client)
}

if (storageType.toLowerCase() == "hdfs") {
fsName = context.config.otherConfigs.get("cbsFsName")
isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
fsUser = context.config.otherConfigs.get("cbsFsUser")
storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
def fsName = context.config.otherConfigs.get("cbsFsName")
def isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
def fsUser = context.config.otherConfigs.get("cbsFsUser")
def storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
}

return storageSize
}

def translate_different_unit_to_MB = { String size, String unitField ->
Suite.metaClass.translate_different_unit_to_MB = { String size, String unitField ->
Double sizeKb = 0.0
if (unitField == "KB") {
sizeKb = Double.parseDouble(size) / 1024
@@ -168,7 +186,7 @@ suite("test_mow_show_data_in_cloud","p2") {
return sizeKb
}

def show_table_data_size_through_mysql = { String table ->
Suite.metaClass.show_table_data_size_through_mysql = { String table ->
def mysqlShowDataSize = 0L
def res = sql_return_maparray " show data from ${table}"
def tableSizeInfo = res[0]
Expand All @@ -181,7 +199,7 @@ suite("test_mow_show_data_in_cloud","p2") {
return mysqlShowDataSize
}

def caculate_table_data_size_through_api = { List<List<Object>> tablets ->
Suite.metaClass.caculate_table_data_size_through_api = { List<List<Object>> tablets ->
Double apiCaculateSize = 0
for (HashMap tablet in tablets) {
def tabletStatus = show_tablet_compaction(tablet)
@@ -199,42 +217,4 @@ suite("test_mow_show_data_in_cloud","p2") {

return apiCaculateSize
}

def main = {
tableName="lineitem_mow"
sql "DROP TABLE IF EXISTS ${tableName};"
sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
sql new File("""${context.file.parent}/ddl/lineitem_delete.sql""").text.replaceAll("\\\$\\{table\\}", tableName)
List<String> tablets = get_tablets_from_table(tableName)
def loadTimes = [1, 10]
Map<String, List> sizeRecords = ["apiSize":[], "mysqlSize":[], "cbsSize":[]]
for (int i in loadTimes){
// stream load 1 time, record each size
repeate_stream_load_same_data(tableName, i)
def rows = sql_return_maparray "select count(*) as count from ${tableName};"
logger.info("table ${tableName} has ${rows[0]["count"]} rows")
// trigger the compaction mechanism
trigger_compaction(tablets)

// then sleep 5 min and wait for FE to finish reporting
sleep(300 * 1000)

sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
sleep(300 * 1000)
logger.info("after ${i} times stream load, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")

}

// expect mysqlSize == apiSize == storageSize
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["apiSize"][0])
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["cbsSize"][0])
// expect load 1 times == load 10 times
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["mysqlSize"][1])
assertEquals(sizeRecords["apiSize"][0], sizeRecords["apiSize"][1])
assertEquals(sizeRecords["cbsSize"][0], sizeRecords["cbsSize"][1])
}

main()
}
//http://qa-build.oss-cn-beijing.aliyuncs.com/regression/show_data/fullData.1.part1.gz
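
Because the helpers above are now registered on Suite.metaClass, other show-data suites can reuse them directly. A minimal caller sketch, assuming a matching DDL file exists and using the fullData part file referenced above (the suite name and table name are placeholders; the size assertions mirror the removed main closure):

suite("show_data_usage_sketch", "p2") {
    if (!isCloudMode()) {
        logger.info("not cloud mode, not run")
        return
    }
    def tableName = "lineitem_sketch"    // placeholder table, created by a matching DDL file
    sql "DROP TABLE IF EXISTS ${tableName};"
    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text

    def tablets = get_tablets_from_table(tableName)
    repeate_stream_load_same_data(tableName, 10, "regression/show_data/fullData.1.part1.gz")
    trigger_compaction(tablets)
    sleep(300 * 1000)    // wait for FE to report the new size

    def apiSize = caculate_table_data_size_through_api(tablets)
    def cbsSize = caculate_table_data_size_in_backend_storage(tablets)
    def mysqlSize = show_table_data_size_through_mysql(tableName)
    // expect all three views of the table size to agree after repeated loads
    assertEquals(mysqlSize, apiSize)
    assertEquals(mysqlSize, cbsSize)
}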
2 changes: 0 additions & 2 deletions regression-test/suites/show_data/ddl/lineitem_delete.sql

This file was deleted.

25 changes: 0 additions & 25 deletions regression-test/suites/show_data/ddl/lineitem_dup.sql

This file was deleted.

25 changes: 0 additions & 25 deletions regression-test/suites/show_data/ddl/lineitem_mow.sql

This file was deleted.

