Skip to content

Commit

Permalink
Implement automatic version guessing
Browse files Browse the repository at this point in the history
Signed-off-by: Teague Sterling <[email protected]>
  • Loading branch information
teaguesterling committed Nov 12, 2024
1 parent 05262fb commit c61d4dd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 28 deletions.
58 changes: 36 additions & 22 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,32 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl


string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_TABLE_VERSION, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
bool hint_is_metadata = false;
string version_hint;
if (StringUtil::EndsWith(path, ".json")) {
return path;
version_hint = path;
hint_is_metadata = true;
}

auto meta_path = fs.JoinPath(path, "metadata");
string version_hint;
if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
version_hint = GetTableVersion(meta_path, fs, table_version);
} else if(StringUtil::StartsWith(table_version, "?")) { // TODO: Could add more options here: e.g., ?latest or ?largest
} else if(!StringUtil::StartsWith(table_version, "?")) {
// TODO: Could add more options here: e.g., ?latest or ?largest
version_hint = table_version;
} else if(fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) {
// Try to use a version-hint file
if(fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) {
version_hint = GetTableVersion(meta_path, fs, DEFAULT_VERSION_HINT_FILE);
} else {
version_hint = GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format);
}
version_hint = GetTableVersion(meta_path, fs, DEFAULT_VERSION_HINT_FILE);
} else {
version_hint = table_version;
version_hint = GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format);
hint_is_metadata = true;
}

if (hint_is_metadata) {
return version_hint;
} else {
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
}
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
}


Expand Down Expand Up @@ -259,32 +266,39 @@ string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs,
}

string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) {
string selected_metadata;

string version_pattern = "*"; // TODO: Different "table_version" strings could customize this
string glob_pattern;
string compression_suffix = "";
vector<string> found_versions;
if (metadata_compression_codec == "gzip") {
compression_suffix = ".gz";
}

for(auto try_format : StringUtil::Split(version_format, ',')) {
glob_pattern = fs.JoinPath(meta_path, StringUtil::Format(try_format, version_pattern, compression_suffix));
found_versions = fs.Glob(glob_pattern);
if(!found_versions.empty()) {
return PickTableVersion(found_versions, version_pattern);
auto glob_pattern = StringUtil::Format(try_format, version_pattern, compression_suffix);
auto found_versions = fs.Glob(fs.JoinPath(meta_path, glob_pattern));
if(found_versions.size() > 0) {
selected_metadata = PickTableVersion(found_versions, version_pattern, glob_pattern);
if(!selected_metadata.empty()) { // Found one
break;
}
}
}

throw IOException(
"Could not guess Iceberg table version using '%s' compression and format(s): '%s'", metadata_compression_codec, version_format);
if(!selected_metadata.empty()) {
return selected_metadata;
} else {
throw IOException(
"Could not guess Iceberg table version using '%s' compression and format(s): '%s'",
metadata_compression_codec, version_format);
}
}

string IcebergSnapshot::PickTableVersion(vector<string> &found, string &table_version) {
string IcebergSnapshot::PickTableVersion(vector<string> &found_metadata, string &version_pattern, string &glob) {
// TODO: Different "table_version" strings could customize this
// For now: just sort the versions and take the largest
std::sort(found.begin(), found.end());
return found.back();

std::sort(found_metadata.begin(), found_metadata.end());
return found_metadata.back();
}

yyjson_val *IcebergSnapshot::FindLatestSnapshotInternal(yyjson_val *snapshots) {
Expand Down
5 changes: 0 additions & 5 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl

FileSystem &fs = FileSystem::GetFileSystem(context);
auto iceberg_path = input.inputs[0].ToString();

for(string fs : fs.ListSubSystems()) {
printf("%s\n", fs.c_str());
}

bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;
Expand Down
2 changes: 1 addition & 1 deletion src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class IcebergSnapshot {
//! Internal JSON parsing functions
static string GetTableVersion(const string &path, FileSystem &fs, string version_format);
static string GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format);
static string PickTableVersion(vector<string> &found, string &table_version);
static string PickTableVersion(vector<string> &found_metadata, string &version_pattern, string &glob);
static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots);
static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id);
static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp);
Expand Down

0 comments on commit c61d4dd

Please sign in to comment.