Skip to content

Commit

Permalink
Fixed test issues
Browse files (browse the repository at this point in the history)
  • Loading branch information
teaguesterling committed Nov 17, 2024
1 parent 6285508 commit 15d0cc3
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 48 deletions.
61 changes: 27 additions & 34 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,33 +191,29 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl
string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_TABLE_VERSION, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
string version_hint;
string meta_path = fs.JoinPath(path, "metadata");
bool hint_is_metadata = false;

if (StringUtil::EndsWith(path, ".json")) {
// We've been given a real metadata path. Do nothing
version_hint = path;
hint_is_metadata = true;
} else if (StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
// We have been provided a hint file
version_hint = GetTableVersion(meta_path, fs, table_version);
} else if (!StringUtil::StartsWith(table_version, "?")) {
// We have a specific version supplied. No guessing here
// TODO: Could add more options here: e.g., ?latest or ?largest
// We've been given a real metadata path. Nothing else to do.
return path;
} else if (!fs.DirectoryExists(meta_path)) {
// Make sure we have a metadata directory to look in
throw IOException("Cannot open \""+path+"\": Metadata directory does not exist");
} else if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
// We were given a hint filename
version_hint = GetTableVersionFromHint(meta_path, fs, table_version);
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
} else if(!StringUtil::StartsWith(table_version, "?")) {
// We were given an explicit version number
version_hint = table_version;
} else if (fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) {
// Try to use the existing version-hint file
version_hint = GetTableVersion(meta_path, fs, DEFAULT_VERSION_HINT_FILE);
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
} else if(fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) {
// We're guessing, but a version-hint.text exists so we'll use that
version_hint = GetTableVersionFromHint(meta_path, fs, DEFAULT_VERSION_HINT_FILE);
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
} else {
// We need to guess the latest version from available metadata files
version_hint = GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format);
hint_is_metadata = true;
// We need to guess from file paths
return GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format);
}

if (hint_is_metadata) {
return version_hint;
} else {
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
}
}


Expand Down Expand Up @@ -255,7 +251,7 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
return ret;
}

string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) {
string IcebergSnapshot::GetTableVersionFromHint(const string &meta_path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) {
auto version_file_path = fs.JoinPath(meta_path, version_file);
auto version_file_content = IcebergUtils::FileToString(version_file_path, fs);

Expand All @@ -270,31 +266,27 @@ string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs,

string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) {
string selected_metadata;

string version_pattern = "*"; // TODO: Different "table_version" strings could customize this
string compression_suffix = "";
if (metadata_compression_codec == "gzip") {
compression_suffix = ".gz";
}

for(auto try_format : StringUtil::Split(version_format, ',')) {
auto glob_pattern = StringUtil::Format(try_format, version_pattern, compression_suffix);

auto found_versions = fs.Glob(fs.JoinPath(meta_path, glob_pattern));
if(found_versions.size() > 0) {
selected_metadata = PickTableVersion(found_versions, version_pattern, glob_pattern);
if(!selected_metadata.empty()) { // Found one
break;
return selected_metadata;
}
}
}

if(!selected_metadata.empty()) {
return selected_metadata;
} else {
throw IOException(
"Could not guess Iceberg table version using '%s' compression and format(s): '%s'",
metadata_compression_codec, version_format);
}

throw IOException(
"Could not guess Iceberg table version using '%s' compression and format(s): '%s'",
metadata_compression_codec, version_format);
}

string IcebergSnapshot::PickTableVersion(vector<string> &found_metadata, string &version_pattern, string &glob) {
Expand All @@ -304,6 +296,7 @@ string IcebergSnapshot::PickTableVersion(vector<string> &found_metadata, string
return found_metadata.back();
}


yyjson_val *IcebergSnapshot::FindLatestSnapshotInternal(yyjson_val *snapshots) {
size_t idx, max;
yyjson_val *snapshot;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl

FileSystem &fs = FileSystem::GetFileSystem(context);
auto iceberg_path = input.inputs[0].ToString();

bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;
Expand Down
2 changes: 1 addition & 1 deletion src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class IcebergSnapshot {

protected:
//! Internal JSON parsing functions
static string GetTableVersion(const string &path, FileSystem &fs, string version_format);
static string GetTableVersionFromHint(const string &path, FileSystem &fs, string version_format);
static string GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format);
static string PickTableVersion(vector<string> &found_metadata, string &version_pattern, string &glob);
static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots);
Expand Down
36 changes: 35 additions & 1 deletion test/sql/iceberg_metadata.test
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,38 @@ lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro 0 DATA
statement error
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_nonexistent');
----
IO Error: Could not guess Iceberg table version using 'none' compression and format(s): 'v%s%s.metadata.json,%s%s.metadata.json'
IO Error: Cannot open "data/iceberg/lineitem_iceberg_nonexistent": Metadata directory does not exist

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_no_hint', ALLOW_MOVED_PATHS=TRUE);
----
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_no_hint', ALLOW_MOVED_PATHS=TRUE, version='1');
----
lineitem_iceberg/metadata/cf3d0be5-cf70-453d-ad8f-48fdc412e608-m0.avro 1 DATA ADDED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_no_hint', ALLOW_MOVED_PATHS=TRUE, version_name_format='v%s%s.metadata.json');
----
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_no_hint', ALLOW_MOVED_PATHS=TRUE, version='?', version_name_format='v%s%s.metadata.json');
----
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793
lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz_no_hint', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC='gzip', version='?');
----
lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro 0 DATA ADDED EXISTING lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet PARQUET 111968

query IIIIIIII
SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz_no_hint', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC='gzip');
----
lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro 0 DATA ADDED EXISTING lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet PARQUET 111968

21 changes: 10 additions & 11 deletions test/sql/iceberg_scan.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,31 @@ SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVE
111968

query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE);
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.504'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE);
----
111968
60175

query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', version="?", ALLOW_MOVED_PATHS=TRUE);
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.729'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE);
----
111968
60175

query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, metadata_compression_codec="gzip");
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:08:14.73'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE);
----
111968
51793

statement error
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version="?", vesion_name_format="version%s%s.metadata.json");
FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE);
----
IO Error: No snapshots found
IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503

query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_no_hint', ALLOW_MOVED_PATHS=TRUE);
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz_no_hint', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip");
----
111968

query I
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz_no_hint', ALLOW_MOVED_PATHS=TRUE, metadata_compression_codec="gzip");
SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz_no_hint', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='2', version_name_format='v%s%s.metadata.json');
----
111968

25 changes: 24 additions & 1 deletion test/sql/iceberg_snapshots.test
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg', version='1');
statement error
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_nonexistent');
----
IO Error: Could not guess Iceberg table version using 'none' compression and format(s): 'v%s%s.metadata.json,%s%s.metadata.json'
IO Error: Cannot open "data/iceberg/lineitem_iceberg_nonexistent": Metadata directory does not exist

statement error
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz');
Expand All @@ -57,3 +57,26 @@ query IIII
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz', metadata_compression_codec="gzip", version='2');
----
0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro

query IIII
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_no_hint');
----
1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro
2 7635660646343998149 2023-02-15 15:08:14.73 lineitem_iceberg/metadata/snap-7635660646343998149-1-10eaca8a-1e1c-421e-ad6d-b232e5ee23d3.avro

query IIII
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_no_hint', version='1');
----
1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro

query IIII
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_no_hint', version="?");
----
1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro
2 7635660646343998149 2023-02-15 15:08:14.73 lineitem_iceberg/metadata/snap-7635660646343998149-1-10eaca8a-1e1c-421e-ad6d-b232e5ee23d3.avro

query IIII
SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz_no_hint', metadata_compression_codec="gzip");
----
0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro

0 comments on commit 15d0cc3

Please sign in to comment.