Skip to content

Commit

Permalink
Merge pull request #30 from samansmink/parse_json_schema
Browse files Browse the repository at this point in the history
Parse schema from table metadata
  • Loading branch information
samansmink authored Nov 30, 2023
2 parents 56f9f56 + 6d4dbe8 commit e16988b
Show file tree
Hide file tree
Showing 28 changed files with 443 additions and 178 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/Linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

# Build extension
- name: Build extension
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/MacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- name: Build extension
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/Rest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- name: Build extension
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/Windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- uses: actions/setup-python@v2
with:
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ set(EXTENSION_SOURCES
src/iceberg_extension.cpp
src/iceberg_functions.cpp
src/common/utils.cpp
src/common/schema.cpp
src/common/iceberg.cpp
src/iceberg_functions/iceberg_snapshots.cpp
src/iceberg_functions/iceberg_scan.cpp
Expand Down
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ all: release

MKFILE_PATH := $(abspath $(lastword $(MAKEFILE_LIST)))
PROJ_DIR := $(dir $(MKFILE_PATH))
DISABLE_SANITIZER_FLAG ?=

OSX_BUILD_UNIVERSAL_FLAG=
ifneq (${OSX_BUILD_ARCH}, "")
Expand All @@ -13,6 +14,10 @@ ifeq (${STATIC_LIBCPP}, 1)
STATIC_LIBCPP=-DSTATIC_LIBCPP=TRUE
endif

ifeq (${DISABLE_SANITIZER}, 1)
DISABLE_SANITIZER_FLAG=-DENABLE_SANITIZER=FALSE -DENABLE_UBSAN=0
endif

VCPKG_TOOLCHAIN_PATH?=
ifneq ("${VCPKG_TOOLCHAIN_PATH}", "")
TOOLCHAIN_FLAGS:=${TOOLCHAIN_FLAGS} -DVCPKG_MANIFEST_DIR='${PROJ_DIR}' -DVCPKG_BUILD=1 -DCMAKE_TOOLCHAIN_FILE='${VCPKG_TOOLCHAIN_PATH}'
Expand Down Expand Up @@ -45,17 +50,17 @@ clean:
# Main build
debug:
mkdir -p build/debug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S ./duckdb/ -B build/debug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S ./duckdb/ -B build/debug && \
cmake --build build/debug --config Debug

release:
mkdir -p build/release && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S ./duckdb/ -B build/release && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S ./duckdb/ -B build/release && \
cmake --build build/release --config Release

# RelWithDebInfo build. The configure step writes into build/reldebug, so the
# directory that is created and the directory that is built must be that same path.
reldebug:
mkdir -p build/reldebug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S ./duckdb/ -B build/reldebug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S ./duckdb/ -B build/reldebug && \
cmake --build build/reldebug --config RelWithDebInfo

# Client build
Expand Down
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1210 files
3 changes: 0 additions & 3 deletions scripts/start-rest-catalog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ python3 provision.py
UNPARTITIONED_TABLE_PATH=$(curl -s http://127.0.0.1:8181/v1/namespaces/default/tables/table_unpartitioned | jq -r '."metadata-location"')

SQL=$(cat <<-END
INSTALL iceberg;
LOAD iceberg;
SET s3_access_key_id='admin';
SET s3_secret_access_key='password';
SET s3_endpoint='127.0.0.1:9000';
Expand Down
10 changes: 2 additions & 8 deletions scripts/test_data_generator/generate_base_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::VARCHAR as uuid,
l_comment::BLOB as l_comment_blob,
{'a': l_shipmode, 'b': l_quantity} as l_shipmode_quantity_struct,
[l_linenumber, l_quantity] as l_linenumber_quantity_list,
map(['linenumber', 'quantity'], [l_linenumber, l_quantity]) as l_linenumber_quantity_map
l_comment::BLOB as l_comment_blob
FROM
lineitem;""");
elif (MODE.lower() == "default"):
Expand All @@ -70,10 +67,7 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::UUID as uuid,
l_comment::BLOB as l_comment_blob,
{'a': l_shipmode, 'b': l_quantity} as l_shipmode_quantity_struct,
[l_linenumber, l_quantity] as l_linenumber_quantity_list,
map(['linenumber', 'quantity'], [l_linenumber, l_quantity]) as l_linenumber_quantity_map
l_comment::BLOB as l_comment_blob
FROM
lineitem;""");
else:
Expand Down
5 changes: 1 addition & 4 deletions scripts/test_data_generator/updates_v1/q01.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,5 @@ set l_orderkey_bool=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
l_linenumber_quantity_map=NULL
l_comment_blob=NULL
where l_partkey_int % 2 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v1/q06.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v1/q07.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE iceberg_catalog.pyspark_iceberg_table
SET schema_evol_added_col_1 = l_partkey_int
WHERE l_partkey_int % 5 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v1/q08.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
5 changes: 1 addition & 4 deletions scripts/test_data_generator/updates_v2/q01.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,5 @@ set l_orderkey_bool=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
l_linenumber_quantity_map=NULL
l_comment_blob=NULL
where l_partkey_int % 2 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v2/q06.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v2/q07.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE iceberg_catalog.pyspark_iceberg_table
SET schema_evol_added_col_1 = l_partkey_int
WHERE l_partkey_int % 5 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v2/q08.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
87 changes: 60 additions & 27 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb.hpp"
#include "iceberg_metadata.hpp"
#include "iceberg_utils.hpp"
#include "iceberg_types.hpp"

#include "avro/Compiler.hh"
Expand Down Expand Up @@ -34,7 +35,7 @@ IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &sna
return ret;
}

vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifest> IcebergTable::ReadManifestListFile(const string &path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifest> ret;

// TODO: make streaming
Expand Down Expand Up @@ -62,7 +63,8 @@ vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSyst
return ret;
}

vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(const string &path, FileSystem &fs,
idx_t iceberg_format_version) {
vector<IcebergManifestEntry> ret;

// TODO: make streaming
Expand All @@ -88,52 +90,80 @@ vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, File
return ret;
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs) {
/// Collects from an already-parsed table metadata document the values that snapshot parsing needs:
/// the format version, the 'snapshots' value, the current schema id and the candidate schema objects.
///
/// The stored yyjson_val pointers point into `metadata_json`. This overload does not set any ownership
/// fields on the result, so the caller must keep `metadata_json` alive while the result is in use.
///
/// Throws IOException when the metadata has neither a 'current-schema-id' nor a legacy 'schema' field.
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc &metadata_json) {
	SnapshotParseInfo info {};
	auto root = yyjson_doc_get_root(&metadata_json);
	info.iceberg_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
	info.snapshots = yyjson_obj_get(root, "snapshots");

	// Multiple schemas can be present in the json metadata 'schemas' list
	// Presence check only: yyjson_obj_get performs the same key lookup as yyjson_obj_getn without
	// building a temporary string just to measure the literal.
	if (yyjson_obj_get(root, "current-schema-id")) {
		info.schema_id = IcebergUtils::TryGetNumFromObject(root, "current-schema-id");

		// Keep every entry of 'schemas'; selecting the one that matches schema_id is left to the consumer.
		auto schema_list = yyjson_obj_get(root, "schemas");
		size_t idx, max;
		yyjson_val *entry;
		yyjson_arr_foreach(schema_list, idx, max, entry) {
			info.schemas.push_back(entry);
		}
	} else {
		// No 'current-schema-id': fall back to the single top-level 'schema' object.
		auto schema = yyjson_obj_get(root, "schema");
		if (!schema) {
			throw IOException("Neither a valid schema or schemas field was found");
		}
		// The id is read before the schema is stored, matching the original statement order.
		auto found_schema_id = IcebergUtils::TryGetNumFromObject(schema, "schema-id");
		info.schemas.push_back(schema);
		info.schema_id = found_schema_id;
	}

	return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto latest_snapshot = FindLatestSnapshotInternal(snapshots);
auto parse_info = GetParseInfo(*doc);

// Transfer string and yyjson doc ownership
parse_info->doc = doc;
parse_info->document = std::move(metadata_json);

return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, iceberg_format_version);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(string &path, FileSystem &fs, idx_t snapshot_id) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdInternal(snapshots, snapshot_id);
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) {
auto info = GetParseInfo(path, fs);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, iceberg_format_version);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(string &path, FileSystem &fs, timestamp_t timestamp) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdTimestampInternal(snapshots, timestamp);
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) {
auto info = GetParseInfo(path, fs);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, iceberg_format_version);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) {
string metadata_file_path;

if (StringUtil::EndsWith(path, ".json")) {
Expand All @@ -147,7 +177,8 @@ string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version) {
IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
Expand All @@ -164,11 +195,13 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
ret.timestamp_ms = Timestamp::FromEpochMs(IcebergUtils::TryGetNumFromObject(snapshot, "timestamp-ms"));
ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
ret.iceberg_format_version = iceberg_format_version;
ret.schema_id = schema_id;
ret.schema = ParseSchema(schemas, ret.schema_id);

return ret;
}

string IcebergSnapshot::GetTableVersion(string &path, FileSystem &fs) {
string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs) {
auto meta_path = fs.JoinPath(path, "metadata");
auto version_file_path = fs.JoinPath(meta_path, "version-hint.text");
auto version_file_content = IcebergUtils::FileToString(version_file_path, fs);
Expand Down
Loading

0 comments on commit e16988b

Please sign in to comment.