From 53308569129acdeb9e72b5c5cb9e0b99d94524d8 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 21 Nov 2024 14:07:36 +0800 Subject: [PATCH] [fix](move-memtable) immediately return error when close wait failed (#44344) Problem Summary: #38003 introduced a problem where the last sink node could report success even when close wait timeout, which may cause data loss. Previously we made that change hoping to tolerate minority replica failure in this step. However, it turns out the last sink node could miss tablet reports from downstreams in case of close wait failure. This PR fixes the problem by return the close_wait error immediately. The most common error in close wait is timeout, and it should not be fault tolerant on a replica basis anyways. --- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 7 ++++--- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 5d47aec0cf3009..17f83911a8901b 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -606,7 +606,7 @@ Status VTabletWriterV2::close(Status exec_status) { // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - _close_wait(false); + RETURN_IF_ERROR(_close_wait(false)); // send CLOSE_LOAD on all incremental streams if this is the last sink. // this must happen after all non-incremental streams are closed, @@ -616,7 +616,7 @@ Status VTabletWriterV2::close(Status exec_status) { } // close_wait on all incremental streams, even if this is not the last sink. - _close_wait(true); + RETURN_IF_ERROR(_close_wait(true)); // calculate and submit commit info if (is_last_sink) { @@ -665,7 +665,7 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -void VTabletWriterV2::_close_wait(bool incremental) { +Status VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); auto st = _load_stream_map->for_each_st( [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { @@ -690,6 +690,7 @@ void VTabletWriterV2::_close_wait(bool incremental) { if (!st.ok()) { LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); } + return st; } void VTabletWriterV2::_calc_tablets_to_commit() { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 52cae53a0b5d0a..46a3974bba8aa0 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -148,7 +148,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { void _calc_tablets_to_commit(); - void _close_wait(bool incremental); + Status _close_wait(bool incremental); void _cancel(Status status);