Skip to content

Commit

Permalink
[feat](iceberg)Supports using rest type catalog to read tables in uni…
Browse files Browse the repository at this point in the history
…ty catalog for 2.1 (#43525) (#45217)

bp: #43525
  • Loading branch information
wuwenchi authored Dec 12, 2024
1 parent 8217371 commit 667f5e6
Show file tree
Hide file tree
Showing 24 changed files with 398 additions and 110 deletions.
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

Status FileSystem::delete_file(const Path& file) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

Status FileSystem::delete_directory(const Path& dir) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
abs_files.push_back(absolute_path(file));
Path abs_file;
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

Status FileSystem::exists(const Path& path, bool* res) const {
auto fs_path = absolute_path(path);
Path fs_path;
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,13 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
/// rename file from orig_name to new_name
virtual Status rename_impl(const Path& orig_name, const Path& new_name) = 0;

virtual Path absolute_path(const Path& path) const {
virtual Status absolute_path(const Path& path, Path& abs_path) const {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
Expand Down
82 changes: 71 additions & 11 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ Status LocalFileSystem::delete_directory_impl(const Path& dir) {
}

Status LocalFileSystem::delete_directory_or_file(const Path& path) {
auto the_path = absolute_path(path);
Path the_path;
RETURN_IF_ERROR(absolute_path(path, the_path));
FILESYSTEM_M(delete_directory_or_file_impl(the_path));
}

Expand Down Expand Up @@ -248,8 +249,10 @@ Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name)
}

Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
auto src_file = absolute_path(src);
auto dest_file = absolute_path(dest);
Path src_file;
RETURN_IF_ERROR(absolute_path(src, src_file));
Path dest_file;
RETURN_IF_ERROR(absolute_path(dest, dest_file));
FILESYSTEM_M(link_file_impl(src_file, dest_file));
}

Expand All @@ -272,7 +275,8 @@ Status LocalFileSystem::canonicalize(const Path& path, std::string* real_path) {
}

Status LocalFileSystem::is_directory(const Path& path, bool* res) {
auto tmp_path = absolute_path(path);
Path tmp_path;
RETURN_IF_ERROR(absolute_path(path, tmp_path));
std::error_code ec;
*res = std::filesystem::is_directory(tmp_path, ec);
if (ec) {
Expand All @@ -282,7 +286,8 @@ Status LocalFileSystem::is_directory(const Path& path, bool* res) {
}

Status LocalFileSystem::md5sum(const Path& file, std::string* md5sum) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(md5sum_impl(path, md5sum));
}

Expand Down Expand Up @@ -318,8 +323,9 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {

Status LocalFileSystem::iterate_directory(const std::string& dir,
const std::function<bool(const FileInfo& file)>& cb) {
auto path = absolute_path(dir);
FILESYSTEM_M(iterate_directory_impl(dir, cb));
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(iterate_directory_impl(path, cb));
}

Status LocalFileSystem::iterate_directory_impl(
Expand All @@ -336,7 +342,8 @@ Status LocalFileSystem::iterate_directory_impl(
}

Status LocalFileSystem::get_space_info(const Path& dir, size_t* capacity, size_t* available) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(get_space_info_impl(path, capacity, available));
}

Expand All @@ -353,8 +360,10 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity,
}

Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
auto src_path = absolute_path(src);
auto dest_path = absolute_path(dest);
Path src_path;
RETURN_IF_ERROR(absolute_path(src, src_path));
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest, dest_path));
FILESYSTEM_M(copy_path_impl(src_path, dest_path));
}

Expand Down Expand Up @@ -455,7 +464,8 @@ Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::strin
}

Status LocalFileSystem::permission(const Path& file, std::filesystem::perms prms) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(permission_impl(path, prms));
}

Expand All @@ -468,5 +478,55 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
// valid path include:
// 1. abc/def will return abc/def
// 2. /abc/def will return /abc/def
// 3. file:/abc/def will return /abc/def
// 4. file://<authority>/abc/def will return /abc/def
std::string path_str = input_path_str;
size_t slash = path_str.find('/');
if (slash == 0) {
abs_path = input_path_str;
return Status::OK();
}

// Initialize scheme and authority
std::string scheme;
size_t start = 0;

// Parse URI scheme
size_t colon = path_str.find(':');
if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
// Has a scheme
scheme = path_str.substr(0, colon);
if (scheme != "file") {
return Status::InternalError(
"Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
}
start = colon + 1;
}

// Parse URI authority, if any
if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
// Has authority
// such as : path_str = "file://authority/abc/def"
// and now : start = 5
size_t next_slash = path_str.find('/', start + 2);
// now : next_slash = 16
if (next_slash == std::string::npos) {
return Status::InternalError(
"This input string only has authority, but has no path information");
}
// We will skit authority
// now : start = 16
start = next_slash;
}

// URI path is the rest of the string
abs_path = path_str.substr(start);
return Status::OK();
}

} // namespace io
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "common/exception.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
Expand All @@ -33,6 +34,7 @@ namespace doris::io {
class LocalFileSystem final : public FileSystem {
public:
static std::shared_ptr<LocalFileSystem> create(Path path, std::string id = "");
static Status convert_to_abs_path(const Path& path, Path& abs_path);
~LocalFileSystem() override;

/// hard link dest file to src file
Expand Down Expand Up @@ -98,6 +100,10 @@ class LocalFileSystem final : public FileSystem {
Status copy_path_impl(const Path& src, const Path& dest);
Status permission_impl(const Path& file, std::filesystem::perms prms);

Status absolute_path(const Path& path, Path& abs_path) const override {
return convert_to_abs_path(path, abs_path);
}

private:
// a wrapper for glob(), return file list in "res"
Status _glob(const std::string& pattern, std::vector<std::string>* res);
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,25 @@ namespace doris {
namespace io {

Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
auto dest_path = absolute_path(dest_file);
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
remote_paths.push_back(absolute_path(path));
Path abs_path;
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
auto remote_path = absolute_path(remote_file);
Path remote_path;
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no schema
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
Expand All @@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}

const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
Expand All @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
Expand All @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
Expand Down Expand Up @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
Expand Down Expand Up @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
Expand Down Expand Up @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
Expand All @@ -619,6 +635,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;

for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
Expand Down
Loading

0 comments on commit 667f5e6

Please sign in to comment.