Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Dec 4, 2024
1 parent 7da0a2e commit 9252dbd
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 92 deletions.
5 changes: 3 additions & 2 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
}
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
{ return transient_size; })
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.set_promotion_size_to_max",
{ promotion_size = INT64_MAX; })
DBUG_EXECUTE_IF(
"SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.set_promotion_size_to_max",
{ promotion_size = INT64_MAX; })

if (total_size >= promotion_size) {
return transient_size;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <re2/re2.h>
#include <unistd.h>

Expand Down Expand Up @@ -775,6 +776,8 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
last_failure_ms = tablet_ptr->last_base_compaction_failure_time();
}
if (now_ms - last_failure_ms <= 5000) {
DBUG_EXECUTE_IF("TabletManager::find_best_tablets_to_compaction.dcheck",
{ DCHECK(false) << "Too often to check compaction"; })
VLOG_DEBUG << "Too often to check compaction, skip it. "
<< "compaction_type=" << compaction_type_str
<< ", last_failure_time_ms=" << last_failure_ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,11 @@ import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_cumu_compaction_with_delete") {
def tableName = "test_cumu_compaction_with_delete"
def cloudMode = isCloudMode()

def check_version_and_cumu_point = { tablets, version, cumu_point ->
// before compaction, there are 6 rowsets.
int rowsetCount = 0
int cumuPoint = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
rowsetCount +=((List<String>) tabletJson.rowsets).size()
//logger.info(tabletJson)
cumuPoint = tabletJson["cumulative point"]
}
assert (rowsetCount == version * 1)
assert (cumuPoint == cumu_point)
}

def cumulative_compaction = { tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id ->
// trigger cumu compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId

(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
sleep(1000)

def compactJson = parseJson(out.trim())
}
}

GetDebugPoint().clearDebugPointsForAllBEs()
try {
GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.set_promotion_size_to_max")
String backend_id;

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]
GetDebugPoint().enableDebugPointForAllBEs("TabletManager::find_best_tablets_to_compaction.dcheck")

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
Expand All @@ -74,57 +34,12 @@ suite("test_cumu_compaction_with_delete") {
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"enable_mow_light_delete" = "true")"""

//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName}; """

// [0-1] | [2-2] [3-3] [4-4] [5-5] [6-6] [7-7]
// cumu point = 2
sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ delete from ${tableName} where user_id = 1"""
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)

sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ delete from ${tableName} where user_id = 1"""
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)

sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ delete from ${tableName} where user_id = 1"""
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)

sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ delete from ${tableName} where user_id = 1"""
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)

// trigger base compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId

(code, out, err) = be_run_base_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
sleep(1000)

def compactJson = parseJson(out.trim())
}

// after base compaction, there is only 1 rowset.
rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
rowsetCount +=((List<String>) tabletJson.rowsets).size()
}
if (cloudMode) {
assert (rowsetCount == 2)
} else {
assert (rowsetCount == 1)
for(int i = 1; i <= 100; ++i){
sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ delete from ${tableName} where user_id = 1"""
Thread.sleep(500)
}

qt_6 """select * from ${tableName} order by user_id, value"""
Expand All @@ -134,5 +49,6 @@ suite("test_cumu_compaction_with_delete") {
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName} FORCE")
GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.set_promotion_size_to_max")
GetDebugPoint().disableDebugPointForAllBEs("TabletManager::find_best_tablets_to_compaction.dcheck")
}
}

0 comments on commit 9252dbd

Please sign in to comment.