Commit 036c0f2
Merge branch 'master' into add-data-type-array-ut
amorynan authored Dec 27, 2024
2 parents 4b3cbc3 + 4ade575 commit 036c0f2
Showing 4,769 changed files with 130,320 additions and 114,900 deletions. (The diff is too large to display in full; only the first 3,000 changed files are shown.)
7 changes: 2 additions & 5 deletions README.md

@@ -59,12 +59,9 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa
 
 All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.
 
-🎉 Version 2.1.4 released now. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.4) here. The 2.1 verison delivers exceptional performance with 100% higher out-of-the-box queries proven by TPC-DS 1TB tests, enhanced data lake analytics that are 4-6 times speedier than Trino and Spark, solid support for semi-structured data analysis with new Variant types and suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.
+🎉 Check out the 🔗[All releases](https://doris.apache.org/docs/releasenotes/all-release), where you'll find a chronological summary of Apache Doris versions released over the past year.
 
-
-🎉 Version 2.0.12 is now released ! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/2.0/releasenotes/release-2.0.12) here.
-
-👀 Have a look at the 🔗[Official Website](https://doris.apache.org/) for a comprehensive list of Apache Doris's core features, blogs and user cases.
+👀 Explore the 🔗[Official Website](https://doris.apache.org/) to discover Apache Doris's core features, blogs, and user cases in detail.
 
 ## 📈 Usage Scenarios

1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp

@@ -17,6 +17,7 @@
 
 #include "agent/workload_group_listener.h"
 
+#include "runtime/exec_env.h"
 #include "runtime/workload_group/workload_group.h"
 #include "runtime/workload_group/workload_group_manager.h"
 #include "util/mem_info.h"

3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h

@@ -20,10 +20,11 @@
 #include <glog/logging.h>
 
 #include "agent/topic_listener.h"
-#include "runtime/exec_env.h"
 
 namespace doris {
 
+class ExecEnv;
+
 class WorkloadGroupListener : public TopicListener {
 public:
     ~WorkloadGroupListener() {}

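Note: taken together, the two workload_group_listener hunks are a standard include-hygiene refactor: the header stops including runtime/exec_env.h and forward-declares ExecEnv instead, while the include moves to the .cpp, the only file that needs the complete type. A minimal sketch of the pattern; the constructor and member below are illustrative, not copied from the real class:

// listener.h -- a forward declaration suffices because the header only
// handles ExecEnv through a pointer; files including this header no longer
// recompile whenever runtime/exec_env.h changes.
namespace doris {
class ExecEnv; // forward declaration instead of #include "runtime/exec_env.h"

class Listener {
public:
    explicit Listener(ExecEnv* exec_env) : _exec_env(exec_env) {}

private:
    ExecEnv* _exec_env; // pointer member: an incomplete type is fine here
};
} // namespace doris

// listener.cpp -- the full definition is needed only where ExecEnv members
// are actually used, so the include lives there:
// #include "runtime/exec_env.h"
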
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp

@@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
                   << ", output_version=" << _output_version;
         return res;
     }
-    LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
-             duration_cast<milliseconds>(steady_clock::now() - start).count())
+    LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
+             _tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
+             _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
             .tag("job_id", _uuid)
             .tag("input_rowsets", _input_rowsets.size())
             .tag("input_rows", _input_row_num)

@@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
             .tag("input_rowsets", _input_rowsets.size())
             .tag("input_rows", _input_row_num)
             .tag("input_segments", _input_segments)
-            .tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
+            .tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
     compaction_job->set_delete_bitmap_lock_initiator(initiator);
 }

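Note: the LOG_INFO(...).tag(...) chains in these compaction hunks use Doris's taggable logging helper (LOG_INFO here is the project's macro, not bare glog): the macro yields a builder, and each .tag(key, value) call appends a structured key/value pair that is flushed as a single line. A simplified stand-in for the idiom, not the real implementation:

#include <iostream>
#include <sstream>
#include <string>

// Builder that accumulates a message plus key=value tags and emits one log
// line when it goes out of scope.
class TaggedLogger {
public:
    explicit TaggedLogger(std::string msg) { _os << msg; }
    ~TaggedLogger() { std::cout << _os.str() << '\n'; }

    template <typename V>
    TaggedLogger& tag(const std::string& key, const V& value) {
        _os << '|' << key << '=' << value;
        return *this;
    }

private:
    std::ostringstream _os;
};

int main() {
    // Mirrors the shape of the hunk above; values are illustrative.
    TaggedLogger("finish CloudBaseCompaction, tablet_id=12345, cost=42ms range=[2-17]")
            .tag("job_id", "uuid-1")
            .tag("input_rowsets", 8)
            .tag("input_rows", 100000);
    return 0;
}
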
12 changes: 9 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp

@@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
         // plus 1 to skip the delete version.
         // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
         update_cumulative_point();
+        if (!config::enable_sleep_between_delete_cumu_compaction) {
+            st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
+                    "_last_delete_version.first not equal to -1");
+        }
     }
     return st;
 }

@@ -200,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
                   << ", output_version=" << _output_version;
         return res;
     }
-    LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
-             duration_cast<milliseconds>(steady_clock::now() - start).count())
+    LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
+             _tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
+             _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
             .tag("job_id", _uuid)
             .tag("input_rowsets", _input_rowsets.size())
             .tag("input_rows", _input_row_num)

@@ -295,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
             .tag("input_rowsets", _input_rowsets.size())
             .tag("input_rows", _input_row_num)
             .tag("input_segments", _input_segments)
-            .tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
+            .tag("number_output_delete_bitmap",
+                 output_rowset_delete_bitmap->delete_bitmap.size());
     compaction_job->set_delete_bitmap_lock_initiator(initiator);
 }

17 changes: 7 additions & 10 deletions be/src/cloud/cloud_meta_mgr.cpp

@@ -143,6 +143,10 @@ class MetaServiceProxy {
     }
 
 private:
+    static bool is_meta_service_endpoint_list() {
+        return config::meta_service_endpoint.find(',') != std::string::npos;
+    }
+
     static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
         static std::once_flag proxies_flag;
         static size_t num_proxies = 1;

@@ -154,9 +158,6 @@ class MetaServiceProxy {
             if (config::meta_service_connection_pooled) {
                 num_proxies = config::meta_service_connection_pool_size;
             }
-            if (config::meta_service_endpoint.find(',') != std::string::npos) {
-                is_meta_service_endpoint_list = true;
-            }
             proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
         });

@@ -175,7 +176,7 @@ class MetaServiceProxy {
 
         const char* load_balancer_name = nullptr;
         std::string endpoint;
-        if (is_meta_service_endpoint_list) {
+        if (is_meta_service_endpoint_list()) {
             endpoint = fmt::format("list://{}", config::meta_service_endpoint);
             load_balancer_name = "random";
         } else {

@@ -215,7 +216,7 @@ class MetaServiceProxy {
     bool is_idle_timeout(long now) {
         auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
         // idle timeout only works without list endpoint.
-        return !is_meta_service_endpoint_list && idle_timeout_ms > 0 &&
+        return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
                _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
     }

@@ -243,7 +244,7 @@ class MetaServiceProxy {
 
         long deadline = now;
         // connection age only works without list endpoint.
-        if (!is_meta_service_endpoint_list &&
+        if (!is_meta_service_endpoint_list() &&
             config::meta_service_connection_age_base_seconds > 0) {
             std::default_random_engine rng(static_cast<uint32_t>(now));
             std::uniform_int_distribution<> uni(

@@ -262,16 +263,12 @@ class MetaServiceProxy {
         return Status::OK();
     }
 
-    static std::atomic_bool is_meta_service_endpoint_list;
-
     std::shared_mutex _mutex;
     std::atomic<long> _last_access_at_ms {0};
     long _deadline_ms {0};
     std::shared_ptr<MetaService_Stub> _stub;
 };
 
-std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;
-
 template <typename T, typename... Ts>
 struct is_any : std::disjunction<std::is_same<T, Ts>...> {};

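Note: the net effect of this cloud_meta_mgr.cpp change is to delete the class-static std::atomic_bool (set once inside std::call_once and defined out-of-line) and derive the same fact on demand from the config. A self-contained sketch of the resulting shape; the config namespace here is a stand-in for doris::config:

#include <string>

namespace config {
// Stand-in: a comma-separated value means "a list of meta-service endpoints".
inline std::string meta_service_endpoint = "10.0.0.1:5000,10.0.0.2:5000";
} // namespace config

// One less piece of global mutable state: no out-of-line static definition to
// keep in sync, no initialization-order concern, and the answer tracks the
// config even if the connection pool was built before the config settled.
static bool is_meta_service_endpoint_list() {
    return config::meta_service_endpoint.find(',') != std::string::npos;
}
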
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp

@@ -169,6 +169,15 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
     reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
     reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
     reader_context.version = Version(0, start_resp.alter_version());
+    std::vector<uint32_t> cluster_key_idxes;
+    if (!_base_tablet_schema->cluster_key_uids().empty()) {
+        for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
+            cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
+        }
+        reader_context.read_orderby_key_columns = &cluster_key_idxes;
+        reader_context.is_unique = false;
+        reader_context.sequence_id_idx = -1;
+    }
 
     for (auto& split : rs_splits) {
         RETURN_IF_ERROR(split.rs_reader->init(&reader_context));

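Note: for tables with cluster keys, the alter-tablet read has to hand rows back ordered by the cluster key columns, so the hunk translates each cluster key's column unique id into its current positional index before wiring the list into reader_context. A generic sketch of the uid-to-index translation with a simplified schema stand-in (not Doris's TabletSchema API):

#include <cstdint>
#include <string>
#include <vector>

// Columns are addressed two ways: a stable unique id (persisted, survives
// schema change) and a position in the current schema (what readers need).
struct Column {
    int32_t unique_id;
    std::string name;
};

struct Schema {
    std::vector<Column> columns;

    // Analogue of field_index(uid): position of the column with this unique
    // id, or -1 if it is absent.
    int32_t field_index(int32_t uid) const {
        for (size_t i = 0; i < columns.size(); ++i) {
            if (columns[i].unique_id == uid) {
                return static_cast<int32_t>(i);
            }
        }
        return -1;
    }
};

std::vector<uint32_t> cluster_key_indexes(const Schema& schema,
                                          const std::vector<int32_t>& cluster_key_uids) {
    std::vector<uint32_t> idxes;
    idxes.reserve(cluster_key_uids.size());
    for (int32_t uid : cluster_key_uids) {
        idxes.push_back(static_cast<uint32_t>(schema.field_index(uid)));
    }
    return idxes;
}
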
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp

@@ -677,7 +677,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
     auto st = compaction->prepare_compact();
     if (!st.ok()) {
         long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-        if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
+        if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() &&
+            st.msg() != "_last_delete_version.first not equal to -1") {
             // Backoff strategy if no suitable version
             tablet->last_cumu_no_suitable_version_ms = now;
         }

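Note: this guard pairs with the prepare_compact hunk in cloud_cumulative_compaction.cpp above. When enable_sleep_between_delete_cumu_compaction is off, prepare_compact deliberately fails with CUMULATIVE_NO_SUITABLE_VERSION carrying a distinctive message, and the scheduler applies its backoff only when the failure is not that case. A minimal sketch of the two sides, with simplified Status/ErrorCode stand-ins in place of Doris's richer types:

#include <string>
#include <string_view>

enum class ErrorCode { OK, CUMULATIVE_NO_SUITABLE_VERSION };

struct Status {
    ErrorCode code = ErrorCode::OK;
    std::string msg;
};

// The exact message the producer attaches.
constexpr std::string_view kDeleteVersionMsg = "_last_delete_version.first not equal to -1";

// Producer side (prepare_compact): tag the failure with the sentinel message.
Status prepare_tail(bool enable_sleep_between_delete_cumu_compaction) {
    if (!enable_sleep_between_delete_cumu_compaction) {
        return {ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION, std::string(kDeleteVersionMsg)};
    }
    return {};
}

// Consumer side (the submit path): only the generic "no suitable version"
// failure should back off the tablet.
bool should_backoff(const Status& st) {
    return st.code == ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION && st.msg != kDeleteVersionMsg;
}

Keying on the exact message string keeps the change local, but it couples the two sites; a dedicated error code or a flag on the status would be the sturdier contract.
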
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h

@@ -75,7 +75,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
     void _check_file_cache_ttl_block_valid();
 
     std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
-        LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
+        VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;
 
         bool synced = false;
         do {

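Note: demoting this message from LOG(INFO) to VLOG_DEBUG matters because get_storage_resource sits on a hot path. With glog, a VLOG(n) statement is emitted only when the runtime verbosity (--v, or a per-module --vmodule entry) is at least n, so the disabled case costs roughly one integer comparison, whereas LOG(INFO) is always formatted and written. Doris's VLOG_DEBUG wraps this mechanism; the level used below is an assumption for illustration:

#include <glog/logging.h>
#include <string>

void lookup_storage_resource(const std::string& vault_id) {
    // Silent unless the process runs with, e.g., --v=3 (or a matching
    // --vmodule entry), so the hot path stays cheap in production.
    VLOG(3) << "Getting storage resource for vault_id: " << vault_id;
}
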
44 changes: 29 additions & 15 deletions be/src/cloud/cloud_tablet.cpp

@@ -33,6 +33,7 @@
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
+#include "common/logging.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "olap/cumulative_compaction_time_series_policy.h"

@@ -54,6 +55,7 @@ namespace doris {
 using namespace ErrorCode;
 
 static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
+static constexpr int LOAD_INITIATOR_ID = -1;
 
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
         : BaseTablet(std::move(tablet_meta)), _engine(engine) {}

@@ -407,6 +409,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
             auto rs_it = _stale_rs_version_map.find(v_ts->version());
             if (rs_it != _stale_rs_version_map.end()) {
                 expired_rowsets.push_back(rs_it->second);
+                LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
+                          << " rowset_id=" << rs_it->second->rowset_id().to_string()
+                          << " version=" << rs_it->first.to_string();
                 _stale_rs_version_map.erase(rs_it);
             } else {
                 LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "

@@ -504,13 +509,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
 Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
         const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
         int64_t txn_expiration) {
-    if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
-        // May cause the segment files generated by the transient rowset writer unable to be
-        // recycled, see `CloudRowsetWriter::build` for detail.
-        LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
-        DCHECK(false) << rowset.rowset_meta()->rowset_state();
+    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
+        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
+        auto msg = fmt::format(
+                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
+                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
+                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
+                tablet_id());
+        // see `CloudRowsetWriter::build` for detail.
+        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
+        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
+        LOG(WARNING) << msg;
+        DCHECK(false) << msg;
     }
 
     RowsetWriterContext context;
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;

@@ -650,11 +661,14 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
 }
 
 void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
+    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
+        _cumulative_point = new_point;
+        return;
+    }
-    // cumulative point should only be reset to -1, or be increased
-    CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
-            << "Unexpected cumulative point: " << new_point
-            << ", origin: " << _cumulative_point.load();
-    _cumulative_point = new_point;
+    // FIXME: could happen in currently unresolved race conditions
+    LOG(WARNING) << "Unexpected cumulative point: " << new_point
+                 << ", origin: " << _cumulative_point.load();
 }
 
 std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {

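Note: previously this function CHECK-crashed the backend whenever the invariant was violated (the cumulative point may only be reset to K_INVALID_CUMULATIVE_POINT or increased); the new code enforces the invariant by updating only in the valid cases and downgrades the violation to a warning, since the FIXME admits a still-unresolved race. For an atomic field, the same "monotonic unless reset" rule can also be enforced lock-free; a generic sketch, not Doris's code:

#include <atomic>
#include <cstdint>

constexpr int64_t kInvalidCumulativePoint = -1;

// Raise `point` to `new_point` (or reset it to the sentinel); a concurrent
// caller can never lower it, which is exactly what the old CHECK asserted.
void set_cumulative_point_monotonic(std::atomic<int64_t>& point, int64_t new_point) {
    if (new_point == kInvalidCumulativePoint) {
        point.store(new_point);
        return;
    }
    int64_t cur = point.load();
    while (cur < new_point && !point.compare_exchange_weak(cur, new_point)) {
        // compare_exchange_weak refreshed `cur`; retry while it is still lower.
    }
}
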
@@ -703,6 +717,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
 
     if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
         rowset_writer->num_rows() > 0) {
+        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
+            return Status::InternalError<false>("injected update_tmp_rowset error.");
+        });
         const auto& rowset_meta = rowset->rowset_meta();
         RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
     }

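Note: DBUG_EXECUTE_IF is Doris's debug-point hook: the attached block runs only when a debug point with that exact name has been armed at runtime (in Doris this requires the debug-points feature to be enabled in the BE config and the point to be switched on, e.g. over its HTTP interface), so tests can inject an update_tmp_rowset failure without affecting production behavior. A simplified stand-in for the mechanism, not the real implementation:

#include <mutex>
#include <string>
#include <unordered_set>

// Global registry of armed debug points (the real implementation also
// supports parameters and expiration).
class DebugPoints {
public:
    static DebugPoints& instance() {
        static DebugPoints dp;
        return dp;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> l(_mu);
        _active.insert(name);
    }
    bool is_enabled(const std::string& name) {
        std::lock_guard<std::mutex> l(_mu);
        return _active.count(name) > 0;
    }

private:
    std::mutex _mu;
    std::unordered_set<std::string> _active;
};

// The block executes only while the named point is armed; in production no
// point is armed and the check is a cheap lookup.
#define DBUG_EXECUTE_IF(name, code)                   \
    if (DebugPoints::instance().is_enabled(name)) code

// Usage, mirroring the hunk:
//   DebugPoints::instance().enable("CloudTablet::save_delete_bitmap.update_tmp_rowset.error");
//   DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error",
//                   { return Status::InternalError("injected error"); });
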
@@ -719,8 +736,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
     }
 
     auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
-    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
-            *this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
+    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
+                                                            new_delete_bitmap.get()));
 
     // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
     // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do

@@ -856,9 +873,6 @@ Status CloudTablet::sync_meta() {
         }
         return st;
     }
-    if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
-        return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
-    }
 
     auto new_ttl_seconds = tablet_meta->ttl_seconds();
     if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
