Yukang-Lian committed Jan 3, 2025
1 parent 518faa3 commit f73e1ab
Showing 2 changed files with 152 additions and 54 deletions.
29 changes: 28 additions & 1 deletion cloud/src/recycler/hdfs_accessor.cpp
@@ -20,6 +20,8 @@
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>

#include <regex>

#include "common/stopwatch.h"
#include "recycler/util.h"

@@ -342,6 +344,19 @@ std::string HdfsAccessor::to_uri(const std::string& relative_path) {
    return uri_ + '/' + relative_path;
}

// extract tablet path from prefix
// e.g.
// data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211/
std::string extract_tablet_path(const std::string& path) {
    // Find the last '/'
    size_t last_slash = path.find_last_of('/');
    if (last_slash == std::string::npos) {
        LOG_WARNING("no '/' found in path").tag("path", path);
        return "";
    }
    return path.substr(0, last_slash + 1);
}

int HdfsAccessor::init() {
    // TODO(plat1ko): Cache hdfsFS
    fs_ = HDFSBuilder::create_fs(info_.build_conf());
@@ -357,7 +372,15 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati
    auto uri = to_uri(path_prefix);
    LOG(INFO) << "delete prefix, uri=" << uri;
    std::unique_ptr<ListIterator> list_iter;
    int ret = list_all(&list_iter);
    auto tablet_path = extract_tablet_path(path_prefix);
    if (tablet_path.empty()) {
        LOG_WARNING("extract tablet path failed").tag("path prefix", path_prefix);
        return -1;
    }
    LOG_INFO("extract tablet path success")
            .tag("path prefix", path_prefix)
            .tag("tablet path", tablet_path);
    int ret = list_directory(tablet_path, &list_iter);
    if (ret != 0) {
        LOG(WARNING) << "delete prefix, failed to list" << uri;
        return ret;
@@ -372,6 +395,10 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati
        }
        ++num_deleted;
    }
    if (num_deleted == 0) {
        LOG_WARNING("recycler delete prefix num = 0, maybe there are some problems?")
                .tag("path prefix", path_prefix);
    }
    LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret
              << " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted;
    return ret;
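The change above narrows delete_prefix: instead of calling list_all() over the entire storage vault, it now lists only the enclosing tablet directory returned by extract_tablet_path, and bails out with -1 when no tablet directory can be derived from the prefix. The following minimal, self-contained sketch shows the expected behavior of that helper; the harness and the _sketch name are illustrative only, and just the input/output pair comes from the comment in the diff.

#include <cassert>
#include <string>

// Mirrors the helper added in hdfs_accessor.cpp: keep everything up to and
// including the last '/', or return "" when the prefix contains no '/'.
std::string extract_tablet_path_sketch(const std::string& path) {
    size_t last_slash = path.find_last_of('/');
    if (last_slash == std::string::npos) {
        return "";
    }
    return path.substr(0, last_slash + 1);
}

int main() {
    // Rowset prefix -> enclosing tablet directory, as in the example comment.
    assert(extract_tablet_path_sketch(
                   "data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_") ==
           "data/492211/");
    // No '/' at all: the prefix cannot be mapped to a tablet directory,
    // which is the case delete_prefix now rejects with -1.
    assert(extract_tablet_path_sketch("492211").empty());
    return 0;
}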
177 changes: 124 additions & 53 deletions cloud/test/hdfs_accessor_test.cpp
@@ -22,6 +22,7 @@
#include <gtest/gtest.h>

#include <iostream>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
@@ -59,20 +60,20 @@ TEST(HdfsAccessorTest, normal) {

HdfsAccessor accessor(info);
int ret = accessor.init();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

std::string file1 = "data/10000/1_0.dat";

ret = accessor.delete_directory("");
ASSERT_NE(ret, 0);
EXPECT_NE(ret, 0);
ret = accessor.delete_all();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.put_file(file1, "");
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.exists(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

auto* sp = SyncPoint::get_instance();
sp->enable_processing();
@@ -90,46 +91,46 @@

std::unique_ptr<ListIterator> iter;
ret = accessor.list_directory("data", &iter);
ASSERT_EQ(ret, 0);
ASSERT_TRUE(iter);
ASSERT_TRUE(iter->is_valid());
ASSERT_TRUE(iter->has_next());
ASSERT_EQ(iter->next()->path, file1);
ASSERT_FALSE(iter->has_next());
EXPECT_EQ(ret, 0);
EXPECT_TRUE(iter);
EXPECT_TRUE(iter->is_valid());
EXPECT_TRUE(iter->has_next());
EXPECT_EQ(iter->next()->path, file1);
EXPECT_FALSE(iter->has_next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.list_directory("data/", &iter);
ASSERT_EQ(ret, 0);
ASSERT_TRUE(iter->is_valid());
ASSERT_TRUE(iter->has_next());
ASSERT_EQ(iter->next()->path, file1);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_TRUE(iter->is_valid());
EXPECT_TRUE(iter->has_next());
EXPECT_EQ(iter->next()->path, file1);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.list_directory("data/100", &iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.delete_file(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.exists(file1);
ASSERT_EQ(ret, 1);
EXPECT_EQ(ret, 1);
ret = accessor.list_directory("", &iter);
ASSERT_NE(ret, 0);
EXPECT_NE(ret, 0);
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);
ret = accessor.delete_file(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

std::vector<std::string> files;
for (int dir = 10000; dir < 10005; ++dir) {
@@ -140,18 +141,18 @@

for (auto&& file : files) {
ret = accessor.put_file(file, "");
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
}

std::unordered_set<std::string> list_files;
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}
@@ -163,69 +164,139 @@
files.pop_back();
}
ret = accessor.delete_files(to_delete_files);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

std::string to_delete_dir = "data/10001";
ret = accessor.delete_directory(to_delete_dir);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.list_directory(to_delete_dir, &iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());

files.erase(std::remove_if(files.begin(), files.end(),
[&](auto&& file) { return file.starts_with(to_delete_dir); }),
files.end());
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

std::string to_delete_prefix = "data/10003/";
ret = accessor.delete_directory(to_delete_prefix);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
files.erase(std::remove_if(files.begin(), files.end(),
[&](auto&& file) { return file.starts_with(to_delete_prefix); }),
files.end());
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

ret = accessor.delete_all();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
}
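Most of the edits in this test swap gtest's ASSERT_* macros for EXPECT_*: ASSERT_* aborts the current test function at the first failure, while EXPECT_* records the failure and keeps executing, so later checks and cleanup still run. A standalone illustration of the difference (not part of the commit; the test below fails on purpose):

#include <gtest/gtest.h>

// Intentionally failing test that shows how far execution proceeds
// after each kind of failed check.
TEST(MacroSemanticsIllustration, expect_continues_assert_aborts) {
    EXPECT_EQ(1, 2);     // non-fatal: failure recorded, execution continues
    ASSERT_EQ(1, 2);     // fatal: returns from the test body here
    EXPECT_TRUE(false);  // never reached when the ASSERT above fails
}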

TEST(HdfsAccessorTest, delete_prefix) {
    HdfsVaultInfo info;
    info.set_prefix(config::test_hdfs_prefix + "/HdfsAccessorTest/" + butil::GenerateGUID());
    auto* conf = info.mutable_build_conf();
    conf->set_fs_name(config::test_hdfs_fs_name);

    HdfsAccessor accessor(info);
    int ret = accessor.init();
    EXPECT_EQ(ret, 0);

    auto put_and_verify = [&accessor](const std::string& file) {
        int ret = accessor.put_file(file, "");
        EXPECT_EQ(ret, 0);
        ret = accessor.exists(file);
        EXPECT_EQ(ret, 0);
    };

    ret = accessor.delete_directory("");
    EXPECT_NE(ret, 0);
    ret = accessor.delete_all();
    EXPECT_EQ(ret, 0);

    put_and_verify("data/10000/1_0.dat");
    put_and_verify("data/10000/2_0.dat");
    put_and_verify("data/10000/20000/1_0.dat");
    put_and_verify("data/10000/20000/30000/1_0.dat");
    put_and_verify("data/20000/1_0.dat");
    put_and_verify("data111/10000/1_0.dat");

    ret = accessor.delete_prefix("data/10000/1_");
    EXPECT_EQ(ret, 0);
    ret = accessor.delete_prefix("data/10000/2_");
    EXPECT_EQ(ret, 0);

    std::unordered_set<std::string> list_files;
    std::unique_ptr<ListIterator> iter;
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    EXPECT_EQ(list_files.size(), 4);
    EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));

    ret = accessor.delete_prefix("data/10000");
    EXPECT_EQ(ret, -1);
    ret = accessor.delete_prefix("data111/10000");
    EXPECT_EQ(ret, -1);
    ret = accessor.delete_prefix("data/20000/1_");
    EXPECT_EQ(ret, 0);
    ret = accessor.delete_prefix("data/10000/1");
    EXPECT_EQ(ret, -1);

    iter.reset();
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    EXPECT_EQ(list_files.size(), 3);
    EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));
}

} // namespace doris::cloud
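For reference, the prefix semantics the new test exercises reduce to plain string prefix matching on full object paths. The following self-contained check reproduces the survivor count asserted after the first two delete_prefix calls; the file names are copied from the test, everything else is illustrative.

#include <cassert>
#include <string>
#include <vector>

int main() {
    // Objects created by the delete_prefix test above.
    std::vector<std::string> files = {
            "data/10000/1_0.dat",       "data/10000/2_0.dat",
            "data/10000/20000/1_0.dat", "data/10000/20000/30000/1_0.dat",
            "data/20000/1_0.dat",       "data111/10000/1_0.dat",
    };

    // Simulate delete_prefix("data/10000/1_") and delete_prefix("data/10000/2_"):
    // only objects whose full path starts with the given prefix are removed.
    auto survives = [](const std::string& f) {
        return !(f.starts_with("data/10000/1_") || f.starts_with("data/10000/2_"));
    };

    int remaining = 0;
    for (const auto& f : files) {
        if (survives(f)) {
            ++remaining;
        }
    }
    // Matches EXPECT_EQ(list_files.size(), 4) in the test.
    assert(remaining == 4);
    return 0;
}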
