Skip to content

Commit

Permalink
Merge branch 'master' into 20240625_arrow_flight_regressiontest
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Jun 27, 2024
2 parents 096bc61 + 23b0b7b commit bc2668a
Show file tree
Hide file tree
Showing 185 changed files with 5,991 additions and 855 deletions.
6 changes: 4 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ DEFINE_Validator(compaction_task_num_per_disk,
[](const int config) -> bool { return config >= 2; });
DEFINE_Validator(compaction_task_num_per_fast_disk,
[](const int config) -> bool { return config >= 2; });
DEFINE_Validator(low_priority_compaction_task_num_per_disk,
[](const int config) -> bool { return config >= 2; });

// How many rounds of cumulative compaction to run for each round of base compaction when generating compaction tasks.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
Expand All @@ -458,8 +460,8 @@ DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400");

// Compaction priority schedule
DEFINE_mBool(enable_compaction_priority_scheduling, "true");
DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "1");
DEFINE_mDouble(low_priority_tablet_version_num_ratio, "0.7");
DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "2");
DEFINE_mInt32(low_priority_compaction_score_threshold, "200");

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
DEFINE_Int32(max_meta_checkpoint_threads, "-1");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ DECLARE_mInt64(pick_rowset_to_compact_interval_sec);
// Compaction priority schedule
DECLARE_mBool(enable_compaction_priority_scheduling);
DECLARE_mInt32(low_priority_compaction_task_num_per_disk);
DECLARE_mDouble(low_priority_tablet_version_num_ratio);
DECLARE_mInt32(low_priority_compaction_score_threshold);

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
DECLARE_Int32(max_meta_checkpoint_threads);
Expand Down
148 changes: 115 additions & 33 deletions be/src/io/fs/azure_obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
#include <azure/core/http/http_status_code.hpp>
#include <azure/core/io/body_stream.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/storage/blobs/blob_batch.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/blobs/blob_sas_builder.hpp>
#include <azure/storage/blobs/rest_client.hpp>
#include <azure/storage/common/account_sas_builder.hpp>
#include <azure/storage/common/storage_credential.hpp>
#include <azure/storage/common/storage_exception.hpp>
#include <chrono>
#include <exception>
#include <iterator>
#include <ranges>

#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/obj_storage_client.h"
#include "util/s3_util.h"

using namespace Azure::Storage::Blobs;

namespace {
std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptions& opts) {
Expand All @@ -47,6 +56,8 @@ auto base64_encode_part_num(int part_num) {
return Aws::Utils::HashingUtils::Base64Encode(
{reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
}

constexpr char SAS_TOKEN_URL_TEMPLATE[] = "https://{}.blob.core.windows.net/{}/{}{}";
} // namespace

namespace doris::io {
Expand Down Expand Up @@ -75,13 +86,55 @@ ObjectStorageResponse do_azure_client_call(Func f, const ObjectStoragePathOption
LOG_WARNING(msg);
return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg)))};
}
return {};
return ObjectStorageResponse::OK();
}

struct AzureBatchDeleter {
AzureBatchDeleter(BlobContainerClient* client, const ObjectStoragePathOptions& opts)
: _client(client), _batch(client->CreateBatch()), _opts(opts) {}
// Submit one blob to be deleted in `AzureBatchDeleter::execute`
void delete_blob(const std::string& blob_name) {
deferred_resps.emplace_back(_batch.DeleteBlob(blob_name));
}
ObjectStorageResponse execute() {
if (deferred_resps.empty()) {
return ObjectStorageResponse::OK();
}
auto resp = do_azure_client_call([&]() { _client->SubmitBatch(_batch); }, _opts);
if (resp.status.code != ErrorCode::OK) {
return resp;
}

auto get_defer_response = [](const auto& defer) {
// DeferredResponse<Models::DeleteBlobResult> might throw exception
if (!defer.GetResponse().Value.Deleted) {
throw Exception(Status::IOError<false>("Batch delete failed"));
}
};
for (auto&& defer_response : deferred_resps) {
auto response =
do_azure_client_call([&]() { get_defer_response(defer_response); }, _opts);
if (response.status.code != ErrorCode::OK) {
return response;
}
}

return ObjectStorageResponse::OK();
}

private:
BlobContainerClient* _client;
BlobContainerBatch _batch;
const ObjectStoragePathOptions& _opts;
std::vector<Azure::Storage::DeferredResponse<Models::DeleteBlobResult>> deferred_resps;
};

// Azure would do nothing
ObjectStorageUploadResponse AzureObjStorageClient::create_multipart_upload(
const ObjectStoragePathOptions& opts) {
return {};
return ObjectStorageUploadResponse {
.resp = ObjectStorageResponse::OK(),
};
}

ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
Expand Down Expand Up @@ -120,7 +173,9 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
};
// clang-format on
}
return {};
return ObjectStorageUploadResponse {
.resp = ObjectStorageResponse::OK(),
};
}

ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
Expand All @@ -136,7 +191,7 @@ ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(

ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
try {
Azure::Storage::Blobs::Models::BlobProperties properties =
Models::BlobProperties properties =
_client->GetBlockBlobClient(opts.key).GetProperties().Value;
return {.file_size = properties.BlobSize};
} catch (Azure::Core::RequestFailedException& e) {
Expand Down Expand Up @@ -166,7 +221,7 @@ ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathO
auto client = _client->GetBlockBlobClient(opts.key);
return do_azure_client_call(
[&]() {
Azure::Storage::Blobs::DownloadBlobToOptions download_opts;
DownloadBlobToOptions download_opts;
Azure::Core::Http::HttpRange range {static_cast<int64_t>(offset), bytes_read};
download_opts.Range = range;
auto resp = client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
Expand All @@ -178,15 +233,15 @@ ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathO

ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
std::vector<FileInfo>* files) {
auto get_file_file = [&](Azure::Storage::Blobs::ListBlobsPagedResponse& resp) {
auto get_file_file = [&](ListBlobsPagedResponse& resp) {
std::ranges::transform(resp.Blobs, std::back_inserter(*files), [](auto&& blob_item) {
return FileInfo {
.file_name = blob_item.Name, .file_size = blob_item.BlobSize, .is_file = true};
});
};
return do_azure_client_call(
[&]() {
Azure::Storage::Blobs::ListBlobsOptions list_opts;
ListBlobsOptions list_opts;
list_opts.Prefix = opts.prefix;
auto resp = _client->ListBlobs(list_opts);
get_file_file(resp);
Expand All @@ -210,52 +265,79 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStorageP
auto end = std::end(objs);

while (begin != end) {
auto batch = _client->CreateBatch();
auto chunkEnd = begin;
std::advance(chunkEnd, std::min(BlobBatchMaxOperations,
static_cast<size_t>(std::distance(begin, end))));
for (auto it = begin; it != chunkEnd; ++it) {
batch.DeleteBlob(*it);
}
begin = chunkEnd;
auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
if (resp.status.code != ErrorCode::OK) {
auto deleter = AzureBatchDeleter(_client.get(), opts);
auto chunk_end = begin;
std::advance(chunk_end, std::min(BlobBatchMaxOperations,
static_cast<size_t>(std::distance(begin, end))));

std::ranges::for_each(std::ranges::subrange(begin, chunk_end),
[&](const std::string& obj) { deleter.delete_blob(obj); });
begin = chunk_end;
if (auto resp = deleter.execute(); resp.status.code != ErrorCode::OK) {
return resp;
}
}
return {};
return ObjectStorageResponse::OK();
}

ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); }, opts);
return do_azure_client_call(
[&]() {
auto resp = _client->DeleteBlob(opts.key);
if (!resp.Value.Deleted) {
throw Exception(Status::IOError<false>("Delete azure blob failed"));
}
},
opts);
}

ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
const ObjectStoragePathOptions& opts) {
Azure::Storage::Blobs::ListBlobsOptions list_opts;
ListBlobsOptions list_opts;
list_opts.Prefix = opts.prefix;
list_opts.PageSizeHint = BlobBatchMaxOperations;
auto delete_func = [&](const std::vector<Models::BlobItem>& blobs) -> ObjectStorageResponse {
auto deleter = AzureBatchDeleter(_client.get(), opts);
auto batch = _client->CreateBatch();
for (auto&& blob_item : blobs) {
deleter.delete_blob(blob_item.Name);
}
if (auto response = deleter.execute(); response.status.code != ErrorCode::OK) {
return response;
}
return ObjectStorageResponse::OK();
};
auto resp = _client->ListBlobs(list_opts);
auto batch = _client->CreateBatch();
for (auto&& blob_item : resp.Blobs) {
batch.DeleteBlob(blob_item.Name);
}
auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
if (response.status.code != ErrorCode::OK) {
if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
return response;
}

while (!resp.NextPageToken->empty()) {
batch = _client->CreateBatch();
list_opts.ContinuationToken = resp.NextPageToken;
resp = _client->ListBlobs(list_opts);
for (auto&& blob_item : resp.Blobs) {
batch.DeleteBlob(blob_item.Name);
}
auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
if (response.status.code != ErrorCode::OK) {

if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
return response;
}
}
return {};
return ObjectStorageResponse::OK();
}

// Build a presigned (SAS) URL granting read access to the blob `opts.key` in the
// container `opts.bucket`.
//
// - `expiration_secs`: lifetime of the token in seconds, counted from now.
// - `conf`: supplies the credentials; `conf.ak` is passed as the account name (and is
//   also used as the host prefix of the URL) and `conf.sk` as the account key.
//
// The token is restricted to HTTPS and to the single blob resource.
std::string AzureObjStorageClient::generate_presigned_url(const ObjectStoragePathOptions& opts,
                                                          int64_t expiration_secs,
                                                          const S3ClientConf& conf) {
    Azure::Storage::Sas::BlobSasBuilder sas_builder;
    sas_builder.ExpiresOn =
            std::chrono::system_clock::now() + std::chrono::seconds(expiration_secs);
    sas_builder.BlobContainerName = opts.bucket;
    sas_builder.BlobName = opts.key;
    sas_builder.Resource = Azure::Storage::Sas::BlobSasResource::Blob;
    sas_builder.Protocol = Azure::Storage::Sas::SasProtocol::HttpsOnly;
    sas_builder.SetPermissions(Azure::Storage::Sas::BlobSasPermissions::Read);

    const std::string sas_token = sas_builder.GenerateSasToken(
            Azure::Storage::StorageSharedKeyCredential(conf.ak, conf.sk));

    // The token is generated for the container/blob pair given to the builder above, so
    // the URL must name exactly the same container. Use `opts.bucket` here as well rather
    // than `conf.bucket`; otherwise a mismatch between the two would yield a URL whose
    // signature does not match the addressed resource.
    return fmt::format(SAS_TOKEN_URL_TEMPLATE, conf.ak, opts.bucket, opts.key, sas_token);
}
} // namespace doris::io
5 changes: 1 addition & 4 deletions be/src/io/fs/azure_obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,8 @@ class AzureObjStorageClient final : public ObjStorageClient {
std::vector<std::string> objs) override;
ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override;
ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override;
// TODO(ByteYue) : to be implemented
std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
int64_t expiration_secs) override {
return "http://azure.to.be.implenmented";
};
int64_t expiration_secs, const S3ClientConf& conf) override;

private:
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
Expand Down
12 changes: 11 additions & 1 deletion be/src/io/fs/obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "io/fs/path.h"
namespace doris {
class Status;
struct S3ClientConf;
namespace io {

// Names are in lexico order.
Expand Down Expand Up @@ -60,6 +61,14 @@ struct ObjectStorageResponse {
ObjectStorageStatus status {};
int http_code {200};
std::string request_id = std::string();
// Convenience factory for a successful response: status code 0 and HTTP code 200,
// with every other field left at its default.
static ObjectStorageResponse OK() {
    ObjectStorageResponse ok_resp;
    ok_resp.status.code = 0;
    ok_resp.http_code = 200;
    return ok_resp;
}
};

struct ObjectStorageUploadResponse {
Expand Down Expand Up @@ -118,7 +127,8 @@ class ObjStorageClient {
const ObjectStoragePathOptions& opts) = 0;
// Return a presigned URL for users to access the object
virtual std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
int64_t expiration_secs) = 0;
int64_t expiration_secs,
const S3ClientConf& conf) = 0;
};
} // namespace io
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t expir
} else {
client = _client->get();
}
return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs);
return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs,
_client->s3_client_conf());
}

} // namespace doris::io
Loading

0 comments on commit bc2668a

Please sign in to comment.