Skip to content

Commit

Permalink
Merge branch 'master' into 20230819_fix_stacktrace
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Sep 20, 2023
2 parents e48c9c4 + e4b551e commit 3362797
Show file tree
Hide file tree
Showing 127 changed files with 1,427 additions and 563 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ github:
- nanfeng1999
- gitccl
- shuke987
- wm1581066

notifications:
pullrequests_status: [email protected]
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,8 @@ DEFINE_Int16(bitmap_serialize_version, "1");
// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");

DEFINE_mInt32(scan_thread_nice_value, "0");

#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,12 @@ DECLARE_Int16(bitmap_serialize_version);
// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);

// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
// Default is 0, which is default value of thread nice value, increase this value
// to lower the priority of scan threads
DECLARE_Int32(scan_thread_nice_value);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TStatus Status::to_thrift() const {
void Status::to_protobuf(PStatus* s) const {
s->clear_error_msgs();
s->set_status_code((int)_code);
if (!ok() && _err_msg) {
if (!ok()) {
s->add_error_msgs(fmt::format("({})[{}]{}", BackendOptions::get_localhost(),
code_as_string(), _err_msg ? _err_msg->_msg : ""));
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,18 @@ class Status {
return *this;
}

template <bool stacktrace = true>
Status static create(const TStatus& status) {
return Error<true>(status.status_code,
status.error_msgs.empty() ? "" : status.error_msgs[0]);
return Error<stacktrace>(
status.status_code,
"TStatus: " + (status.error_msgs.empty() ? "" : status.error_msgs[0]));
}

template <bool stacktrace = true>
Status static create(const PStatus& pstatus) {
return Error<true>(pstatus.status_code(),
pstatus.error_msgs_size() == 0 ? "" : pstatus.error_msgs(0));
return Error<stacktrace>(
pstatus.status_code(),
"PStatus: " + (pstatus.error_msgs_size() == 0 ? "" : pstatus.error_msgs(0)));
}

template <int code, bool stacktrace = true, typename... Args>
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Status Compaction::do_compaction(int64_t permits) {
<< ", before=" << checksum_before << ", checksum_after=" << checksum_after;
}
}
_load_segment_to_cache();
return st;
}

Expand Down Expand Up @@ -825,6 +826,14 @@ int64_t Compaction::get_compaction_permits() {
return permits;
}

Status Compaction::_load_segment_to_cache() {
// Load new rowset's segments to cache.
SegmentCacheHandle handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::static_pointer_cast<BetaRowset>(_output_rowset), &handle, true));
return Status::OK();
}

#ifdef BE_TEST
void Compaction::set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets) {
_input_rowsets = rowsets;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class Compaction {

private:
bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;
Status _load_segment_to_cache();

protected:
// the root tracker for this compaction
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,9 @@ class AnalyticDependency final : public WriteDependency {
auto need_more_input = whether_need_next_partition(_analytic_state.found_partition_end);
if (need_more_input) {
block_reading();
set_ready_for_write();
} else {
block_writing();
set_ready_for_read();
}
return need_more_input;
Expand Down
18 changes: 8 additions & 10 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
CHECK(s.ok()) << s.to_string();
}

FragmentMgr::~FragmentMgr() {}
FragmentMgr::~FragmentMgr() = default;

void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(plan_fragment_count);
Expand Down Expand Up @@ -185,9 +185,9 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(req.query_id.hi, req.query_id.lo);
ss << "couldn't get a client for " << req.coord_addr << ", reason: " << coord_status;
LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
req.update_fn(Status::InternalError(ss.str()));
req.update_fn(Status::InternalError(
"query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string()));
return;
}

Expand Down Expand Up @@ -322,12 +322,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
coord->reportExecStatus(res, params);
}

rpc_status = Status(Status::create(res.status));
rpc_status = Status::create<false>(res.status);
} catch (TException& e) {
std::stringstream msg;
msg << "ReportExecStatus() to " << req.coord_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
PrintThriftNetworkAddress(req.coord_addr), e.what());
}

if (!rpc_status.ok()) {
Expand Down Expand Up @@ -1232,7 +1230,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;

RuntimeFilterMgr* runtime_filter_mgr = nullptr;
ObjectPool* pool;
ObjectPool* pool = nullptr;
if (is_pipeline) {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _pipeline_map.find(tfragment_instance_id);
Expand Down
4 changes: 1 addition & 3 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
};

if (!config::enable_feature_binlog) {
auto error_msg = "enable feature binlog is false";
LOG(WARNING) << error_msg;
set_tstatus(TStatusCode::RUNTIME_ERROR, error_msg);
set_tstatus(TStatusCode::RUNTIME_ERROR, "enable feature binlog is false");
return;
}

Expand Down
118 changes: 118 additions & 0 deletions be/src/util/sha.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/sha.h"

#include <openssl/sha.h>

#include <string_view>

namespace doris {

constexpr static char dig_vec_lower[] = "0123456789abcdef";

void SHA1Digest::reset(const void* data, size_t length) {
SHA1_Init(&_sha_ctx);
SHA1_Update(&_sha_ctx, data, length);
}

std::string_view SHA1Digest::digest() {
unsigned char buf[SHA_DIGEST_LENGTH];
SHA1_Final(buf, &_sha_ctx);

char* to = _reuse_hex;
for (int i = 0; i < SHA_DIGEST_LENGTH; ++i) {
*to++ = dig_vec_lower[buf[i] >> 4];
*to++ = dig_vec_lower[buf[i] & 0x0F];
}

return std::string_view {_reuse_hex, _reuse_hex + 2 * SHA_DIGEST_LENGTH};
}

void SHA224Digest::reset(const void* data, size_t length) {
SHA224_Init(&_sha224_ctx);
SHA224_Update(&_sha224_ctx, data, length);
}

std::string_view SHA224Digest::digest() {
unsigned char buf[SHA224_DIGEST_LENGTH];
SHA224_Final(buf, &_sha224_ctx);

char* to = _reuse_hex;
for (int i = 0; i < SHA224_DIGEST_LENGTH; ++i) {
*to++ = dig_vec_lower[buf[i] >> 4];
*to++ = dig_vec_lower[buf[i] & 0x0F];
}

return std::string_view {_reuse_hex, _reuse_hex + 2 * SHA224_DIGEST_LENGTH};
}

void SHA256Digest::reset(const void* data, size_t length) {
SHA256_Init(&_sha256_ctx);
SHA256_Update(&_sha256_ctx, data, length);
}

std::string_view SHA256Digest::digest() {
unsigned char buf[SHA256_DIGEST_LENGTH];
SHA256_Final(buf, &_sha256_ctx);

char* to = _reuse_hex;
for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) {
*to++ = dig_vec_lower[buf[i] >> 4];
*to++ = dig_vec_lower[buf[i] & 0x0F];
}

return std::string_view {_reuse_hex, _reuse_hex + 2 * SHA256_DIGEST_LENGTH};
}

void SHA384Digest::reset(const void* data, size_t length) {
SHA384_Init(&_sha384_ctx);
SHA384_Update(&_sha384_ctx, data, length);
}

std::string_view SHA384Digest::digest() {
unsigned char buf[SHA384_DIGEST_LENGTH];
SHA384_Final(buf, &_sha384_ctx);

char* to = _reuse_hex;
for (int i = 0; i < SHA384_DIGEST_LENGTH; ++i) {
*to++ = dig_vec_lower[buf[i] >> 4];
*to++ = dig_vec_lower[buf[i] & 0x0F];
}

return std::string_view {_reuse_hex, _reuse_hex + 2 * SHA384_DIGEST_LENGTH};
}

void SHA512Digest::reset(const void* data, size_t length) {
SHA512_Init(&_sha512_ctx);
SHA512_Update(&_sha512_ctx, data, length);
}

std::string_view SHA512Digest::digest() {
unsigned char buf[SHA512_DIGEST_LENGTH];
SHA512_Final(buf, &_sha512_ctx);

char* to = _reuse_hex;
for (int i = 0; i < SHA512_DIGEST_LENGTH; ++i) {
*to++ = dig_vec_lower[buf[i] >> 4];
*to++ = dig_vec_lower[buf[i] & 0x0F];
}

return std::string_view {_reuse_hex, _reuse_hex + 2 * SHA512_DIGEST_LENGTH};
}

} // namespace doris
75 changes: 75 additions & 0 deletions be/src/util/sha.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <openssl/sha.h>

#include <string_view>

namespace doris {

class SHA1Digest {
public:
void reset(const void* data, size_t length);
std::string_view digest();

private:
SHA_CTX _sha_ctx;
char _reuse_hex[2 * SHA_DIGEST_LENGTH];
};

class SHA224Digest {
public:
void reset(const void* data, size_t length);
std::string_view digest();

private:
SHA256_CTX _sha224_ctx;
char _reuse_hex[2 * SHA224_DIGEST_LENGTH];
};

class SHA256Digest {
public:
void reset(const void* data, size_t length);
std::string_view digest();

private:
SHA256_CTX _sha256_ctx;
char _reuse_hex[2 * SHA256_DIGEST_LENGTH];
};

class SHA384Digest {
public:
void reset(const void* data, size_t length);
std::string_view digest();

private:
SHA512_CTX _sha384_ctx;
char _reuse_hex[2 * SHA384_DIGEST_LENGTH];
};

class SHA512Digest {
public:
void reset(const void* data, size_t length);
std::string_view digest();

private:
SHA512_CTX _sha512_ctx;
char _reuse_hex[2 * SHA512_DIGEST_LENGTH];
};
} // namespace doris
Loading

0 comments on commit 3362797

Please sign in to comment.