Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Jan 3, 2025
1 parent caa94d7 commit 41f1142
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
57 changes: 56 additions & 1 deletion cloud/src/recycler/hdfs_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>

#include <regex>

#include "common/stopwatch.h"
#include "recycler/util.h"

Expand Down Expand Up @@ -342,6 +344,47 @@ std::string HdfsAccessor::to_uri(const std::string& relative_path) {
return uri_ + '/' + relative_path;
}

// Extracts the tablet directory from a path prefix.
//
// `path` must have the form "data/<digits>/<name>_", where "<name>_" is the
// last path component (it must not contain another '/').
// e.g. data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211
//
// Returns the part of `path` before the last '/', or an empty string (after
// logging a warning) when `path` does not satisfy the format above.
std::string HdfsAccessor::extract_tablet_path(const std::string& path) {
    // Constructing a std::regex compiles the pattern, which is far more
    // expensive than matching a short path. The patterns never change, so
    // compile them once; function-local statics are initialized thread-safely
    // and matching against a const regex does not mutate it.
    static const std::regex prefix_pattern(R"(^data/(\d+)/.+_$)");
    static const std::regex folder_pattern(R"(^data/\d+$)");

    // Check if path is empty
    if (path.empty()) {
        LOG_WARNING("input path is empty").tag("path", path);
        return "";
    }

    // Check if path ends with '_' (kept separate from the regex so the log
    // message pinpoints the reason for the rejection)
    if (path.back() != '_') {
        LOG_WARNING("path must end with '_'").tag("path", path);
        return "";
    }

    // Check if path matches the expected pattern
    if (!std::regex_match(path, prefix_pattern)) {
        LOG_WARNING("path format must be 'data/number/xxx_'").tag("path", path);
        return "";
    }

    // prefix_pattern guarantees that `path` contains "data/<digits>/", so a
    // '/' always exists here and no npos check is required.
    const size_t last_slash = path.find_last_of('/');
    std::string folder_path = path.substr(0, last_slash);

    // Verify the extracted path matches "data/number" format. This rejects
    // inputs with extra directory levels such as "data/1/2/x_", which the
    // first pattern accepts because ".+" may span '/'.
    if (!std::regex_match(folder_path, folder_pattern)) {
        LOG_WARNING("extracted folder path must match 'data/number'")
                .tag("folder_path", folder_path)
                .tag("original_path", path);
        return "";
    }

    return folder_path;
}

int HdfsAccessor::init() {
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
Expand All @@ -357,7 +400,15 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati
auto uri = to_uri(path_prefix);
LOG(INFO) << "delete prefix, uri=" << uri;
std::unique_ptr<ListIterator> list_iter;
int ret = list_directory(path_prefix, &list_iter);
auto tablet_path = extract_tablet_path(path_prefix);
if (tablet_path.empty()) {
LOG_WARNING("extract tablet path failed").tag("path prefix", path_prefix);
return -1;
}
LOG_INFO("extract tablet path success")
.tag("path prefix", path_prefix)
.tag("tablet path", tablet_path);
int ret = list_directory(tablet_path, &list_iter);
if (ret != 0) {
LOG(WARNING) << "delete prefix, failed to list" << uri;
return ret;
Expand All @@ -372,6 +423,10 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati
}
++num_deleted;
}
if (num_deleted == 0) {
LOG_WARNING("recycler delete prefix num = 0, maybe there are some problems?")
.tag("path prefix", path_prefix);
}
LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret
<< " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted;
return ret;
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/recycler/hdfs_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class HdfsAccessor final : public StorageVaultAccessor {
// Convert relative path to uri.
// The result is `uri_`, a '/' separator and `path` concatenated in that order.
std::string to_uri(const std::string& path);

// Extract the tablet path from a path prefix.
// The prefix must look like "data/<digits>/<name>_", where "<name>_" is the
// last path component; the part before the last '/' is returned.
// If the prefix does not match this format, a warning is logged and an empty
// string is returned.
// e.g.
// data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211
std::string extract_tablet_path(const std::string& relative_path);

const HdfsVaultInfo& info_; // Only use when init

HdfsSPtr fs_;
Expand Down
20 changes: 16 additions & 4 deletions cloud/test/hdfs_accessor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ TEST(HdfsAccessorTest, delete_prefix) {
put_and_verify("data/20000/1_0.dat");
put_and_verify("data111/10000/1_0.dat");

ret = accessor.delete_prefix("data/10000");
ret = accessor.delete_prefix("data/10000/1_");
EXPECT_EQ(ret, 0);
ret = accessor.delete_prefix("data/10000/2_");
EXPECT_EQ(ret, 0);

std::unordered_set<std::string> list_files;
Expand All @@ -269,12 +271,20 @@ TEST(HdfsAccessorTest, delete_prefix) {
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
EXPECT_EQ(list_files.size(), 2);
EXPECT_EQ(list_files.size(), 4);
EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data/20000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));

ret = accessor.delete_prefix("data/");
ret = accessor.delete_prefix("data/10000");
EXPECT_EQ(ret, -1);
ret = accessor.delete_prefix("data111/10000");
EXPECT_EQ(ret, -1);
ret = accessor.delete_prefix("data/20000/1_");
EXPECT_EQ(ret, 0);
ret = accessor.delete_prefix("data/10000/1");
EXPECT_EQ(ret, -1);

iter.reset();
ret = accessor.list_all(&iter);
Expand All @@ -283,7 +293,9 @@ TEST(HdfsAccessorTest, delete_prefix) {
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
EXPECT_EQ(list_files.size(), 1);
EXPECT_EQ(list_files.size(), 3);
EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));
}

Expand Down

0 comments on commit 41f1142

Please sign in to comment.