Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to read gzip compressed metadata files #42

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
build
.vscode
.idea
cmake-build-debug
duckdb_unittest_tempdir/
Expand All @@ -10,4 +11,4 @@ test/sql/tmp.test
data/iceberg/generated_*
scripts/metastore_db/
scripts/derby.log
scripts/test-script-with-path.sql
scripts/test-script-with-path.sql
52 changes: 31 additions & 21 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace duckdb {

IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths) {
bool allow_moved_paths, string metadata_compression_codec) {
IcebergTable ret;
ret.path = iceberg_path;
ret.snapshot = snapshot;
Expand Down Expand Up @@ -118,8 +118,8 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc &metadata
return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto parse_info = GetParseInfo(*doc);

Expand All @@ -130,61 +130,71 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path,
return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) {
string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;

if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
// check if metadata is gz compressed file?
devendrasr marked this conversation as resolved.
Show resolved Hide resolved
if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
}
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
if (metadata_compression_codec == "gzip") {
// try with gz metadata file
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");
// attempting to return file content as gz compressed json string.
return IcebergUtils::GzFileToString(metadata_file_path, fs);
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
// attempting to return file content as json string.
return IcebergUtils::FileToString(metadata_file_path, fs);
}

return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
vector<yyjson_val *> &schemas, string metadata_compression_codec) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}

ret.metadata_compression_codec = metadata_compression_codec;
if (iceberg_format_version == 1) {
ret.sequence_number = 0;
} else if (iceberg_format_version == 2) {
Expand Down
37 changes: 37 additions & 0 deletions src/common/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "duckdb.hpp"
#include "iceberg_utils.hpp"
#include "zlib.h"
#include "fstream"

namespace duckdb {

Expand All @@ -12,6 +14,41 @@ string IcebergUtils::FileToString(const string &path, FileSystem &fs) {
return ret_val;
}

// Function to decompress a gz file content string
string IcebergUtils::GzFileToString(const string &path, FileSystem &fs) {
devendrasr marked this conversation as resolved.
Show resolved Hide resolved
// Initialize zlib variables
string gzipped_string = FileToString(path, fs);
std::stringstream decompressed;
int CHUNK_SIZE = 16384;
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = gzipped_string.size();
zs.next_in = (Bytef *)gzipped_string.data();
int ret = inflateInit2(&zs, 16 + MAX_WBITS); // MAX_WBITS + 16 to enable gzip decoding
if (ret != Z_OK)
{
throw std::runtime_error("inflateInit failed");
}
do
{
char out[CHUNK_SIZE];
zs.avail_out = CHUNK_SIZE;
zs.next_out = (Bytef *)out;
ret = inflate(&zs, Z_NO_FLUSH);
if (ret < 0)
{
inflateEnd(&zs);
throw std::runtime_error("inflate failed with error code " + to_string(ret));
}
decompressed.write(out, CHUNK_SIZE - zs.avail_out);
} while (zs.avail_out == 0);
inflateEnd(&zs);
string ds = decompressed.str();
return ds;
}

string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relative_file_path, FileSystem &fs) {
std::size_t found = relative_file_path.find("/metadata/");
if (found != string::npos) {
Expand Down
16 changes: 11 additions & 5 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,33 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
auto iceberg_path = input.inputs[0].ToString();

bool allow_moved_paths = false;

string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
allow_moved_paths = BooleanValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

ret->iceberg_table =
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths));
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));

auto manifest_types = IcebergManifest::Types();
return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end());
Expand Down Expand Up @@ -139,16 +142,19 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {
auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
18 changes: 12 additions & 6 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec) {
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
Expand Down Expand Up @@ -209,6 +209,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
// performance
bool allow_moved_paths = false;
string mode = "default";
string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -220,24 +221,26 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
}
} else if (loption == "mode") {
mode = StringValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths);
IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
auto data_files = iceberg_table.GetPaths<IcebergManifestContentType::DATA>();
auto delete_files = iceberg_table.GetPaths<IcebergManifestContentType::DELETE>();
vector<Value> data_file_values;
Expand All @@ -254,7 +257,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths);
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
Expand All @@ -267,20 +270,23 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
26 changes: 21 additions & 5 deletions src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace duckdb {
struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
};

struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState {
Expand All @@ -22,11 +23,12 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
}
}
static unique_ptr<GlobalTableFunctionState> Init(ClientContext &context, TableFunctionInitInput &input) {

auto bind_data = input.bind_data->Cast<IcebergSnaphotsBindData>();
auto global_state = make_uniq<IcebergSnapshotGlobalTableFunctionState>();

FileSystem &fs = FileSystem::GetFileSystem(context);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec);
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
Expand All @@ -45,8 +47,17 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto bind_data = make_uniq<IcebergSnaphotsBindData>();


string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
Expand All @@ -63,10 +74,14 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
return std::move(bind_data);
}

static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data,
vector<LogicalType> &return_types, vector<string> &names) {

}
// Snapshots function
static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
auto &global_state = data.global_state->Cast<IcebergSnapshotGlobalTableFunctionState>();

auto &bind_data = data.bind_data->Cast<IcebergSnaphotsBindData>();
idx_t i = 0;
while (auto next_snapshot = yyjson_arr_iter_next(&global_state.snapshot_it)) {
if (i >= STANDARD_VECTOR_SIZE) {
Expand All @@ -75,7 +90,7 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput

auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc);
auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version,
parse_info->schema_id, parse_info->schemas);
parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec);

FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
Expand All @@ -92,6 +107,7 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunctionSet function_set("iceberg_snapshots");
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(table_function);
return std::move(function_set);
}
Expand Down
Loading