Merge pull request duckdb#42 from devendrasr/feature/read_gz_compressed_metadata

Support reading gzip-compressed metadata files
samansmink authored Mar 18, 2024
2 parents c324227 + 4b4a5c6 commit 7ba8531
Showing 11 changed files with 126 additions and 45 deletions.
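
For context, all three Iceberg table functions touched below (iceberg_scan, iceberg_metadata, iceberg_snapshots) gain a metadata_compression_codec named parameter; passing 'gzip' makes the extension look for v<N>.gz.metadata.json files and decompress them on read. A minimal usage sketch through the DuckDB C++ client API — the LOAD step and the table path are assumptions for illustration, not part of this commit:

#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// Assumes an iceberg-enabled build; the table path below is hypothetical.
	con.Query("LOAD iceberg;");
	auto result = con.Query(
	    "SELECT count(*) "
	    "FROM iceberg_scan('data/iceberg/table_gz', "
	    "                  allow_moved_paths => true, "
	    "                  metadata_compression_codec => 'gzip');");
	result->Print();
	return 0;
}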
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
build
.vscode
.idea
cmake-build-debug
duckdb_unittest_tempdir/
@@ -10,4 +11,4 @@ test/sql/tmp.test
data/iceberg/generated_*
scripts/metastore_db/
scripts/derby.log
scripts/test-script-with-path.sql
scripts/test-script-with-path.sql
44 changes: 27 additions & 17 deletions src/common/iceberg.cpp
@@ -13,7 +13,7 @@
namespace duckdb {

IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths) {
bool allow_moved_paths, string metadata_compression_codec) {
IcebergTable ret;
ret.path = iceberg_path;
ret.snapshot = snapshot;
@@ -118,8 +118,8 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc &metadata
return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto parse_info = GetParseInfo(*doc);

@@ -130,61 +130,71 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path,
return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) {
string metadata_file_path;
// Function to generate a metadata file url
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) {
if (metadata_compression_codec != "gzip") {
return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
}
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec);
}

if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
vector<yyjson_val *> &schemas, string metadata_compression_codec) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}

ret.metadata_compression_codec = metadata_compression_codec;
if (iceberg_format_version == 1) {
ret.sequence_number = 0;
} else if (iceberg_format_version == 2) {
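
In short, ReadMetaData now delegates file-name construction to GenerateMetaDataUrl: for codec "gzip" it expects v<version>.gz.metadata.json, otherwise v<version>.metadata.json, and only the gzip case is routed through GzFileToString. A standalone sketch of that naming rule (plain string concatenation instead of FileSystem::JoinPath, purely for illustration):

#include <iostream>
#include <string>

// Sketch of the naming rule in GenerateMetaDataUrl above; the real code joins
// path components through DuckDB's FileSystem::JoinPath.
static std::string MetadataFileName(const std::string &table_version, const std::string &codec) {
	if (codec == "gzip") {
		return "v" + table_version + ".gz.metadata.json";
	}
	return "v" + table_version + ".metadata.json";
}

int main() {
	std::cout << MetadataFileName("1", "none") << "\n"; // v1.metadata.json
	std::cout << MetadataFileName("1", "gzip") << "\n"; // v1.gz.metadata.json
	return 0;
}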
10 changes: 10 additions & 0 deletions src/common/utils.cpp
@@ -1,5 +1,8 @@
#include "duckdb.hpp"
#include "iceberg_utils.hpp"
#include "zlib.h"
#include "fstream"
#include "duckdb/common/gzip_file_system.hpp"

namespace duckdb {

@@ -12,6 +15,13 @@ string IcebergUtils::FileToString(const string &path, FileSystem &fs) {
return ret_val;
}

// Function to decompress a gz file content string
string IcebergUtils::GzFileToString(const string &path, FileSystem &fs) {
// Initialize zlib variables
string gzipped_string = FileToString(path, fs);
return GZipFileSystem::UncompressGZIPString(gzipped_string);
}

string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relative_file_path, FileSystem &fs) {
std::size_t found = relative_file_path.find("/metadata/");
if (found != string::npos) {
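
GzFileToString is a thin wrapper: FileToString reads the raw compressed bytes and GZipFileSystem::UncompressGZIPString inflates them back to plain JSON text. A hedged sketch of how a caller inside the extension could choose between the two helpers, mirroring the branch in ReadMetaData (the helper name here is illustrative, not part of the commit):

#include "iceberg_utils.hpp"
#include "duckdb/common/gzip_file_system.hpp"

namespace duckdb {

// Illustrative helper: pick the reader that matches the metadata compression codec.
static string ReadMetadataText(const string &path, FileSystem &fs, const string &codec) {
	if (codec == "gzip") {
		// reads compressed bytes, then inflates them via GZipFileSystem::UncompressGZIPString
		return IcebergUtils::GzFileToString(path, fs);
	}
	return IcebergUtils::FileToString(path, fs);
}

} // namespace duckdb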
16 changes: 11 additions & 5 deletions src/iceberg_functions/iceberg_metadata.cpp
@@ -55,30 +55,33 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
auto iceberg_path = input.inputs[0].ToString();

bool allow_moved_paths = false;

string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
allow_moved_paths = BooleanValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

ret->iceberg_table =
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths));
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));

auto manifest_types = IcebergManifest::Types();
return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end());
@@ -139,16 +142,19 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {
auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
18 changes: 12 additions & 6 deletions src/iceberg_functions/iceberg_scan.cpp
@@ -129,7 +129,7 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec) {
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
@@ -209,6 +209,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
// performance
bool allow_moved_paths = false;
string mode = "default";
string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
@@ -220,24 +221,26 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
}
} else if (loption == "mode") {
mode = StringValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths);
IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
auto data_files = iceberg_table.GetPaths<IcebergManifestContentType::DATA>();
auto delete_files = iceberg_table.GetPaths<IcebergManifestContentType::DELETE>();
vector<Value> data_file_values;
@@ -254,7 +257,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths);
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
@@ -267,20 +270,23 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
26 changes: 21 additions & 5 deletions src/iceberg_functions/iceberg_snapshots.cpp
@@ -12,6 +12,7 @@ namespace duckdb {
struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
};

struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState {
@@ -22,11 +23,12 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
}
}
static unique_ptr<GlobalTableFunctionState> Init(ClientContext &context, TableFunctionInitInput &input) {

auto bind_data = input.bind_data->Cast<IcebergSnaphotsBindData>();
auto global_state = make_uniq<IcebergSnapshotGlobalTableFunctionState>();

FileSystem &fs = FileSystem::GetFileSystem(context);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec);
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
@@ -45,8 +47,17 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto bind_data = make_uniq<IcebergSnaphotsBindData>();


string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
@@ -63,10 +74,14 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
return std::move(bind_data);
}

static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data,
vector<LogicalType> &return_types, vector<string> &names) {

}
// Snapshots function
static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
auto &global_state = data.global_state->Cast<IcebergSnapshotGlobalTableFunctionState>();

auto &bind_data = data.bind_data->Cast<IcebergSnaphotsBindData>();
idx_t i = 0;
while (auto next_snapshot = yyjson_arr_iter_next(&global_state.snapshot_it)) {
if (i >= STANDARD_VECTOR_SIZE) {
@@ -75,7 +90,7 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput

auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc);
auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version,
parse_info->schema_id, parse_info->schemas);
parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec);

FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
@@ -92,6 +107,7 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunctionSet function_set("iceberg_snapshots");
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(table_function);
return std::move(function_set);
}
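
The same parameter is threaded through iceberg_snapshots, so snapshot listings also work against gzip-compressed metadata. A short usage sketch (hypothetical table path; the selected columns come from the bind/output code above):

#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// Assumes an iceberg-enabled build; the table path below is hypothetical.
	con.Query("LOAD iceberg;");
	auto result = con.Query(
	    "SELECT sequence_number, snapshot_id "
	    "FROM iceberg_snapshots('data/iceberg/table_gz', "
	    "                       metadata_compression_codec => 'gzip');");
	result->Print();
	return 0;
}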