Commit

Merge branch 'master' into 20240417_fix_stacktrace
xinyiZzz authored Apr 18, 2024
2 parents b5973a5 + 9606252 commit e3907c5
Showing 10 changed files with 252 additions and 121 deletions.
4 changes: 4 additions & 0 deletions be/src/io/fs/hdfs_file_writer.cpp
@@ -60,7 +60,11 @@ Status HdfsFileWriter::close() {
     _closed = true;
 
     if (_sync_file_data) {
+#ifdef USE_LIBHDFS3
+        int ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#else
         int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#endif
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
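The conditional exists because the two HDFS client libraries export different sync entry points: libhdfs3 provides hdfsSync(), while the JNI-based libhdfs provides hdfsHSync(). A minimal sketch of the same pattern as a standalone helper; sync_hdfs_file is a hypothetical name, and the header path varies by build:

    #include <hdfs/hdfs.h>  // header location differs between libhdfs and libhdfs3 packages

    // Hypothetical helper showing the pattern above: call whichever
    // flush-to-disk primitive the linked HDFS client library exports.
    static int sync_hdfs_file(hdfsFS fs, hdfsFile file) {
    #ifdef USE_LIBHDFS3
        return hdfsSync(fs, file);   // libhdfs3 does not ship hdfsHSync()
    #else
        return hdfsHSync(fs, file);  // the JNI libhdfs exposes hdfsHSync()
    #endif
    }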
15 changes: 8 additions & 7 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -453,19 +453,20 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets =
                 tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only in strict-mode partial updates will missing rowsets here lead to problems.
             // In other cases, the missing rowsets will be calculated in later phases (commit phase/publish phase).
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError<false>(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError<false>(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
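The net behavioral change in this hunk: previously a mismatch between the fetched rowsets and the expected rowset ids was only checked when the partial update ran in strict mode; now the mismatch is always logged as a warning, and only strict mode escalates it to an error, since non-strict updates recalculate missing rowsets during the commit/publish phases. A self-contained sketch of the distilled control flow, with plain stand-ins for Doris's Status and LOG (hypothetical names, not the real API):

    #include <cstddef>
    #include <iostream>
    #include <string>

    // Illustrative stand-in for the new check: warn on any mismatch,
    // fail only in strict mode; otherwise defer to commit/publish time.
    std::string check_specified_rowsets(std::size_t specified, std::size_t expected,
                                        bool is_strict_mode) {
        if (specified != expected) {
            std::cerr << "[Memtable Flush] some rowsets have been deleted due to compaction\n";
            if (is_strict_mode) {
                return "InternalError: missing rowsets in strict mode partial update";
            }
        }
        return "OK";
    }

    int main() {
        std::cout << check_specified_rowsets(3, 5, false) << '\n';  // OK (warns only)
        std::cout << check_specified_rowsets(3, 5, true) << '\n';   // InternalError...
    }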
15 changes: 8 additions & 7 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -386,19 +386,20 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets =
                 tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only in strict-mode partial updates will missing rowsets here lead to problems.
             // In other cases, the missing rowsets will be calculated in later phases (commit phase/publish phase).
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError<false>(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError<false>(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
48 changes: 24 additions & 24 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -123,7 +123,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
 #ifdef NDEBUG
     LOG(INFO) << err_msg;
 #else
-    LOG(FATAL) << err_msg << print_address_sanitizers();
+    LOG(INFO) << err_msg << print_address_sanitizers();
 #endif
     }
     if (ExecEnv::tracking_memory()) {
@@ -132,10 +132,10 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         _consumption->set(0);
 #ifndef NDEBUG
     } else if (!_address_sanitizers.empty()) {
-        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
-                   << ", mem tracker label: " << _label
-                   << ", peak consumption: " << _consumption->peak_value()
-                   << print_address_sanitizers();
+        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
+                  << ", mem tracker label: " << _label
+                  << ", peak consumption: " << _consumption->peak_value()
+                  << print_address_sanitizers();
 #endif
     }
     g_memtrackerlimiter_cnt << -1;
@@ -147,13 +147,13 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
     std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
     auto it = _address_sanitizers.find(buf);
     if (it != _address_sanitizers.end()) {
-        LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
-                   << ", consumption: " << _consumption->current_value()
-                   << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
-                   << ", size: " << size << ", old buf: " << it->first
-                   << ", old size: " << it->second.size
-                   << ", new stack_trace: " << get_stack_trace()
-                   << ", old stack_trace: " << it->second.stack_trace;
+        LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
+                  << ", consumption: " << _consumption->current_value()
+                  << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
+                  << ", size: " << size << ", old buf: " << it->first
+                  << ", old size: " << it->second.size
+                  << ", new stack_trace: " << get_stack_trace()
+                  << ", old stack_trace: " << it->second.stack_trace;
     }
 
     // if alignment not equal to 0, maybe usable_size > size.
@@ -170,21 +170,21 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
     auto it = _address_sanitizers.find(buf);
     if (it != _address_sanitizers.end()) {
         if (it->second.size != size) {
-            LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
-                          "label: "
-                       << _label << ", consumption: " << _consumption->current_value()
-                       << ", peak consumption: " << _consumption->peak_value()
-                       << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
-                       << ", old size: " << it->second.size
-                       << ", new stack_trace: " << get_stack_trace()
-                       << ", old stack_trace: " << it->second.stack_trace;
+            LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
+                         "label: "
+                      << _label << ", consumption: " << _consumption->current_value()
+                      << ", peak consumption: " << _consumption->peak_value()
+                      << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
+                      << ", old size: " << it->second.size
+                      << ", new stack_trace: " << get_stack_trace()
+                      << ", old stack_trace: " << it->second.stack_trace;
         }
         _address_sanitizers.erase(buf);
     } else {
-        LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
-                   << ", consumption: " << _consumption->current_value()
-                   << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
-                   << ", size: " << size << ", stack_trace: " << get_stack_trace();
+        LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
+                  << ", consumption: " << _consumption->current_value()
+                  << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
+                  << ", size: " << size << ", stack_trace: " << get_stack_trace();
     }
 }
 }
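All four hunks make the same edit: mismatches detected by the debug-only allocation bookkeeping are downgraded from LOG(FATAL), which aborts the process, to LOG(INFO), so an inaccurate tracker report no longer crashes a debug build while the stack traces stay in the log. A self-contained sketch of the bookkeeping being relaxed, with simplified types (the real map also records a stack trace per buffer, and BufRegistry is a hypothetical name):

    #include <cstddef>
    #include <iostream>
    #include <mutex>
    #include <unordered_map>

    class BufRegistry {
    public:
        void add(void* buf, std::size_t size) {
            std::lock_guard<std::mutex> l(_mtx);
            if (!_bufs.emplace(buf, size).second) {
                std::clog << "buf repeat add: " << buf << '\n';  // was fatal, now informational
            }
        }
        void remove(void* buf, std::size_t size) {
            std::lock_guard<std::mutex> l(_mtx);
            auto it = _bufs.find(buf);
            if (it == _bufs.end()) {
                std::clog << "buf not exist: " << buf << '\n';   // was fatal, now informational
            } else {
                if (it->second != size) {
                    std::clog << "buf size inaccurate: " << buf << '\n';
                }
                _bufs.erase(it);
            }
        }
    private:
        std::mutex _mtx;
        std::unordered_map<void*, std::size_t> _bufs;
    };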
@@ -256,11 +256,11 @@ class AggregateFunctionGroupArrayIntersect
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         T element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             read_int_binary(element, buf);
             data.value->insert(static_cast<void*>(&element));
         }
@@ -484,11 +484,11 @@ class AggregateFunctionGroupArrayIntersectGeneric
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         StringRef element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             element = read_string_binary_into(*arena, buf);
             data.value->insert((void*)element.data, element.size);
         }
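Both hunks change the deserialized element count from size_t to UInt64, plausibly because the varint reader takes its output by UInt64 reference (an assumption inferred from the call shape), so a size_t local binds only on platforms where size_t and UInt64 happen to be the same type; a fixed-width local also matches the serialized format exactly. A small compile-level sketch of that constraint, with a stand-in reader:

    #include <cstdint>

    using UInt64 = std::uint64_t;  // Doris-style alias, assumed for this sketch

    // Stand-in for read_var_uint(UInt64&, BufferReadable&): the UInt64& output
    // parameter is what forces the caller's local to be exactly UInt64.
    void read_var_uint(UInt64& x) { x = 3; }

    int main() {
        UInt64 size;          // a size_t local would not bind to UInt64& everywhere
        read_var_uint(size);
        for (UInt64 i = 0; i < size; ++i) {
            // read one element per iteration ...
        }
    }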
@@ -112,10 +112,12 @@ public OlapTable getDestTable() {
         return destTable;
     }
 
+    // the caller should hold the table read lock when calling this method
     public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
         return this.plan(loadId, 1);
     }
 
+    // the caller should hold the table read lock when calling this method
     // create the plan. the plan's query id and load id are the same, using the parameter 'loadId'
     public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
@@ -344,11 +346,13 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         return params;
     }
 
+    // the caller should hold the table read lock when calling this method
     // single table plan: fragmentInstanceIndex is 1 (default value)
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException {
         return this.planForPipeline(loadId, 1);
     }
 
+    // the caller should hold the table read lock when calling this method
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
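The added comments document a locking contract rather than change behavior: each plan variant assumes its caller already holds the table read lock, presumably so the table metadata it reads stays consistent while the plan is built. A generic C++ illustration of that caller-holds-lock convention (the surrounding code is Java; every name below is illustrative):

    #include <shared_mutex>

    std::shared_mutex table_lock;  // stand-in for the destination table's read-write lock

    void plan() {
        // Precondition (documented, not enforced): caller holds table_lock shared.
    }

    void caller() {
        std::shared_lock<std::shared_mutex> guard(table_lock);  // take the read lock first
        plan();                                                 // then build the plan under it
    }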