Skip to content

Commit

Permalink
[test](regression) add fault injection cases for LoadStream (apache#2…
Browse files Browse the repository at this point in the history
…9101)

Signed-off-by: freemandealer <[email protected]>
  • Loading branch information
freemandealer authored Dec 28, 2023
1 parent 03a6a28 commit b31494b
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 6 deletions.
6 changes: 5 additions & 1 deletion be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/status.h"
#include "gutil/macros.h"
#include "io/fs/path.h"
#include "util/debug_points.h"
#include "util/slice.h"

namespace doris {
Expand Down Expand Up @@ -63,7 +64,10 @@ class FileWriter {

// Accessor for the stored `_path`; returns a const reference, so no copy is made.
const Path& path() const { return _path; }

size_t bytes_appended() const { return _bytes_appended; }
// Returns the value of `_bytes_appended`.
//
// Fault injection: the debug point "FileWriter.bytes_appended.zero_bytes_appended"
// makes this accessor return 0 instead of the tracked value, so callers that compare
// the reported size against what they wrote can be driven into their mismatch path.
// NOTE(review): presumably DBUG_EXECUTE_IF only runs its block when the named debug
// point has been enabled at runtime — confirm against util/debug_points.h.
size_t bytes_appended() const {
DBUG_EXECUTE_IF("FileWriter.bytes_appended.zero_bytes_appended", { return 0; });
return _bytes_appended;
}

// Accessor for `_fs`; returns the shared_ptr by value (the caller gets its own copy
// of the pointer, not a reference to the member).
std::shared_ptr<FileSystem> fs() const { return _fs; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "io/fs/local_file_writer.h"
#include "runtime/thread_context.h"
#include "util/async_io.h" // IWYU pragma: keep
#include "util/debug_points.h"
#include "util/defer_op.h"

namespace doris {
Expand All @@ -57,6 +58,10 @@ LocalFileSystem::~LocalFileSystem() = default;
Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
DBUG_EXECUTE_IF("LocalFileSystem.create_file_impl.open_file_failed", {
::close(fd);
fd = -1;
});
if (-1 == fd) {
return Status::IOError("failed to open {}: {}", file.native(), errno_to_str());
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/fs/local_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"

namespace doris {
Expand Down Expand Up @@ -203,6 +204,11 @@ Status LocalFileWriter::_close(bool sync) {
if (0 != ::close(_fd)) {
return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno));
}

DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno));
});

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/stream_sink_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "util/debug_points.h"
#include "util/uid_util.h"
#include "vec/sink/load_stream_stub.h"

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ Status RowsetBuilder::check_tablet_version_count() {
<< st;
}
int version_count = tablet()->version_count();
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
{ version_count = INT_MAX; });
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed limit: {}, "
Expand Down
15 changes: 15 additions & 0 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"

// Placeholder id used only by the DBUG_EXECUTE_IF fault-injection blocks in this file:
// it overwrites the segment id, index id, load id (hi/lo) or src id taken from the
// request header so that the regular validation code sees an id it does not know.
// NOTE(review): assumes 0x7c00 never coincides with a real id in the test setup — confirm.
#define UNKNOWN_ID_FOR_TEST 0x7c00

namespace doris {

bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
Expand Down Expand Up @@ -169,6 +172,7 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
uint32_t new_segid;
DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
Expand Down Expand Up @@ -440,6 +444,8 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
IndexStreamSharedPtr index_stream;

int64_t index_id = header.index_id();
DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
{ index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
Expand Down Expand Up @@ -479,6 +485,15 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
<< " with tablet " << hdr.tablet_id();
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id());
load_id.set_hi(UNKNOWN_ID_FOR_TEST);
load_id.set_lo(UNKNOWN_ID_FOR_TEST);
});
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
});
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
Expand Down
15 changes: 12 additions & 3 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -87,9 +88,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
if (!_is_init) {
RETURN_IF_ERROR(init());
}
DCHECK(_is_init);
if (segid >= _segment_file_writers.size()) {
for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
Status st;
Expand All @@ -106,6 +105,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
// TODO: IOBuf to Slice
file_writer = _segment_file_writers[segid].get();
}
DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
if (file_writer == nullptr) {
return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
Expand All @@ -122,14 +122,18 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("close_segment failed, LoadStreamWriter is not inited");
}
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
{ segid = _segment_file_writers.size(); });
if (segid >= _segment_file_writers.size()) {
return Status::Corruption("close_segment failed, segment {} is never opened", segid);
}
file_writer = _segment_file_writers[segid].get();
}
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", { file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption("close_segment failed, file writer {} is destoryed", segid);
}
Expand All @@ -151,14 +155,18 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
}
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
{ segid = _segment_file_writers.size(); });
if (segid >= _segment_file_writers.size()) {
return Status::Corruption("add_segment failed, segment {} is never opened", segid);
}
file_writer = _segment_file_writers[segid].get();
}
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", { file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption("add_segment failed, file writer {} is destoryed", segid);
}
Expand All @@ -177,6 +185,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.close.uninited_writer", { _is_init = false; });
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class LoadStreamWriter {
// wait for all memtables to be flushed.
Status close();

int64_t tablet_id() const { return _req.tablet_id; }

private:
bool _is_init = false;
bool _is_canceled = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http

// Regression suite that drives the BE LoadStream / LoadStreamWriter error paths by
// enabling one debug point at a time and running an `insert into ... select` load.
// Each case only requires that the load does not wedge the cluster: the exception (if
// any) is logged, and the debug point is always disabled again before the next case.
suite("load_stream_fault_injection", "nonConcurrent") {
    // init query case data
    sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
    sql """
CREATE TABLE IF NOT EXISTS `test` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace_if_not_null null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""

    // Start from a clean state, then populate the source table without any injection.
    GetDebugPoint().clearDebugPointsForAllBEs()
    streamLoad {
        table "baseall"
        db "regression_test_fault_injection_p0"
        set 'column_separator', ','
        file "baseall.txt"
    }

    // Runs one load with the debug point `injection` enabled on all BEs.
    //   injection     - name of the BE debug point to enable for the duration of the load
    //   expect_errmsg - when non-empty, the exception message must contain this text;
    //                   pass "" to only log the failure (what every case below does)
    // The debug point is disabled in `finally`, so a failing case cannot leak its
    // injection into the following ones.
    def load_with_injection = { injection, expect_errmsg ->
        try {
            GetDebugPoint().enableDebugPointForAllBEs(injection)
            sql "insert into test select * from baseall where k1 <= 3"
        } catch(Exception e) {
            logger.info(e.getMessage())
            if (expect_errmsg) {
                // the msg should contain the root cause
                assertTrue((e.getMessage() ?: "").contains(expect_errmsg))
            }
        } finally {
            GetDebugPoint().disableDebugPointForAllBEs(injection)
        }
    }

    // LoadStreamWriter create file failed
    load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
    // LoadStreamWriter append_data meets a null file writer
    load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
    // LoadStreamWriter append_data: bytes_appended does not match the real file size
    load_with_injection("FileWriter.bytes_appended.zero_bytes_appended", "")
    // LoadStreamWriter close_segment on a writer that is not inited
    load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
    // LoadStreamWriter close_segment with a bad segid
    load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
    // LoadStreamWriter close_segment meets a null file writer
    load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
    // LoadStreamWriter close_segment: file writer fails to close
    load_with_injection("LocalFileWriter.close.failed", "")
    // LoadStreamWriter close_segment: bytes_appended does not match the real file size
    // NOTE(review): no BE debug point with this name is visible in the accompanying
    // change (only "FileWriter.bytes_appended.zero_bytes_appended" is) — confirm it
    // exists, otherwise this case injects nothing.
    load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
    // LoadStreamWriter add_segment on a writer that is not inited
    load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
    // LoadStreamWriter add_segment with a bad segid
    load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
    // LoadStreamWriter add_segment meets a null file writer
    load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
    // LoadStreamWriter add_segment: bytes_appended does not match the real file size
    // NOTE(review): same concern as the close_segment variant above — this debug point
    // name is not defined in the visible BE change; confirm.
    load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
    // LoadStreamWriter close on a writer that is not inited
    load_with_injection("LoadStreamWriter.close.uninited_writer", "")
    // LoadStream init fails because LoadStreamWriter init fails
    load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
    // LoadStream add_segment meets an unknown segid in the request header
    load_with_injection("TabletStream.add_segment.unknown_segid", "")
    // LoadStream append_data meets an unknown index id in the request header
    // (name must match the BE side exactly; it was previously missing the leading "T")
    load_with_injection("TabletStream.add_segment.unknown_indexid", "")
    // LoadStream dispatch meets an unknown load id
    load_with_injection("LoadStream._dispatch.unknown_loadid", "")
    // LoadStream dispatch meets an unknown src id
    load_with_injection("LoadStream._dispatch.unknown_srcid", "")
}

0 comments on commit b31494b

Please sign in to comment.