diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index c466c35e2a2ab69..c5f75799e182465 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -91,6 +91,8 @@ Status CloudCumulativeCompaction::prepare_compact() { // plus 1 to skip the delete version. // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. update_cumulative_point(); + st = Status::Error( + "_last_delete_version.first not equal to -1"); } return st; } diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index dc6abbac31ba1bf..9d18ecd6b14b4d9 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -674,7 +674,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto st = compaction->prepare_compact(); if (!st.ok()) { long now = duration_cast(system_clock::now().time_since_epoch()).count(); - if (st.is()) { + if (st.is() && + st.msg() != "_last_delete_version.first not equal to -1") { // Backoff strategy if no suitable version tablet->last_cumu_no_suitable_version_ms = now; } diff --git a/regression-test/data/compaction/test_cumu_compaction_with_delete.out b/regression-test/data/compaction/test_cumu_compaction_with_delete.out new file mode 100644 index 000000000000000..2d0787106900eae --- /dev/null +++ b/regression-test/data/compaction/test_cumu_compaction_with_delete.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 + +-- !3 -- +2 2 +3 3 +4 4 +5 5 +6 6 + +-- !4 -- +3 3 +4 4 +5 5 +6 6 + +-- !5 -- +4 4 +5 5 +6 6 + +-- !5 -- +5 5 +6 6 + +-- !5 -- +6 6 + +-- !6 -- +6 6 + diff --git a/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy b/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy index 07983ec377d11e0..5049e6598dc624f 100644 --- a/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy +++ b/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy @@ -18,7 +18,40 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_cumu_compaction_with_delete") { - def tableName = "test_full_compaction" + def tableName = "test_cumu_compaction_with_delete" + + def check_version_and_cumu_point = { tablets, version, cumu_point -> + // before compaction, there are 6 rowsets. + int rowsetCount = 0 + int cumuPoint = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List) tabletJson.rowsets).size() + //logger.info(tabletJson) + cumuPoint = tabletJson["cumulative point"] + } + assert (rowsetCount == version * 1) + assert (cumuPoint == cumu_point) + } + + def cumulative_compaction = { tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id -> + // trigger cumu compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + sleep(1000) + + def compactJson = parseJson(out.trim()) + } + } try { String backend_id; @@ -28,135 +61,102 @@ suite("test_cumu_compaction_with_delete") { getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); backend_id = backendId_to_backendIP.keySet()[0] - def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) - logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def configList = parseJson(out.trim()) - assert configList instanceof List - - boolean disableAutoCompaction = true - for (Object ele in (List) configList) { - assert ele instanceof List - if (((List) ele)[0] == "disable_auto_compaction") { - disableAutoCompaction = Boolean.parseBoolean(((List) ele)[2]) - } - } sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( - `user_id` INT NOT NULL, `value` INT NOT NULL) + `user_id` INT NOT NULL, + `value` INT NOT NULL) UNIQUE KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 PROPERTIES ("replication_allocation" = "tag.location.default: 1", "disable_auto_compaction" = "true", - "enable_mow_light_delete" = "false", - "enable_unique_key_merge_on_write" = "true");""" - - // version1 (1,1)(2,2) - sql """ INSERT INTO ${tableName} VALUES - (1,1),(2,2) - """ - qt_1 """select * from ${tableName} order by user_id""" - - - // version2 (1,10)(2,20) - sql """ INSERT INTO ${tableName} VALUES - (1,10),(2,20) - """ - qt_2 """select * from ${tableName} order by user_id""" - - - // version3 (1,100)(2,200) - sql """ INSERT INTO ${tableName} VALUES - (1,100),(2,200) - """ - qt_3 """select * from ${tableName} order by user_id""" - - - // version4 (1,100)(2,200)(3,300) - sql """ INSERT INTO ${tableName} VALUES - (3,300) - """ - qt_4 """select * from ${tableName} order by user_id""" - - - // version5 (1,100)(2,200)(3,100) - sql """update ${tableName} set value = 100 where user_id = 3""" - qt_5 """select * from ${tableName} order by user_id""" - - - // version6 (1,100)(2,200) - sql """delete from ${tableName} where user_id = 3""" - qt_6 """select * from ${tableName} order by user_id""" - - sql "SET skip_delete_predicate = true" - sql "SET skip_delete_sign = true" - sql "SET skip_delete_bitmap = true" - // show all hidden data - // (1,10)(1,100)(2,2)(2,20)(2,200)(3,300)(3,100) - qt_skip_delete """select * from ${tableName} order by user_id, value""" + "enable_mow_light_delete" = "true")""" + + // [0-1] | [2-2] [3-3] [4-4] [5-5] [6-6] [7-7] + // cumu point = 2 + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (2,2)""" + sql """ INSERT INTO ${tableName} VALUES (3,3)""" + sql """ INSERT INTO ${tableName} VALUES (4,4)""" + sql """ INSERT INTO ${tableName} VALUES (5,5)""" + sql """ INSERT INTO ${tableName} VALUES (6,6)""" + qt_1 """select * from ${tableName} order by user_id, value""" //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus def tablets = sql_return_maparray """ show tablets from ${tableName}; """ - def replicaNum = get_table_replica_num(tableName) - logger.info("get table replica num: " + replicaNum) - // before full compaction, there are 7 rowsets. - int rowsetCount = 0 - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - (code, out, err) = curl("GET", tablet.CompactionStatus) - logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def tabletJson = parseJson(out.trim()) - assert tabletJson.rowsets instanceof List - rowsetCount +=((List) tabletJson.rowsets).size() - } - assert (rowsetCount == 7 * replicaNum) - - // trigger full compactions for all tablets in ${tableName} + check_version_and_cumu_point(tablets, 7, 2) + + // [0-1] [2-7] | + // cumu point = 2 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 2, 2) + + // [0-1] [2-7] | [8-8] + // cumu point = 8 + sql """ delete from ${tableName} where user_id = 1""" + qt_3 """select * from ${tableName} order by user_id, value""" + + // [0-1] [2-7] [8-8] | + // cumu point = 9 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 3, 9) + + // [0-1] [2-7] [8-8] | [9-9] + // cumu point = 9 + sql """ delete from ${tableName} where user_id = 2""" + qt_4 """select * from ${tableName} order by user_id, value""" + + // [0-1] [2-7] [8-8] [9-9] | + // cumu point = 10 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 4, 10) + + // [0-1] [2-7] [8-8] [9-9] | [10-10] + // cumu point = 10 + sql """ delete from ${tableName} where user_id = 3""" + qt_5 """select * from ${tableName} order by user_id, value""" + + // [0-1] [2-7] [8-8] [9-9] [10-10] | + // cumu point = 11 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 5, 11) + + // [0-1] [2-7] [8-8] [9-9] [10-10] | [11-11] + // cumu point = 11 + sql """ delete from ${tableName} where user_id = 4""" + qt_5 """select * from ${tableName} order by user_id, value""" + + // [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] | + // cumu point = 12 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 6, 12) + + // [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] | [12-12] + // cumu point = 12 + sql """ delete from ${tableName} where user_id = 5""" + qt_5 """select * from ${tableName} order by user_id, value""" + + // [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] [12-12] | + // cumu point = 13 + cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id) + check_version_and_cumu_point(tablets, 7, 13) + + // trigger base compactions for all tablets in ${tableName} for (def tablet in tablets) { String tablet_id = tablet.TabletId backend_id = tablet.BackendId - times = 1 - do{ - (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - ++times - sleep(2000) - } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + (code, out, err) = be_run_base_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + sleep(1000) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } - } - - // wait for full compaction done - for (def tablet in tablets) { - boolean running = true - do { - Thread.sleep(1000) - String tablet_id = tablet.TabletId - backend_id = tablet.BackendId - (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactionStatus = parseJson(out.trim()) - assertEquals("success", compactionStatus.status.toLowerCase()) - running = compactionStatus.run_status - } while (running) } - // after full compaction, there is only 1 rowset. - + // after base compaction, there is only 1 rowset. rowsetCount = 0 for (def tablet in tablets) { String tablet_id = tablet.TabletId @@ -171,17 +171,11 @@ suite("test_cumu_compaction_with_delete") { if (cloudMode) { assert (rowsetCount == 2) } else { - assert (rowsetCount == 1 * replicaNum) + assert (rowsetCount == 1) } - // make sure all hidden data has been deleted - // (1,100)(2,200) - qt_select_final """select * from ${tableName} order by user_id""" + qt_6 """select * from ${tableName} order by user_id, value""" - sql "SET skip_delete_predicate = false" - sql "SET skip_delete_sign = false" - sql "SET skip_delete_bitmap = false" - qt_select_final2 """select * from ${tableName} order by user_id""" } finally { try_sql("DROP TABLE IF EXISTS ${tableName}") }