Merge branch 'master' into 20240417_fix_tracker_compression
xinyiZzz authored Apr 18, 2024
2 parents fc90fdd + 9606252 commit ec3934c
Showing 15 changed files with 460 additions and 300 deletions.
4 changes: 4 additions & 0 deletions be/src/io/fs/hdfs_file_writer.cpp
@@ -60,7 +60,11 @@ Status HdfsFileWriter::close() {
     _closed = true;
 
     if (_sync_file_data) {
+#ifdef USE_LIBHDFS3
         int ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#else
+        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#endif
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
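
The new guard selects whichever flush call the linked HDFS client actually provides: hdfsSync() when the BE is built against libhdfs3, hdfsHSync() otherwise. A minimal sketch of the same selection factored into a helper (the build flag and both function names are taken from the diff; the wrapper itself and the header path are illustrative assumptions):

    #include <hdfs/hdfs.h>  // hdfsFS/hdfsFile and the sync calls; exact header path varies by HDFS client packaging

    // Illustrative wrapper around the #ifdef introduced above: one entry point,
    // with the library-specific sync primitive chosen at compile time.
    static int sync_hdfs_file(hdfsFS fs, hdfsFile file) {
    #ifdef USE_LIBHDFS3
        return hdfsSync(fs, file);   // libhdfs3 build
    #else
        return hdfsHSync(fs, file);  // JNI libhdfs build
    #endif
    }

Either call reports failure with a non-zero return value, so the caller's existing `if (ret != 0)` error path is unchanged.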
15 changes: 8 additions & 7 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -453,19 +453,20 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets =
                 tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError<false>(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError<false>(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
15 changes: 8 additions & 7 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -386,19 +386,20 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets =
                 tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError<false>(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError<false>(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
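
SegmentWriter and VerticalSegmentWriter receive the same restructuring: a missing-rowset discrepancy is now always logged, but it only fails the flush when the partial update runs in strict mode; otherwise the gap is left to the later commit/publish phases, as the code comment notes. A stripped-down sketch of that control flow, with a plain enum standing in for the writers' Status return (assumed names, not the actual member functions):

    #include <cstddef>
    #include <iostream>

    enum class FlushCheck { ok, strict_failed };

    // Distilled decision logic: warn on any mismatch, fail only in strict mode.
    FlushCheck check_specified_rowsets(std::size_t specified, std::size_t expected,
                                       bool is_strict_mode) {
        if (specified != expected) {
            // Some rowsets disappeared (e.g. removed by compaction) between
            // collecting rowset_ids and fetching the rowsets themselves.
            std::cerr << "[Memtable Flush] specified_rowsets.size()=" << specified
                      << " but rowset_ids.size()=" << expected << '\n';
            if (is_strict_mode) {
                return FlushCheck::strict_failed;  // the real code returns Status::InternalError
            }
            // Non-strict: missing rowsets are reconciled in the commit/publish phases.
        }
        return FlushCheck::ok;
    }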
48 changes: 24 additions & 24 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -123,7 +123,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
 #ifdef NDEBUG
         LOG(INFO) << err_msg;
 #else
-        LOG(FATAL) << err_msg << print_address_sanitizers();
+        LOG(INFO) << err_msg << print_address_sanitizers();
 #endif
     }
     if (ExecEnv::tracking_memory()) {
@@ -132,10 +132,10 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         _consumption->set(0);
 #ifndef NDEBUG
     } else if (!_address_sanitizers.empty()) {
-        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
-                   << ", mem tracker label: " << _label
-                   << ", peak consumption: " << _consumption->peak_value()
-                   << print_address_sanitizers();
+        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
+                  << ", mem tracker label: " << _label
+                  << ", peak consumption: " << _consumption->peak_value()
+                  << print_address_sanitizers();
 #endif
     }
     g_memtrackerlimiter_cnt << -1;
@@ -147,13 +147,13 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
         std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
-            LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
-                       << ", consumption: " << _consumption->current_value()
-                       << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
-                       << ", size: " << size << ", old buf: " << it->first
-                       << ", old size: " << it->second.size
-                       << ", new stack_trace: " << get_stack_trace()
-                       << ", old stack_trace: " << it->second.stack_trace;
+            LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
+                      << ", consumption: " << _consumption->current_value()
+                      << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
+                      << ", size: " << size << ", old buf: " << it->first
+                      << ", old size: " << it->second.size
+                      << ", new stack_trace: " << get_stack_trace()
+                      << ", old stack_trace: " << it->second.stack_trace;
         }
 
         // if alignment not equal to 0, maybe usable_size > size.
@@ -170,21 +170,21 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
             if (it->second.size != size) {
-                LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
-                              "label: "
-                           << _label << ", consumption: " << _consumption->current_value()
-                           << ", peak consumption: " << _consumption->peak_value()
-                           << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
-                           << ", old size: " << it->second.size
-                           << ", new stack_trace: " << get_stack_trace()
-                           << ", old stack_trace: " << it->second.stack_trace;
+                LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
+                             "label: "
+                          << _label << ", consumption: " << _consumption->current_value()
+                          << ", peak consumption: " << _consumption->peak_value()
+                          << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
+                          << ", old size: " << it->second.size
+                          << ", new stack_trace: " << get_stack_trace()
+                          << ", old stack_trace: " << it->second.stack_trace;
             }
             _address_sanitizers.erase(buf);
         } else {
-            LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
-                       << ", consumption: " << _consumption->current_value()
-                       << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
-                       << ", size: " << size << ", stack_trace: " << get_stack_trace();
+            LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
+                      << ", consumption: " << _consumption->current_value()
+                      << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
+                      << ", size: " << size << ", stack_trace: " << get_stack_trace();
         }
     }
 }
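
These hunks demote the memory tracker's debug-only "address sanitizer" checks from LOG(FATAL) to LOG(INFO), so an inconsistent record (double add, wrong size on free, unknown buffer) is reported instead of crashing a debug build. A rough sketch of the bookkeeping pattern involved, with a plain map standing in for the tracker's real structures (stack traces and consumption counters omitted; names are illustrative):

    #include <cstddef>
    #include <iostream>
    #include <mutex>
    #include <unordered_map>

    // Illustrative ledger: remembers each tracked buffer by pointer and size,
    // and reports (rather than aborts on) the three mismatch cases above.
    class AllocationLedger {
    public:
        void add(void* buf, std::size_t size) {
            std::lock_guard<std::mutex> l(_mtx);
            auto [it, inserted] = _entries.emplace(buf, size);
            if (!inserted) {
                std::cerr << "repeat add: " << buf << ", old size " << it->second
                          << ", new size " << size << '\n';
            }
        }

        void remove(void* buf, std::size_t size) {
            std::lock_guard<std::mutex> l(_mtx);
            auto it = _entries.find(buf);
            if (it == _entries.end()) {
                std::cerr << "buf not tracked: " << buf << '\n';
                return;
            }
            if (it->second != size) {
                std::cerr << "size mismatch on free: recorded " << it->second
                          << ", freeing " << size << '\n';
            }
            _entries.erase(it);
        }

    private:
        std::mutex _mtx;
        std::unordered_map<void*, std::size_t> _entries;
    };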
@@ -256,11 +256,11 @@ class AggregateFunctionGroupArrayIntersect
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         T element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             read_int_binary(element, buf);
             data.value->insert(static_cast<void*>(&element));
         }
@@ -484,11 +484,11 @@ class AggregateFunctionGroupArrayIntersectGeneric
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         StringRef element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             element = read_string_binary_into(*arena, buf);
             data.value->insert((void*)element.data, element.size);
         }
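
The size_t to UInt64 change keeps the deserialized element count in the same fixed-width 64-bit type that read_var_uint fills, avoiding a mismatch with size_t, whose width is platform-dependent. A hedged sketch of the kind of varint decoding involved (LEB128-style; this is not the project's actual read_var_uint implementation):

    #include <cstdint>

    // Decodes an unsigned varint: 7 payload bits per byte, high bit set means
    // "more bytes follow". The result is always a full 64-bit value.
    const unsigned char* read_var_uint64(const unsigned char* pos, std::uint64_t& out) {
        out = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            const unsigned char byte = *pos++;
            out |= static_cast<std::uint64_t>(byte & 0x7F) << shift;
            if ((byte & 0x80) == 0) {
                break;  // continuation bit clear: last byte of the varint
            }
        }
        return pos;  // one past the encoded value
    }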
@@ -122,6 +122,10 @@ public long getIndexId() {
         return indexId;
     }
 
+    public void resetIndexIdForRestore(long id) {
+        indexId = id;
+    }
+
     public KeysType getKeysType() {
         return keysType;
     }
14 changes: 13 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -638,7 +638,9 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
                 // base index
                 baseIndexId = newIdxId;
             }
-            indexIdToMeta.put(newIdxId, origIdxIdToMeta.get(entry.getKey()));
+            MaterializedIndexMeta indexMeta = origIdxIdToMeta.get(entry.getKey());
+            indexMeta.resetIndexIdForRestore(newIdxId);
+            indexIdToMeta.put(newIdxId, indexMeta);
             indexNameToId.put(entry.getValue(), newIdxId);
         }
 
@@ -1598,6 +1600,16 @@ public void readFields(DataInput in) throws IOException {
             this.indexNameToId.put(indexName, indexId);
             MaterializedIndexMeta indexMeta = MaterializedIndexMeta.read(in);
             indexIdToMeta.put(indexId, indexMeta);
+
+            // HACK: the index id in MaterializedIndexMeta is not equals to the index id
+            // saved in OlapTable, because the table restore from snapshot is not reset
+            // the MaterializedIndexMeta correctly.
+            if (indexMeta.getIndexId() != indexId) {
+                LOG.warn("HACK: the index id {} in materialized index meta of {} is not equals"
+                        + " to the index saved in table {} ({}), reset it to {}",
+                        indexMeta.getIndexId(), indexName, name, id, indexId);
+                indexMeta.resetIndexIdForRestore(indexId);
+            }
         }
 
         // partition and distribution info