From a7f8e0763ab696f5bacb757c9d9af972856b06df Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sun, 10 Nov 2024 16:27:12 -0800 Subject: [PATCH] Adding 'version guessing' functionality with version='?' Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 40 +++++++++++++++++++-- src/iceberg_functions/iceberg_metadata.cpp | 6 +++- src/iceberg_functions/iceberg_scan.cpp | 2 +- src/iceberg_functions/iceberg_snapshots.cpp | 2 +- src/include/iceberg_metadata.hpp | 6 ++++ test/sql/iceberg_scan.test | 21 +++++++++++ 6 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 6e63969..9f38dc4 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -188,15 +188,22 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl } -string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_VERSION_HINT_FILE, string version_format = DEFAULT_TABLE_VERSION_FORMAT) { +string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_TABLE_VERSION, string version_format = DEFAULT_TABLE_VERSION_FORMAT) { if (StringUtil::EndsWith(path, ".json")) { return path; } auto meta_path = fs.JoinPath(path, "metadata"); string version_hint; - if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) { + if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) { version_hint = GetTableVersion(meta_path, fs, table_version); + } else if(StringUtil::StartsWith(table_version, "?")) { // TODO: Could add more options here: e.g., ?latest or ?largest + // Try to use a version-hint file + if(fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) { + version_hint = GetTableVersion(meta_path, fs, DEFAULT_VERSION_HINT_FILE); + } else { + version_hint = GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format); + } } else { version_hint = table_version; } @@ -251,6 +258,35 @@ string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs, } } +string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { + string version_pattern = "*"; // TODO: Different "table_version" strings could customize this + string glob_pattern; + string compression_suffix = ""; + vector found_versions; + if (metadata_compression_codec == "gzip") { + compression_suffix = ".gz"; + } + + for(auto try_format : StringUtil::Split(version_format, ',')) { + glob_pattern = fs.JoinPath(meta_path, StringUtil::Format(try_format, version_pattern, compression_suffix)); + found_versions = fs.Glob(glob_pattern); + if(!found_versions.empty()) { + return PickTableVersion(found_versions, version_pattern); + } + } + + throw IOException( + "Could not guess Iceberg table version using '%s' compression and format(s): '%s'", table_version, metadata_compression_codec, version_format); +} + +string IcebergSnapshot::PickTableVersion(vector &found, string &table_version) { + // TODO: Different "table_version" strings could customize this + // For now: just sort the versions and take the largest + std::sort(found.begin(), found.end()); + return found.back(); + +} + yyjson_val *IcebergSnapshot::FindLatestSnapshotInternal(yyjson_val *snapshots) { size_t idx, max; yyjson_val *snapshot; diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index 1ffff2c..83ff5c1 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -54,10 +54,14 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl FileSystem &fs = FileSystem::GetFileSystem(context); auto iceberg_path = input.inputs[0].ToString(); + for(string fs : fs.ListSubSystems()) { + printf("%s\n", fs.c_str()); + } + bool allow_moved_paths = false; string metadata_compression_codec = "none"; bool skip_schema_inference = false; - string table_version = DEFAULT_VERSION_HINT_FILE; + string table_version = DEFAULT_TABLE_VERSION; string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; for (auto &kv : input.named_parameters) { diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index 2b3ec86..f4e8c1f 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -225,7 +225,7 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table bool skip_schema_inference = false; string mode = "default"; string metadata_compression_codec = "none"; - string table_version = DEFAULT_VERSION_HINT_FILE; + string table_version = DEFAULT_TABLE_VERSION; string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; for (auto &kv : input.named_parameters) { diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index e6e4003..68be479 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -55,7 +55,7 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab auto bind_data = make_uniq(); string metadata_compression_codec = "none"; - string table_version = DEFAULT_VERSION_HINT_FILE; + string table_version = DEFAULT_TABLE_VERSION; string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; bool skip_schema_inference = false; diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index d7d4478..341f5e4 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -22,6 +22,10 @@ static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata. static string DEFAULT_VERSION_HINT_FILE = "version-hint.text"; +// Will first look for DEFAULT_VERSION_HINT_FILE then search for versions +// matching the DEFAULT_TABLE_VERSION_FORMAT, taking the "last" one +static string DEFAULT_TABLE_VERSION = "?"; + struct IcebergColumnDefinition { public: static IcebergColumnDefinition ParseFromJson(yyjson_val *val); @@ -81,6 +85,8 @@ class IcebergSnapshot { protected: //! Internal JSON parsing functions static string GetTableVersion(const string &path, FileSystem &fs, string version_format); + static string GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format); + static string PickTableVersion(vector &found, string &table_version); static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots); static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id); static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp); diff --git a/test/sql/iceberg_scan.test b/test/sql/iceberg_scan.test index 4f84bb7..e2c234a 100644 --- a/test/sql/iceberg_scan.test +++ b/test/sql/iceberg_scan.test @@ -79,3 +79,24 @@ query I SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='2', version_name_format='v%s%s.metadata.json'); ---- 111968 + +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE); +---- +111968 + +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', version="?", ALLOW_MOVED_PATHS=TRUE); +---- +111968 + +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, metadata_compression_codec="gzip"); +---- +111968 + +statement error +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version="?", vesion_name_format="version%s%s.metadata.json"); +---- +IO Error: No snapshots found +