Skip to content

Commit

Permalink
Adding 'version guessing' functionality with version='?'
Browse files Browse the repository at this point in the history
Signed-off-by: Teague Sterling <[email protected]>
  • Loading branch information
teaguesterling committed Nov 11, 2024
1 parent d62d91d commit a7f8e07
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 5 deletions.
40 changes: 38 additions & 2 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,22 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl
}


string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_VERSION_HINT_FILE, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_TABLE_VERSION, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
if (StringUtil::EndsWith(path, ".json")) {
return path;
}

auto meta_path = fs.JoinPath(path, "metadata");
string version_hint;
if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
version_hint = GetTableVersion(meta_path, fs, table_version);
} else if(StringUtil::StartsWith(table_version, "?")) { // TODO: Could add more options here: e.g., ?latest or ?largest
// Try to use a version-hint file
if(fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) {
version_hint = GetTableVersion(meta_path, fs, DEFAULT_VERSION_HINT_FILE);
} else {
version_hint = GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format);
}
} else {
version_hint = table_version;
}
Expand Down Expand Up @@ -251,6 +258,35 @@ string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs,
}
}

//! Guess which metadata file to use by globbing the metadata directory.
//!
//! Every comma-separated entry of `version_format` is expanded with a "*" wildcard in place of the
//! version (and ".gz" as the second substitution when `metadata_compression_codec` is "gzip", an
//! empty string otherwise) and globbed under `meta_path`. The first format that yields at least one
//! match wins; the candidate selected by PickTableVersion is returned.
//!
//! \param meta_path directory that is searched
//! \param fs file system used for JoinPath and Glob
//! \param table_version the requested version string; currently not used by the search itself
//! \param metadata_compression_codec only "gzip" changes the glob (adds the ".gz" suffix)
//! \param version_format comma-separated list of printf-style file name formats to try, in order
//! \throws IOException when none of the formats matches any file
string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) {
	string version_pattern = "*"; // TODO: Different "table_version" strings could customize this
	const string compression_suffix = metadata_compression_codec == "gzip" ? ".gz" : "";

	// Formats are tried in the order given; the first one with any match decides the result
	for (const auto &try_format : StringUtil::Split(version_format, ',')) {
		const string glob_pattern =
		    fs.JoinPath(meta_path, StringUtil::Format(try_format, version_pattern, compression_suffix));
		vector<string> found_versions = fs.Glob(glob_pattern);
		if (!found_versions.empty()) {
			return PickTableVersion(found_versions, version_pattern);
		}
	}

	// The message has exactly two placeholders (codec, formats). Passing table_version as an extra
	// leading argument shifted every value by one and reported the wrong codec and format.
	throw IOException(
	    "Could not guess Iceberg table version using '%s' compression and format(s): '%s'",
	    metadata_compression_codec, version_format);
}

//! Choose one entry out of the globbed metadata candidates.
//!
//! The candidates are sorted in place in natural order (runs of digits compare by numeric value,
//! everything else byte-wise) and the largest one is returned. A plain lexicographic sort would
//! rank "v9.metadata.json" above "v10.metadata.json" and therefore pick a stale version.
//!
//! \param found candidate entries; reordered by this call
//! \param table_version currently unused, reserved for alternative selection strategies
//! \return the largest candidate, exactly as it appears in `found`
//! \throws IOException when `found` is empty
//! NOTE(review): the returned value is a whole glob match, not an extracted version token --
//! confirm that callers expect that.
string IcebergSnapshot::PickTableVersion(vector<string> &found, string &table_version) {
	// TODO: Different "table_version" strings could customize this
	if (found.empty()) {
		// back() on an empty vector is undefined behaviour; fail loudly instead
		throw IOException("Cannot pick an Iceberg table version: no candidate metadata files were found");
	}

	auto is_digit = [](char c) { return c >= '0' && c <= '9'; };

	// Strict weak ordering: natural comparison first, plain string comparison as the tie-break
	auto natural_less = [&is_digit](const string &lhs, const string &rhs) {
		size_t i = 0;
		size_t j = 0;
		while (i < lhs.size() && j < rhs.size()) {
			if (is_digit(lhs[i]) && is_digit(rhs[j])) {
				// Ignore leading zeros so that "007" and "7" carry the same numeric value
				size_t lhs_begin = i;
				size_t rhs_begin = j;
				while (lhs_begin < lhs.size() && lhs[lhs_begin] == '0') {
					lhs_begin++;
				}
				while (rhs_begin < rhs.size() && rhs[rhs_begin] == '0') {
					rhs_begin++;
				}
				size_t lhs_end = lhs_begin;
				size_t rhs_end = rhs_begin;
				while (lhs_end < lhs.size() && is_digit(lhs[lhs_end])) {
					lhs_end++;
				}
				while (rhs_end < rhs.size() && is_digit(rhs[rhs_end])) {
					rhs_end++;
				}
				const size_t lhs_len = lhs_end - lhs_begin;
				const size_t rhs_len = rhs_end - rhs_begin;
				// With leading zeros removed, more digits means a larger number
				if (lhs_len != rhs_len) {
					return lhs_len < rhs_len;
				}
				const int cmp = lhs.compare(lhs_begin, lhs_len, rhs, rhs_begin, rhs_len);
				if (cmp != 0) {
					return cmp < 0;
				}
				i = lhs_end;
				j = rhs_end;
			} else {
				if (lhs[i] != rhs[j]) {
					return lhs[i] < rhs[j];
				}
				i++;
				j++;
			}
		}
		if (i != lhs.size() || j != rhs.size()) {
			// One string ran out first: the exhausted one sorts lower
			return i == lhs.size();
		}
		// Naturally equal (e.g. "v01" vs "v1"): keep the result deterministic
		return lhs < rhs;
	};

	std::sort(found.begin(), found.end(), natural_less);
	return found.back();
}

yyjson_val *IcebergSnapshot::FindLatestSnapshotInternal(yyjson_val *snapshots) {
size_t idx, max;
yyjson_val *snapshot;
Expand Down
6 changes: 5 additions & 1 deletion src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
FileSystem &fs = FileSystem::GetFileSystem(context);
auto iceberg_path = input.inputs[0].ToString();

for(string fs : fs.ListSubSystems()) {
printf("%s\n", fs.c_str());
}

bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;
string table_version = DEFAULT_VERSION_HINT_FILE;
string table_version = DEFAULT_TABLE_VERSION;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
bool skip_schema_inference = false;
string mode = "default";
string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string table_version = DEFAULT_TABLE_VERSION;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
auto bind_data = make_uniq<IcebergSnaphotsBindData>();

string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string table_version = DEFAULT_TABLE_VERSION;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;
bool skip_schema_inference = false;

Expand Down
6 changes: 6 additions & 0 deletions src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.

// Name of the version hint file that is looked up inside the table's metadata directory
static string DEFAULT_VERSION_HINT_FILE = "version-hint.text";

// Default table version. A version starting with "?" means: first look for
// DEFAULT_VERSION_HINT_FILE in the metadata directory; if it does not exist, search for
// files matching the version name format(s) (DEFAULT_TABLE_VERSION_FORMAT unless
// overridden) and take the "last" one
static string DEFAULT_TABLE_VERSION = "?";

struct IcebergColumnDefinition {
public:
static IcebergColumnDefinition ParseFromJson(yyjson_val *val);
Expand Down Expand Up @@ -81,6 +85,8 @@ class IcebergSnapshot {
protected:
//! Internal JSON parsing functions
static string GetTableVersion(const string &path, FileSystem &fs, string version_format);
static string GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format);
static string PickTableVersion(vector<string> &found, string &table_version);
static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots);
static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id);
static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp);
Expand Down
21 changes: 21 additions & 0 deletions test/sql/iceberg_scan.test
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,24 @@ query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='2', version_name_format='v%s%s.metadata.json');
----
111968

# No version argument: the scan falls back to the default table version ("?")
query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE);
----
111968

# Explicitly requesting version="?" must give the same row count as the default
query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', version="?", ALLOW_MOVED_PATHS=TRUE);
----
111968

# Default table version combined with gzip-compressed metadata
query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, metadata_compression_codec="gzip");
----
111968

# NOTE(review): "vesion_name_format" looks like a typo for "version_name_format" (the spelling
# used by the earlier queries in this file) -- confirm, and re-check that the expected error
# below is still the right one after correcting it
statement error
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version="?", vesion_name_format="version%s%s.metadata.json");
----
IO Error: No snapshots found

0 comments on commit a7f8e07

Please sign in to comment.