Skip to content

Commit

Permalink
[feature](Recycler) Parallelize s3 delete operations and recycle_tabl…
Browse files Browse the repository at this point in the history
…et (apache#37630)

Previously the recycler instance procedure was single-threaded, so it was
not sufficiently parallel, even though it performs many network I/O
operations. This PR splits the recycle tasks into different stages that can
run in parallel, and makes the delete operations parallel as well.
  • Loading branch information
ByteYue authored Jul 19, 2024
1 parent 603452d commit 05b05bd
Show file tree
Hide file tree
Showing 17 changed files with 545 additions and 86 deletions.
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma separated list
CONF_Strings(recycle_blacklist, ""); // Comma separated list
CONF_mInt32(instance_recycler_worker_pool_size, "1");
CONF_Bool(enable_checker, "false");
// Degree of parallelism for parallel recycle operations
CONF_Int32(recycle_pool_parallelism, "10");
// Currently only used for recycler test
CONF_Bool(enable_inverted_check, "false");
// interval for scanning instances to do checks and inspections
Expand Down
6 changes: 4 additions & 2 deletions cloud/src/recycler/azure_obj_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ std::unique_ptr<ObjectListIterator> AzureObjClient::list_objects(ObjectStoragePa
// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
// > Each batch request supports a maximum of 256 subrequests.
ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
std::vector<std::string> keys) {
std::vector<std::string> keys,
ObjClientOptions option) {
if (keys.empty()) {
return {0};
}
Expand Down Expand Up @@ -275,8 +276,9 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
}

ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time) {
return delete_objects_recursively_(path, expiration_time, BlobBatchMaxOperations);
return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations);
}

ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket,
Expand Down
5 changes: 3 additions & 2 deletions cloud/src/recycler/azure_obj_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class AzureObjClient final : public ObjStorageClient {

std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef path) override;

ObjectStorageResponse delete_objects(const std::string& bucket,
std::vector<std::string> keys) override;
ObjectStorageResponse delete_objects(const std::string& bucket, std::vector<std::string> keys,
ObjClientOptions option) override;

ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;

ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time = 0) override;

ObjectStorageResponse get_life_cycle(const std::string& bucket,
Expand Down
30 changes: 24 additions & 6 deletions cloud/src/recycler/obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
#include "recycler/obj_storage_client.h"

#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
#include "recycler/util.h"

namespace doris::cloud {

ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path,
const ObjClientOptions& option,
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
auto list_iter = list_objects(path);

ObjectStorageResponse ret;
std::vector<std::string> keys;
SyncExecutor<int> concurrent_delete_executor(
option.executor,
fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key),
[](const int& ret) { return ret != 0; });

for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) {
if (expired_time > 0 && obj->mtime_s > expired_time) {
Expand All @@ -39,20 +46,31 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
if (keys.size() < batch_size) {
continue;
}

ret = delete_objects(path.bucket, std::move(keys));
if (ret.ret != 0) {
return ret;
}
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}

if (!list_iter->is_valid()) {
bool finished;
concurrent_delete_executor.when_all(&finished);
return {-1};
}

if (!keys.empty()) {
return delete_objects(path.bucket, std::move(keys));
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
if (r != 0) {
ret = -1;
}
}

ret = finished ? ret : -1;

return ret;
}
Expand Down
11 changes: 10 additions & 1 deletion cloud/src/recycler/obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class ObjectListIterator {
virtual std::optional<ObjectMeta> next() = 0;
};

class SimpleThreadPool;

// Options passed by value to the ObjStorageClient delete operations
// (delete_objects / delete_objects_recursively).
struct ObjClientOptions {
    // Enabled by default.
    // NOTE(review): no code visible in this change reads this flag — confirm its effect.
    bool prefetch = true;
    // Thread pool handed to the SyncExecutor that runs the batched delete tasks in
    // delete_objects_recursively_.
    std::shared_ptr<SimpleThreadPool> executor;
};

class ObjStorageClient {
public:
ObjStorageClient() = default;
Expand All @@ -71,14 +77,16 @@ class ObjStorageClient {

// According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
virtual ObjectStorageResponse delete_objects(const std::string& bucket,
std::vector<std::string> keys) = 0;
std::vector<std::string> keys,
ObjClientOptions option) = 0;

// Delete the file named key in the object storage bucket.
virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0;

// According to the prefix, recursively delete all objects under the prefix.
// If `expiration_time` > 0, only delete objects with mtime earlier than `expiration_time`.
virtual ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time = 0) = 0;

// Get the objects' expiration time on the bucket
Expand All @@ -91,6 +99,7 @@ class ObjStorageClient {

protected:
ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef path,
const ObjClientOptions& option,
int64_t expiration_time, size_t batch_size);
};

Expand Down
Loading

0 comments on commit 05b05bd

Please sign in to comment.