Skip to content

Commit

Permalink
Merge pull request #42 from devendrasr/feature/read_gz_compressed_met…
Browse files Browse the repository at this point in the history
…adata

Support to read gzip compressed metadata files
  • Loading branch information
samansmink authored Mar 18, 2024
2 parents 6eef909 + f266c4a commit f7f1a35
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
build
.vscode
.idea
cmake-build-debug
duckdb_unittest_tempdir/
Expand All @@ -10,4 +11,4 @@ test/sql/tmp.test
data/iceberg/generated_*
scripts/metastore_db/
scripts/derby.log
scripts/test-script-with-path.sql
scripts/test-script-with-path.sql
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2
44 changes: 27 additions & 17 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace duckdb {

IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths) {
bool allow_moved_paths, string metadata_compression_codec) {
IcebergTable ret;
ret.path = iceberg_path;
ret.snapshot = snapshot;
Expand Down Expand Up @@ -118,8 +118,8 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc &metadata
return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto parse_info = GetParseInfo(*doc);

Expand All @@ -130,61 +130,71 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path,
return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) {
string metadata_file_path;
// Function to generate a metadata file url
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) {
if (metadata_compression_codec != "gzip") {
return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
}
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec);
}

if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
vector<yyjson_val *> &schemas, string metadata_compression_codec) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}

ret.metadata_compression_codec = metadata_compression_codec;
if (iceberg_format_version == 1) {
ret.sequence_number = 0;
} else if (iceberg_format_version == 2) {
Expand Down
10 changes: 10 additions & 0 deletions src/common/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "duckdb.hpp"
#include "iceberg_utils.hpp"
#include "zlib.h"
#include "fstream"
#include "duckdb/common/gzip_file_system.hpp"

namespace duckdb {

Expand All @@ -12,6 +15,13 @@ string IcebergUtils::FileToString(const string &path, FileSystem &fs) {
return ret_val;
}

// Function to decompress a gz file content string
string IcebergUtils::GzFileToString(const string &path, FileSystem &fs) {
// Initialize zlib variables
string gzipped_string = FileToString(path, fs);
return GZipFileSystem::UncompressGZIPString(gzipped_string);
}

string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relative_file_path, FileSystem &fs) {
std::size_t found = relative_file_path.find("/metadata/");
if (found != string::npos) {
Expand Down
16 changes: 11 additions & 5 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,33 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
auto iceberg_path = input.inputs[0].ToString();

bool allow_moved_paths = false;

string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
allow_moved_paths = BooleanValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

ret->iceberg_table =
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths));
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));

auto manifest_types = IcebergManifest::Types();
return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end());
Expand Down Expand Up @@ -139,16 +142,19 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {
auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
18 changes: 12 additions & 6 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec) {
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
Expand Down Expand Up @@ -209,6 +209,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
// performance
bool allow_moved_paths = false;
string mode = "default";
string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -220,24 +221,26 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
}
} else if (loption == "mode") {
mode = StringValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths);
IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
auto data_files = iceberg_table.GetPaths<IcebergManifestContentType::DATA>();
auto delete_files = iceberg_table.GetPaths<IcebergManifestContentType::DELETE>();
vector<Value> data_file_values;
Expand All @@ -254,7 +257,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths);
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
Expand All @@ -267,20 +270,23 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
26 changes: 21 additions & 5 deletions src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace duckdb {
struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
};

struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState {
Expand All @@ -22,11 +23,12 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
}
}
static unique_ptr<GlobalTableFunctionState> Init(ClientContext &context, TableFunctionInitInput &input) {

auto bind_data = input.bind_data->Cast<IcebergSnaphotsBindData>();
auto global_state = make_uniq<IcebergSnapshotGlobalTableFunctionState>();

FileSystem &fs = FileSystem::GetFileSystem(context);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec);
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
Expand All @@ -45,8 +47,17 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto bind_data = make_uniq<IcebergSnaphotsBindData>();


string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
Expand All @@ -63,10 +74,14 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
return std::move(bind_data);
}

static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data,
vector<LogicalType> &return_types, vector<string> &names) {

}
// Snapshots function
static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
auto &global_state = data.global_state->Cast<IcebergSnapshotGlobalTableFunctionState>();

auto &bind_data = data.bind_data->Cast<IcebergSnaphotsBindData>();
idx_t i = 0;
while (auto next_snapshot = yyjson_arr_iter_next(&global_state.snapshot_it)) {
if (i >= STANDARD_VECTOR_SIZE) {
Expand All @@ -75,7 +90,7 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput

auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc);
auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version,
parse_info->schema_id, parse_info->schemas);
parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec);

FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
Expand All @@ -92,6 +107,7 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunctionSet function_set("iceberg_snapshots");
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(table_function);
return std::move(function_set);
}
Expand Down
Loading

0 comments on commit f7f1a35

Please sign in to comment.