Skip to content

Commit

Permalink
Merge branch 'master' into restore_binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Dec 9, 2024
2 parents 234e36c + 9c4c0b3 commit 8091a1b
Show file tree
Hide file tree
Showing 58 changed files with 3,411 additions and 492 deletions.
1 change: 1 addition & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ Status CloudTablet::sync_if_not_running() {
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rlock(_meta_lock);
return _merged_tablet_schema;
}

Expand Down
13 changes: 7 additions & 6 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,15 +1093,16 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
_key_to_time.find(file_key) != _key_to_time.end()) {
if (!remove_directly) {
for (auto& [_, cell] : _files[file_key]) {
if (cell.file_block->cache_type() == FileCacheType::TTL) {
Status st = cell.file_block->update_expiration_time(0);
if (!st.ok()) {
LOG_WARNING("Failed to update expiration time to 0").error(st);
}
if (cell.file_block->cache_type() != FileCacheType::TTL) {
continue;
}
Status st = cell.file_block->update_expiration_time(0);
if (!st.ok()) {
LOG_WARNING("Failed to update expiration time to 0").error(st);
}

if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue;
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (cell.queue_iterator) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ class BaseTablet {
int64_t limit);

// Return the merged schema of all rowsets
virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; }
virtual TabletSchemaSPtr merged_tablet_schema() const {
std::shared_lock rlock(_meta_lock);
return _max_version_schema;
}

void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
bool include_stale = false) {
Expand Down
30 changes: 20 additions & 10 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,33 @@ class RowIdConversion {
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }

// resize segment rowid map to its rows num
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
size_t delta_std_pair_cap = 0;
Status init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
for (size_t i = 0; i < num_rows.size(); i++) {
constexpr size_t RESERVED_MEMORY = 10 * 1024 * 1024; // 10M
if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(RESERVED_MEMORY)) {
return Status::MemoryLimitExceeded(fmt::format(
"RowIdConversion init_segment_map failed, memory exceed limit, {}, "
"consuming "
"tracker:<{}>, peak used {}, current used {}.",
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str(),
doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker()->consumption()));
}

uint32_t id = _segments_rowid_map.size();
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
_id_to_segment_map.emplace_back(src_rowset_id, i);
std::vector<std::pair<uint32_t, uint32_t>> vec(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
delta_std_pair_cap += vec.capacity();

//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is a thridparty code, it's too complex to modify it.
// refer compact_column.
track_mem_usage(vec.capacity());
_segments_rowid_map.emplace_back(std::move(vec));
}
//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is a thridparty code, it's too complex to modify it.
// refer compact_column.
track_mem_usage(delta_std_pair_cap);
return Status::OK();
}

// set dst rowset id
Expand Down Expand Up @@ -124,9 +136,7 @@ class RowIdConversion {
size_t new_size =
_std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
_segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>);

RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
CONSUME_THREAD_MEM_TRACKER(new_size);
CONSUME_THREAD_MEM_TRACKER(new_size - _seg_rowid_map_mem_used);
_seg_rowid_map_mem_used = new_size;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), segment_num_rows);
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows));
}

auto [seg_start, seg_end] = _segment_offsets;
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2175,11 +2175,11 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
if (block->rows() == 0) {
vectorized::MutableColumnPtr col0 =
std::move(*block->get_by_position(0).column).mutate();
auto res_column = vectorized::ColumnString::create();
res_column->insert_data("", 0);
auto col_const = vectorized::ColumnConst::create(std::move(res_column),
selected_size);
block->replace_by_position(0, std::move(col_const));
auto tmp_indicator_col =
block->get_by_position(0)
.type->create_column_const_with_default_value(
selected_size);
block->replace_by_position(0, std::move(tmp_indicator_col));
_output_index_result_column_for_expr(_sel_rowid_idx.data(), selected_size,
block);
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
Expand Down
Loading

0 comments on commit 8091a1b

Please sign in to comment.