Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](iceberg)Supports using rest type catalog to read tables in unity catalog #43525

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

// Public entry point for create_file_impl(): resolves `file` through absolute_path()
// and hands the resolved path, `writer` and `opts` to create_file_impl() via the
// FILESYSTEM_M macro (defined outside this view).
Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(file);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

// Public entry point for open_file_impl(): resolves `file` through absolute_path()
// and hands the resolved path, `reader` and `opts` to open_file_impl() via the
// FILESYSTEM_M macro (defined outside this view).
Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(file);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

// Public entry point for create_directory_impl(): resolves `dir` through
// absolute_path() and passes the resolved path plus `failed_if_exists` on via the
// FILESYSTEM_M macro (defined outside this view).
Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(dir);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

// Public entry point for delete_file_impl(): resolves `file` through absolute_path()
// and passes the resolved path on via the FILESYSTEM_M macro (defined outside this view).
Status FileSystem::delete_file(const Path& file) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(file);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

// Public entry point for delete_directory_impl(): resolves `dir` through
// absolute_path() and passes the resolved path on via the FILESYSTEM_M macro
// (defined outside this view).
Status FileSystem::delete_directory(const Path& dir) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(dir);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

// Public entry point for batch_delete_impl(): resolves every entry of `files`
// through absolute_path(), collects the results in order, and passes the collected
// vector on via the FILESYSTEM_M macro (defined outside this view).
Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the three lines after it are its replacement.
abs_files.push_back(absolute_path(file));
Path abs_file;
// Presumably aborts the whole batch with the first non-OK Status.
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

// Public entry point for exists_impl(): resolves `path` through absolute_path() and
// passes the resolved path and the `res` out-pointer on via the FILESYSTEM_M macro
// (defined outside this view).
Status FileSystem::exists(const Path& path, bool* res) const {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path fs_path;` / RETURN_IF_ERROR pair after it is its replacement.
auto fs_path = absolute_path(path);
Path fs_path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

// Public entry point for file_size_impl(): resolves `file` through absolute_path()
// and passes the resolved path and the `file_size` out-pointer on via the
// FILESYSTEM_M macro (defined outside this view).
Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(file);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

// Public entry point for list_impl(): resolves `dir` through absolute_path() and
// passes the resolved path together with `only_file`, `files` and `exists` on via
// the FILESYSTEM_M macro (defined outside this view).
Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path path;` / RETURN_IF_ERROR pair after it is its replacement.
auto path = absolute_path(dir);
Path path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

// Public entry point for rename_impl(): resolves both `orig_name` and `new_name`
// through absolute_path() and passes the two resolved paths on via the FILESYSTEM_M
// macro (defined outside this view).
Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
// NOTE(review): rendered diff — the next two `auto` lines look like the removed
// pre-change statements; the four lines after them are their replacement.
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
// The source path is resolved first; presumably a failure here returns before
// the destination path is looked at.
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FileSystem {

// FIXME(plat1ko): The implementation and semantics of this function are not completely
// consistent, which is confusing.
virtual Path absolute_path(const Path& path) const = 0;
virtual Status absolute_path(const Path& path, Path& abs_path) const = 0;

// Constructs the base object: `id` is taken by value and moved into _id, `type` is
// copied into _type.
FileSystem(std::string id, FileSystemType type) : _id(std::move(id)), _type(type) {}

Expand Down
50 changes: 50 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,54 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

// Strips an optional `file:` scheme and an optional `//<authority>` prefix from
// `input_path_str` and stores what remains in `abs_path`.
//
// Returns Status::OK() on success, and Status::InternalError when
//   - the text before the first ':' (a scheme) is anything other than "file", or
//   - an authority is present but no '/' follows it (no path component).
// Despite the name, an input without a scheme and without a leading '/' is
// returned unchanged (see example 1 below); it is not made absolute.
Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
// Valid paths include:
// 1. abc/def will return abc/def
// 2. /abc/def will return /abc/def
// 3. file:/abc/def will return /abc/def
// 4. file://<authority>/abc/def will return /abc/def
std::string path_str = input_path_str;
size_t slash = path_str.find('/');
// Fast path: the input already starts with '/', so it is copied through untouched.
if (slash == 0) {
abs_path = input_path_str;
return Status::OK();
}

// Initialize scheme and authority
std::string scheme;
size_t start = 0;

// Parse URI scheme
size_t colon = path_str.find(':');
// A ':' only counts as a scheme separator when it comes before the first '/'
// (or when there is no '/' at all).
if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
// Has a scheme
scheme = path_str.substr(0, colon);
if (scheme != "file") {
return Status::InternalError(
"Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
}
start = colon + 1;
}

// Parse URI authority, if any
// The authority branch is taken only when at least one character follows the "//".
if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add example here

// Has authority
// such as : path_str = "file://authority/abc/def"
// and now : start = 5
size_t next_slash = path_str.find('/', start + 2);
// now : next_slash = 16
if (next_slash == std::string::npos) {
return Status::InternalError(
"This input string only has authority, but has no path information");
}
// We will skip the authority
// now : start = 16
start = next_slash;
}

// URI path is the rest of the string
abs_path = path_str.substr(start);
return Status::OK();
}

} // namespace doris::io
6 changes: 5 additions & 1 deletion be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LocalFileSystem final : public FileSystem {
public:
~LocalFileSystem() override;

static Status convert_to_abs_path(const Path& path, Path& abs_path);

/// hard link dest file to src file
Status link_file(const Path& src, const Path& dest);

Expand Down Expand Up @@ -104,7 +106,9 @@ class LocalFileSystem final : public FileSystem {

// `LocalFileSystem` always uses absolute paths as arguments
// FIXME(plat1ko): Eliminate this method
// NOTE(review): rendered diff — the `Path absolute_path(...)` one-liner below looks
// like the removed pre-change form (returned `path` unchanged); the Status-returning
// overload after it delegates to the static convert_to_abs_path().
Path absolute_path(const Path& path) const override { return path; }
Status absolute_path(const Path& path, Path& abs_path) const override {
return convert_to_abs_path(path, abs_path);
}

friend const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
};
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@
namespace doris::io {

// Public entry point for upload_impl(): only `dest_file` is resolved through
// absolute_path(); `local_file` is passed on as given. The call goes through the
// FILESYSTEM_M macro (defined outside this view).
Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path dest_path;` / RETURN_IF_ERROR pair after it is its replacement.
auto dest_path = absolute_path(dest_file);
Path dest_path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

// Public entry point for batch_upload_impl(): every entry of `remote_files` is
// resolved through absolute_path() and collected in order; `local_files` is passed
// on as given. The call goes through the FILESYSTEM_M macro (defined outside this view).
Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the three lines after it are its replacement.
remote_paths.push_back(absolute_path(path));
Path abs_path;
// Presumably aborts the whole batch with the first non-OK Status.
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

// Public entry point for download_impl(): only `remote_file` is resolved through
// absolute_path(); `local` is passed on as given. The call goes through the
// FILESYSTEM_M macro (defined outside this view).
Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
// NOTE(review): rendered diff — the next line looks like the removed pre-change
// statement; the `Path remote_path;` / RETURN_IF_ERROR pair after it is its replacement.
auto remote_path = absolute_path(remote_file);
Path remote_path;
// Presumably propagates a non-OK Status from absolute_path() to the caller.
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/remote_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class RemoteFileSystem : public FileSystem {
virtual Status open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) = 0;

// Resolves `path` against this file system's root: a path for which is_absolute()
// holds is used as-is, anything else is appended to _root_path.
// NOTE(review): rendered diff — the `Path absolute_path(...)` signature and the two
// `return path;` / `return _root_path / path;` lines look like the removed
// pre-change form. In the Status-returning form the result is written to `abs_path`
// and Status::OK() is returned on both branches.
Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

Path _root_path;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

// Resolves `path` for S3: a path whose string form contains "://" is used as-is,
// anything else is appended to _root_path.
// NOTE(review): rendered diff — the `Path absolute_path(...)` signature and the two
// `return ...;` lines inside the branches look like the removed pre-change form.
// In the Status-returning form the result is written to `abs_path` and
// Status::OK() is returned on both branches.
Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path carries a scheme, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no scheme
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
17 changes: 17 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
Expand All @@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}

// Looks up `id` in _field_id_name_mapping.
// Returns a default-constructed Slice when the id is not present; otherwise a
// Slice built from the data() pointer of the stored name.
// NOTE(review): a Slice built from a bare char pointer presumably measures its
// length up to the NUL terminator and points into the map-owned string rather
// than copying it — confirm against util/slice.h before keeping the result
// beyond the lifetime of this FieldDescriptor.
const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: return type 'const doris::Slice' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change
const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {

be/src/vec/exec/format/parquet/schema_desc.h:137:

-     const doris::Slice get_column_name_from_field_id(int32_t id) const;
+     doris::Slice get_column_name_from_field_id(int32_t id) const;

auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
Expand All @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
Expand All @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
Expand Down Expand Up @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
Expand Down Expand Up @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
Expand Down Expand Up @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
Expand All @@ -619,6 +635,7 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/status.h"
#include "runtime/types.h"
#include "util/slice.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -56,6 +57,8 @@ struct FieldSchema {
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;

int32_t field_id;
};

class FieldDescriptor {
Expand All @@ -68,6 +71,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
Expand Down Expand Up @@ -128,6 +132,10 @@ class FieldDescriptor {
std::string debug_string() const;

int32_t size() const { return _fields.size(); }

// True when _field_id_name_mapping holds at least one field-id -> name entry.
bool has_parquet_field_id() const { return !_field_id_name_mapping.empty(); }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
return Status::OK();
}

// Get iceberg col id to col name map stored in parquet metadata key values.
// This is for iceberg schema evolution.
std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
return _t_metadata->key_value_metadata;
// Returns, by value, whatever _file_metadata->schema() yields.
// NOTE(review): _file_metadata is dereferenced without a null check here; presumably
// callers only invoke this after the file has been opened — confirm against callers.
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: return type 'const std::doris::vectorized::FieldDescriptor' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
FieldDescriptor ParquetReader::get_file_metadata_schema() {

be/src/vec/exec/format/parquet/vparquet_reader.h:151:

-     const FieldDescriptor get_file_metadata_schema();
+     FieldDescriptor get_file_metadata_schema();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: return type 'const std::doris::vectorized::FieldDescriptor' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
FieldDescriptor ParquetReader::get_file_metadata_schema() {

be/src/vec/exec/format/parquet/vparquet_reader.h:150:

-     const FieldDescriptor get_file_metadata_schema();
+     FieldDescriptor get_file_metadata_schema();

return _file_metadata->schema();
}

Status ParquetReader::open() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class ParquetReader : public GenericReader {
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;

std::vector<tparquet::KeyValue> get_metadata_key_values();
const FieldDescriptor get_file_metadata_schema();
// Replaces _table_col_to_file_col with a copy of `map`.
// The argument is only read (copy-assigned from), so it is taken by const
// reference: existing callers passing a mutable lvalue keep working, and const or
// temporary maps are now accepted as well.
void set_table_to_file_col_map(const std::unordered_map<std::string, std::string>& map) {
    _table_col_to_file_col = map;
}
Expand Down
Loading
Loading