From ec69ed236a4df19bb1a5d0ab0974f5a647cbd0e2 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 13 Nov 2024 19:02:52 +0800 Subject: [PATCH] [fix](move-memtable) error status should not be overwritten in TabletStream (#43813) Use `AtomicStatus` for `TabletStream::_status` to ensure thread safety. Use `AtomicStatus::update` to avoid error status being overwritten by `Status::OK()` --- be/src/runtime/load_stream.cpp | 69 +++++++++++++++++----------------- be/src/runtime/load_stream.h | 2 +- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 27908e35a9a440..7942770387382b 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -62,7 +62,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, _txn_id(txn_id), _load_stream_mgr(load_stream_mgr) { load_stream_mgr->create_tokens(_flush_tokens); - _status = Status::OK(); _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime"); @@ -71,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id - << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status; + << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); return ostr; } @@ -89,19 +88,19 @@ Status TabletStream::init(std::shared_ptr schema, int64_t _load_stream_writer = std::make_shared(&req, _profile); DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", { - _status = Status::Uninitialized("fault injection"); - return _status; + _status.update(Status::Uninitialized("fault injection")); + return _status.status(); }); - _status = _load_stream_writer->init(); + _status.update(_load_stream_writer->init()); if (!_status.ok()) { LOG(INFO) << "failed to init rowset builder due to " << *this; } - return _status; + return _status.status(); } Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { if (!_status.ok()) { - return _status; + return _status.status(); } // dispatch add_segment request @@ -150,8 +149,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data if (eos && st.ok()) { st = _load_stream_writer->close_segment(new_segid); } - if (!st.ok() && _status.ok()) { - _status = st; + if (!st.ok()) { + _status.update(st); LOG(WARNING) << "write data failed " << st << ", " << *this; } }; @@ -167,11 +166,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data timer.start(); while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { - _status = Status::Error( - "wait flush token back pressure time is more than " - "load_stream_max_wait_flush_token_time {}", - load_stream_max_wait_flush_token_time_ms); - return _status; + _status.update( + Status::Error("wait flush token back pressure time is more than " + "load_stream_max_wait_flush_token_time {}", + load_stream_max_wait_flush_token_time_ms)); + return _status.status(); } bthread_usleep(2 * 1000); // 2ms } @@ -181,14 +180,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data g_load_stream_flush_running_threads << 1; auto st = flush_token->submit_func(flush_func); if (!st.ok()) { - _status = st; + _status.update(st); } - return _status; + return _status.status(); } Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { if (!_status.ok()) { - return _status; + return _status.status(); } SCOPED_TIMER(_add_segment_timer); @@ -207,17 +206,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data { std::lock_guard lock_guard(_lock); if (!_segids_mapping.contains(src_id)) { - _status = Status::InternalError( + _status.update(Status::InternalError( "add segment failed, no segment written by this src be yet, src_id={}, " "segment_id={}", - src_id, segid); - return _status; + src_id, segid)); + return _status.status(); } if (segid >= _segids_mapping[src_id]->size()) { - _status = Status::InternalError( + _status.update(Status::InternalError( "add segment failed, segment is never written, src_id={}, segment_id={}", - src_id, segid); - return _status; + src_id, segid)); + return _status.status(); } new_segid = _segids_mapping[src_id]->at(segid); } @@ -226,17 +225,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data auto add_segment_func = [this, new_segid, stat, flush_schema]() { signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); - if (!st.ok() && _status.ok()) { - _status = st; + if (!st.ok()) { + _status.update(st); LOG(INFO) << "add segment failed " << *this; } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; auto st = flush_token->submit_func(add_segment_func); if (!st.ok()) { - _status = st; + _status.update(st); } - return _status; + return _status.status(); } Status TabletStream::_run_in_heavy_work_pool(std::function fn) { @@ -265,12 +264,12 @@ void TabletStream::pre_close() { } SCOPED_TIMER(_close_wait_timer); - _status = _run_in_heavy_work_pool([this]() { + _status.update(_run_in_heavy_work_pool([this]() { for (auto& token : _flush_tokens) { token->wait(); } return Status::OK(); - }); + })); // it is necessary to check status after wait_func, // for create_rowset could fail during add_segment when loading to MOW table, // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump. @@ -279,23 +278,23 @@ void TabletStream::pre_close() { } if (_check_num_segments && (_next_segid.load() != _num_segments)) { - _status = Status::Corruption( + _status.update(Status::Corruption( "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, - _num_segments, _next_segid.load(), print_id(_load_id)); + _num_segments, _next_segid.load(), print_id(_load_id))); return; } - _status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }); + _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); })); } Status TabletStream::close() { if (!_status.ok()) { - return _status; + return _status.status(); } SCOPED_TIMER(_close_wait_timer); - _status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }); - return _status; + _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); + return _status.status(); } IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 747b62875c31bf..462fce5a147ccf 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -71,7 +71,7 @@ class TabletStream { int64_t _num_segments = 0; bool _check_num_segments = true; bthread::Mutex _lock; - Status _status; + AtomicStatus _status; PUniqueId _load_id; int64_t _txn_id; RuntimeProfile* _profile = nullptr;