Skip to content

Commit

Permalink
[feat](iceberg)Supports using rest type catalog to read tables in u…
Browse files Browse the repository at this point in the history
…nity catalog for 3.0 (#43525) (#45161)

pb: #43525
  • Loading branch information
wuwenchi authored Dec 9, 2024
1 parent e9a60c4 commit 860f7d0
Show file tree
Hide file tree
Showing 25 changed files with 376 additions and 101 deletions.
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

Status FileSystem::delete_file(const Path& file) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

Status FileSystem::delete_directory(const Path& dir) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
abs_files.push_back(absolute_path(file));
Path abs_file;
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

Status FileSystem::exists(const Path& path, bool* res) const {
auto fs_path = absolute_path(path);
Path fs_path;
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FileSystem {

// FIMXE(plat1ko): The implementation and semantics of this function are not completely
// consistent, which is confused.
virtual Path absolute_path(const Path& path) const = 0;
virtual Status absolute_path(const Path& path, Path& abs_path) const = 0;

FileSystem(std::string id, FileSystemType type) : _id(std::move(id)), _type(type) {}

Expand Down
50 changes: 50 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,54 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
// valid path include:
// 1. abc/def will return abc/def
// 2. /abc/def will return /abc/def
// 3. file:/abc/def will return /abc/def
// 4. file://<authority>/abc/def will return /abc/def
std::string path_str = input_path_str;
size_t slash = path_str.find('/');
if (slash == 0) {
abs_path = input_path_str;
return Status::OK();
}

// Initialize scheme and authority
std::string scheme;
size_t start = 0;

// Parse URI scheme
size_t colon = path_str.find(':');
if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
// Has a scheme
scheme = path_str.substr(0, colon);
if (scheme != "file") {
return Status::InternalError(
"Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
}
start = colon + 1;
}

// Parse URI authority, if any
if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
// Has authority
// such as : path_str = "file://authority/abc/def"
// and now : start = 5
size_t next_slash = path_str.find('/', start + 2);
// now : next_slash = 16
if (next_slash == std::string::npos) {
return Status::InternalError(
"This input string only has authority, but has no path information");
}
// We will skit authority
// now : start = 16
start = next_slash;
}

// URI path is the rest of the string
abs_path = path_str.substr(start);
return Status::OK();
}

} // namespace doris::io
6 changes: 5 additions & 1 deletion be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LocalFileSystem final : public FileSystem {
public:
~LocalFileSystem() override;

static Status convert_to_abs_path(const Path& path, Path& abs_path);

/// hard link dest file to src file
Status link_file(const Path& src, const Path& dest);

Expand Down Expand Up @@ -104,7 +106,9 @@ class LocalFileSystem final : public FileSystem {

// `LocalFileSystem` always use absolute path as arguments
// FIXME(plat1ko): Eliminate this method
Path absolute_path(const Path& path) const override { return path; }
Status absolute_path(const Path& path, Path& abs_path) const override {
return convert_to_abs_path(path, abs_path);
}

friend const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
};
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@
namespace doris::io {

Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
auto dest_path = absolute_path(dest_file);
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
remote_paths.push_back(absolute_path(path));
Path abs_path;
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
auto remote_path = absolute_path(remote_file);
Path remote_path;
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/remote_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class RemoteFileSystem : public FileSystem {
virtual Status open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) = 0;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

Path _root_path;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no schema
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
17 changes: 17 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
Expand All @@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}

const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
Expand All @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
Expand All @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
Expand Down Expand Up @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
Expand Down Expand Up @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
Expand Down Expand Up @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
Expand All @@ -619,6 +635,7 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/status.h"
#include "runtime/types.h"
#include "util/slice.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -56,6 +57,8 @@ struct FieldSchema {
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;

int32_t field_id;
};

class FieldDescriptor {
Expand All @@ -68,6 +71,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
Expand Down Expand Up @@ -128,6 +132,10 @@ class FieldDescriptor {
std::string debug_string() const;

int32_t size() const { return _fields.size(); }

bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
return Status::OK();
}

// Get iceberg col id to col name map stored in parquet metadata key values.
// This is for iceberg schema evolution.
std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
return _t_metadata->key_value_metadata;
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
return _file_metadata->schema();
}

Status ParquetReader::open() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class ParquetReader : public GenericReader {
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;

std::vector<tparquet::KeyValue> get_metadata_key_values();
const FieldDescriptor get_file_metadata_schema();
void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
_table_col_to_file_col = map;
}
Expand Down
Loading

0 comments on commit 860f7d0

Please sign in to comment.