From d1eb413b88ff63bf103ee7a13a9cd35b704cf2cc Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 11 Dec 2024 22:52:13 +0800 Subject: [PATCH] branch-3.0: [fix](move-memtable) tolerate non-open streams in close wait #44680 (#45153) Cherry-picked from #44680 Co-authored-by: Kaijie Chen --- be/src/vec/sink/load_stream_stub.cpp | 16 ++++++---------- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 +++++++++--- .../test_multi_replica_fault_injection.groovy | 13 ++++++++----- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 2a38b179b95531..86002233b0c792 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { - if (segment_id != 0) { - return Status::OK(); - } - }); + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -246,11 +242,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { - if (segment_id != 0) { - return Status::OK(); - } - }); + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -340,6 +332,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); + if (!_is_open.load()) { + // we don't need to close wait on non-open streams + return Status::OK(); + } if (!_is_closing.load()) { return _status; } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 17f83911a8901b..5c526b9e36bff1 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -269,14 +269,20 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { } Status VTabletWriterV2::_open_streams() { - bool fault_injection_skip_be = true; + int fault_injection_skip_be = 0; bool any_backend = false; bool any_success = false; for (auto& [dst_id, _] : _tablets_for_node) { auto streams = _load_stream_map->get_or_create(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { - if (fault_injection_skip_be) { - fault_injection_skip_be = false; + if (fault_injection_skip_be < 1) { + fault_injection_skip_be++; + continue; + } + }); + DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", { + if (fault_injection_skip_be < 2) { + fault_injection_skip_be++; continue; } }); diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 2f6afd5ca6925b..d09983d52d0dc3 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -75,14 +75,15 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { file "baseall.txt" } - def load_with_injection = { injection, error_msg-> + def load_with_injection = { injection, error_msg, success=false-> try { sql "truncate table test" GetDebugPoint().enableDebugPointForAllBEs(injection) sql "insert into test select * from baseall where k1 <= 3" + assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg)) } catch(Exception e) { logger.info(e.getMessage()) - assertTrue(e.getMessage().contains(error_msg)) + assertTrue(e.getMessage().contains(error_msg), e.toString()) } finally { GetDebugPoint().disableDebugPointForAllBEs(injection) } @@ -90,15 +91,17 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { // StreamSinkFileWriter appendv write segment failed one replica // success - load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess") + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess", true) // StreamSinkFileWriter appendv write segment failed two replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed") // StreamSinkFileWriter appendv write segment failed all replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") // test segment num check when LoadStreamStub missed tail segments - load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") + load_with_injection("LoadStreamStub.skip_send_segment", "segment num mismatch") // test one backend open failure - load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success") + load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success", true) + // test two backend open failure + load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", "not enough streams 1/3") sql """ set enable_memtable_on_sink_node=false """ } }