From e8adcd7b2b3465489f7ffaa57fb1c19ed29a609a Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Wed, 11 Dec 2024 18:49:04 +0800
Subject: [PATCH] [opt](compaction) Don't check missed rows in cumu compaction
 if input rowsets are not in tablet (#45279)

Problem Summary:

Suppose a heavy schema change process on BE converting tablet A to tablet B.
1. during schema change double write, new loads write [X-Y] on tablet B.
2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are picked for cumu compaction(X<=a<b<=Y).
3. schema change converts the historical rowsets and replaces the overlapping rowsets on tablet B,
   so the picked input rowsets are no longer in tablet B's _rs_version_map when the cumu compaction
   finishes its merge.
4. the missed-rows check in cumu compaction can then report a false mismatch, because the
   double-written rowsets skipped delete bitmap calculation while tablet B was in NOT_READY state,
   and the BE aborts when enable_mow_compaction_correctness_check_core is enabled.

Since Tablet::modify_rowsets rejects the commit anyway when the input rowsets are no longer in the
tablet, skip the missed-rows check if any input rowset does not exist in the tablet.

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
         if (!_tablet->tablet_meta()->tablet_schema()->cluster_key_uids().empty()) {
             merged_missed_rows_size += _stats.filtered_rows;
         }
+
+        // Suppose a heavy schema change process on BE converting tablet A to tablet B.
+        // 1. during schema change double write, new loads write [X-Y] on tablet B.
+        // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are picked for cumu compaction(X<=a<b<=Y).
+        // 3. schema change converts the historical rowsets and replaces the overlapping rowsets on tablet B,
+        //    so the picked input rowsets are no longer in tablet B when the compaction finishes its merge.
+        // In this case the missed-rows check may report a false mismatch because the double-written rowsets
+        // skipped delete bitmap calculation while tablet B was in NOT_READY state. The compaction will fail
+        // to commit anyway because Tablet::modify_rowsets checks that the input rowsets still exist in the
+        // tablet, so skip the check when any input rowset is not in the tablet.
+        bool need_to_check_missed_rows = true;
+        {
+            std::shared_lock rlock(tablet()->get_header_lock());
+            need_to_check_missed_rows =
+                    std::all_of(_input_rowsets.begin(), _input_rowsets.end(),
+                                [&](const RowsetSharedPtr& rowset) {
+                                    return tablet()->rowset_exists_unlocked(rowset);
+                                });
+        }
+
         if (_tablet->tablet_state() == TABLET_RUNNING &&
-            merged_missed_rows_size != missed_rows_size) {
+            merged_missed_rows_size != missed_rows_size && need_to_check_missed_rows) {
             std::stringstream ss;
             ss << "cumulative compaction: the merged rows(" << _stats.merged_rows
                << "), filtered rows(" << _stats.filtered_rows
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 2dfd30fb86ed9a..a9509a005763f6 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -100,6 +100,20 @@ Status CumulativeCompaction::prepare_compact() {
 }
 
 Status CumulativeCompaction::execute_compact() {
+    DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", {
+        auto target_tablet_id = dp->param("tablet_id", -1);
+        if (target_tablet_id == _tablet->tablet_id()) {
+            LOG(INFO) << "start debug block "
+                      << "CumulativeCompaction::execute_compact.block";
+            while (DebugPoints::instance()->is_enable(
+                    "CumulativeCompaction::execute_compact.block")) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(200));
+            }
+            LOG(INFO) << "end debug block "
+                      << "CumulativeCompaction::execute_compact.block";
+        }
+    })
+
     std::unique_lock lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock);
     if (!lock.owns_lock()) {
         return Status::Error<TRY_LOCK_FAILED, false>(
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index ee7a2b1812a0ae..c812a12b656580 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -28,6 +28,7 @@
 #include "olap/olap_common.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
+#include "util/debug_points.h"
 
 namespace doris {
 
@@ -246,6 +247,21 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
         size_t* compaction_score, bool allow_delete) {
+    DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
+        auto target_tablet_id = dp->param("tablet_id", -1);
+        if (target_tablet_id == tablet->tablet_id()) {
+            auto start_version = dp->param("start_version", -1);
+            auto end_version = dp->param("end_version", -1);
+            for (auto& rowset : candidate_rowsets) {
+                if (rowset->start_version() >= start_version &&
+                    rowset->end_version() <= end_version) {
+                    input_rowsets->push_back(rowset);
+                }
+            }
+            return input_rowsets->size();
+        }
+    })
+
     size_t promotion_size = tablet->cumulative_promotion_size();
     auto max_version = tablet->max_version().first;
     int transient_size = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index cdb637b1c42647..08b9b9a93d90f5 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -867,6 +867,8 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
         return_columns[i] = i;
     }
 
+    DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK);
+
     // begin to find deltas to convert from base tablet to new tablet so that
     // obtain base tablet and new tablet's push lock and header write lock to prevent loading data
     {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 644ca9133eb885..a1a56507ffc67a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -512,6 +512,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
     return Status::OK();
 }
 
+bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) {
+    if (auto it = _rs_version_map.find(rowset->version()); it == _rs_version_map.end()) {
+        return false;
+    } else if (rowset->rowset_id() != it->second->rowset_id()) {
+        return false;
+    }
+    return true;
+}
+
 Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                               std::vector<RowsetSharedPtr>& to_delete, bool check_delete) {
     // the compaction process allow to compact the single version, eg: version[4-4].
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 0b7d758ab8fd88..40b911d6391b9b 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -173,6 +173,7 @@ class Tablet final : public BaseTablet {
     // MUST hold EXCLUSIVE `_meta_lock`.
     Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                           std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false);
+    bool rowset_exists_unlocked(const RowsetSharedPtr& rowset);
 
     Status add_inc_rowset(const RowsetSharedPtr& rowset);
     /// Delete stale rowset by timing. This delete policy uses now() minutes
diff --git a/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
new file mode 100644
index 00000000000000..e7188943a10a19
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
+-- !sql --
+1 9 9 9
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
diff --git a/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
new file mode 100644
index 00000000000000..2f3c44ef2dd00b
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_compaction_on_sc_new_tablet", "docker") {
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.cloudMode = false
+    options.beConfigs += [
+        'enable_java_support=false',
+        'enable_mow_compaction_correctness_check_core=true'
+    ]
+    docker(options) {
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            def table1 = "test_compaction_on_sc_new_tablet"
+            sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+            sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k` int,
+                `c1` int,
+                `c2` int,
+                `c3` int
+                ) UNIQUE KEY(k)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true"); """
+
+            // [2-11]
+            for (int i = 1; i <= 10; i++) {
+                sql "insert into ${table1} values($i,$i,$i,$i);"
+            }
+            qt_sql "select * from ${table1} order by k;"
+
+
+            def beNodes = sql_return_maparray("show backends;")
+            def tabletStats = sql_return_maparray("show tablets from ${table1};")
+            logger.info("tabletStats: \n${tabletStats}")
+            def tabletStat = tabletStats.get(0)
+            def tabletBackendId = tabletStat.BackendId
+            def tabletId = tabletStat.TabletId
+            def version = tabletStat.Version
+            def tabletBackend;
+            for (def be : beNodes) {
+                if (be.BackendId == tabletBackendId) {
+                    tabletBackend = be
+                    break;
+                }
+            }
+            logger.info("tablet ${tabletId} is on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}, version=${version}");
+
+            // blocking the schema change process before it gains max version
+            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+            Thread.sleep(2000)
+
+            sql "alter table ${table1} modify column c1 varchar(100);"
+
+            Thread.sleep(4000)
+
+            // double write [11-22]
+            for (int i = 20; i <= 30; i++) {
+                sql "insert into ${table1} values(1,9,9,9);"
+            }
+
+            tabletStats = sql_return_maparray("show tablets from ${table1};")
+            logger.info("tabletStats: \n${tabletStats}")
+            assertEquals(2, tabletStats.size())
+
+            def oldTabletStat
+            def newTabletStat
+            for (def stat: tabletStats) {
+                if (!stat.TabletId.equals(tabletId)) {
+                    newTabletStat = stat
+                } else {
+                    oldTabletStat = stat
+                }
+            }
+            logger.info("old tablet=[tablet_id=${oldTabletStat.TabletId}, version=${oldTabletStat.Version}]")
+            logger.info("new tablet=[tablet_id=${newTabletStat.TabletId}, version=${newTabletStat.Version}]")
+
+
+            // trigger cumu compaction on new tablet
+            int start_version = 15
+            int end_version = 17
+            // block compaction process on new tablet
+            GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block", [tablet_id: "${newTabletStat.TabletId}"])
+            // manually set cumu compaction's input rowsets on new tablet
+            GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id:"${newTabletStat.TabletId}", start_version:"${start_version}", end_version:"${end_version}"])
+
+            Thread.sleep(2000)
+
+            logger.info("trigger compaction [15-17] on new tablet ${newTabletStat.TabletId}")
+            def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, newTabletStat.TabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            Assert.assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+            // make the schema change run to complete and wait for it
+            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
+                time 2000
+            }
+
+            Thread.sleep(2000)
+
+            // make the cumu compaction run to complete and wait for it
+            GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block")
+
+
+            // BE should skip the merged rows check in cumu compaction, otherwise it will cause coredump
+            // because rowsets [11-22] in the new tablet skipped calculating delete bitmaps because the tablet was in NOT_READY state
+            Thread.sleep(7000)
+
+            qt_sql "select * from ${table1} order by k;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}
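Separated from the diff context, the gist of the behavioral change is a single predicate: the merged/missed-rows consistency check is only enforced when every compaction input rowset is still registered in the tablet's version map under the same rowset id. The following is a minimal standalone sketch of that idea; Version, RowsetMeta, rowset_exists, and need_to_check_missed_rows are simplified illustrative stand-ins, not the actual Doris classes or the exact code added by this patch.

#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Illustrative stand-ins for the tablet's version map entries.
struct Version {
    int64_t first;
    int64_t second;
    bool operator<(const Version& other) const {
        return std::tie(first, second) < std::tie(other.first, other.second);
    }
};

struct RowsetMeta {
    Version version;
    std::string rowset_id;
};

// Mirrors the idea of Tablet::rowset_exists_unlocked: a rowset still "exists" only if the
// tablet holds a rowset with the same version AND the same rowset id, since schema change
// may have replaced that version with a different rowset.
bool rowset_exists(const std::map<Version, RowsetMeta>& rs_version_map, const RowsetMeta& rs) {
    auto it = rs_version_map.find(rs.version);
    return it != rs_version_map.end() && it->second.rowset_id == rs.rowset_id;
}

// Mirrors the new guard: only enforce the missed-rows check when all input rowsets are
// still present in the tablet; otherwise the compaction commit will be rejected anyway.
bool need_to_check_missed_rows(const std::map<Version, RowsetMeta>& rs_version_map,
                               const std::vector<RowsetMeta>& input_rowsets) {
    return std::all_of(input_rowsets.begin(), input_rowsets.end(),
                       [&](const RowsetMeta& rs) { return rowset_exists(rs_version_map, rs); });
}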