From ab618851c24c6e82333168de09419f103f0fd8ee Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Mon, 10 Jun 2024 16:01:21 +0200 Subject: [PATCH 1/5] bump delta kernel --- CMakeLists.txt | 2 +- src/delta_utils.cpp | 4 ++-- test/sql/dat/basic_append.test | 7 ------- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f8d12ec..a37370c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,7 +60,7 @@ endif() ExternalProject_Add( ${KERNEL_NAME} GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" - GIT_TAG 08f0764a00e89f42136fd478823d28278adc7ee8 + GIT_TAG 823367e4dc13b627914412ee2ca7933a1c7b822a CONFIGURE_COMMAND "" UPDATE_COMMAND "" BUILD_IN_SOURCE 1 diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index b02e898..7f1d3a7 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -226,7 +226,7 @@ static unordered_map PrunePredicates(unordered_mapcolumn_filters); + auto filters = predicate->column_filters; auto it = filters.begin(); auto end = filters.end(); @@ -312,7 +312,7 @@ uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilte case TableFilterType::CONJUNCTION_AND: return VisitAndFilter(col_name, static_cast(filter), state); default: - throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type)); + return ~0; } } diff --git a/test/sql/dat/basic_append.test b/test/sql/dat/basic_append.test index 4ff31bc..87930b8 100644 --- a/test/sql/dat/basic_append.test +++ b/test/sql/dat/basic_append.test @@ -56,9 +56,6 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta') 2 3 -# TODO: Figure out what's wrong here -mode skip - # Now we add a filter that filters out one of the files query II SELECT letter, number @@ -67,8 +64,6 @@ WHERE number < 2 ---- a 1 -mode unskip - # Now we add a filter that filters out the other file query III SELECT a_float, letter, number, @@ -77,8 +72,6 @@ WHERE number > 4 ---- 5.5 e 5 -mode skip - # Now we add a filter that filters out all columns query III SELECT a_float, number, letter From a6f85ef00572de92836b7b9ece90da8affaf25b1 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Mon, 10 Jun 2024 16:02:23 +0200 Subject: [PATCH 2/5] set correct duckdb version for submodule --- duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/duckdb b/duckdb index 7b8efd3..1f98600 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 7b8efd3d0fab38ec9dae467861a317af3f1d7f3e +Subproject commit 1f98600c2cf8722a6d2f2d805bb4af5e701319fc From ef1dd70dee920175a559f69fc3ffa388b778c9ea Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 13 Jun 2024 11:33:00 +0200 Subject: [PATCH 3/5] bump delta to c901665b98b --- CMakeLists.txt | 2 +- Makefile | 5 ----- extension-ci-tools | 2 +- scripts/generate_test_data.py | 8 +++++++- src/delta_utils.cpp | 36 ++++++++++++++++++++++++++++++++--- src/functions/delta_scan.cpp | 2 ++ src/include/delta_utils.hpp | 5 +++++ 7 files changed, 49 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a37370c..28ea1d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,7 +60,7 @@ endif() ExternalProject_Add( ${KERNEL_NAME} GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" - GIT_TAG 823367e4dc13b627914412ee2ca7933a1c7b822a + GIT_TAG c901665b98b2fed5ff1c713a9666eba9d16ea281 CONFIGURE_COMMAND "" UPDATE_COMMAND "" BUILD_IN_SOURCE 1 diff --git a/Makefile b/Makefile index 05db957..78144e6 100644 --- a/Makefile +++ b/Makefile @@ -14,11 +14,6 @@ test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests # Include the Makefile from extension-ci-tools include extension-ci-tools/makefiles/duckdb_extension.Makefile -reldebug: - mkdir -p build/reldebug && \ - cmake $(GENERATOR) $(BUILD_FLAGS) $(EXT_RELEASE_FLAGS) -DCMAKE_BUILD_TYPE=RelWithDebInfo -S ./duckdb/ -B build/reldebug && \ - cmake --build build/reldebug --config RelWithDebInfo - # Generate some test data to test with generate-data: python3 -m pip install delta-spark duckdb pandas deltalake pyspark delta diff --git a/extension-ci-tools b/extension-ci-tools index 71b8a60..c0cc931 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 71b8a603ea24b1ac8a2cff134aca28163576548f +Subproject commit c0cc9319492bfa38344c2f28bd35f2304c74cdde diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index 715e882..cb1d2f7 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -136,8 +136,14 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet') +## TPCH SF1 full dataset +con = duckdb.connect() +con.query(f"call dbgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpch_sf1_export' (FORMAT parquet)") +for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: + generate_test_data_pyspark(f"tpch_sf1_{table}", f'tpch_sf1/{table}', f'{TMP_PATH}/tpch_sf1_export/{table}.parquet') + ## TPCDS SF0.01 full dataset con = duckdb.connect() con.query(f"call dsdgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpcds_sf0_01_export' (FORMAT parquet)") for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: - generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet') + generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet') \ No newline at end of file diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 7f1d3a7..a805d15 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -3,6 +3,7 @@ #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" #include +#include namespace duckdb { @@ -257,8 +258,24 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co case LogicalType::BIGINT: right = visit_expression_literal_long(state, BigIntValue::Get(value)); break; - - + // case LogicalType::INTEGER: + // right = visit_expression_literal_int(state, IntegerValue::Get(value)); + // break; + // case LogicalType::SMALLINT: + // right = visit_expression_literal_short(state, SmallIntValue::Get(value)); + // break; + // case LogicalType::TINYINT: + // right = visit_expression_literal_byte(state, TinyIntValue::Get(value)); + // break; + // case LogicalType::FLOAT: + // right = visit_expression_literal_float(state, FloatValue::Get(value)); + // break; + // case LogicalType::DOUBLE: + // right = visit_expression_literal_double(state, DoubleValue::Get(value)); + // break; + // case LogicalType::BOOLEAN: + // right = visit_expression_literal_bool(state, BooleanValue::Get(value)); + // break; case LogicalType::VARCHAR: { // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str()) auto str = StringValue::Get(value); @@ -266,7 +283,6 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); break; } - default: break; // unsupported type } @@ -305,12 +321,26 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const Conjunc return visit_expression_and(state, &eit); } +uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { + auto maybe_left = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitIsNull failed to visit_expression_column"); + return ffi::visit_expression_is_null(state, left); +} + +uintptr_t PredicateVisitor::VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { + return ffi::visit_expression_not(state, VisitIsNull(col_name, state)); +} + uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) { switch (filter.filter_type) { case TableFilterType::CONSTANT_COMPARISON: return VisitConstantFilter(col_name, static_cast(filter), state); case TableFilterType::CONJUNCTION_AND: return VisitAndFilter(col_name, static_cast(filter), state); + // case TableFilterType::IS_NULL: + // return VisitIsNull(col_name, state); + // case TableFilterType::IS_NOT_NULL: + // return VisitIsNotNull(col_name, state); default: return ~0; } diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index dd2a027..d4320e5 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -31,6 +31,8 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel StringUtil::RTrim(path_string, "/"); path_string += "/" + KernelUtils::FromDeltaString(path); + printf("Got File %s\n", path_string.c_str()); + // First we append the file to our resolved files context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string)); context->metadata.emplace_back(make_uniq()); diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index bcb5f74..37dc289 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -5,6 +5,7 @@ #include "duckdb/planner/filter/conjunction_filter.hpp" #include "duckdb/common/enum_util.hpp" #include +#include // TODO: clean up this file as we go @@ -140,6 +141,10 @@ class PredicateVisitor : public ffi::EnginePredicate { uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state); uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state); + + uintptr_t VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state); }; From 1563715a79c5b17db469ca233f28a30cd080ef4f Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Sat, 15 Jun 2024 10:54:26 +0200 Subject: [PATCH 4/5] bump delta to 181232a45562, enable cardinalty estimation, fix varchar pushdown --- CMakeLists.txt | 4 +-- scripts/generate_test_data.py | 3 +- src/delta_utils.cpp | 57 +++++++++++++++++++---------------- src/functions/delta_scan.cpp | 3 -- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 28ea1d2..58e3d39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,8 +59,8 @@ endif() # Add rust_example as a CMake target ExternalProject_Add( ${KERNEL_NAME} - GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" - GIT_TAG c901665b98b2fed5ff1c713a9666eba9d16ea281 + GIT_REPOSITORY "https://github.com/nicklan/delta-kernel-rs" + GIT_TAG 181232a45562ca78be763c2f5fb46b88a2463b5c CONFIGURE_COMMAND "" UPDATE_COMMAND "" BUILD_IN_SOURCE 1 diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index cb1d2f7..e7bf588 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -78,7 +78,8 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate ## CREATE ## CONFIGURE USAGE OF DELETION VECTORS - spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") + if (delete_predicate): + spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") ## ADDING DELETES deltaTable = DeltaTable.forPath(spark, delta_table_path) diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index a805d15..104d65a 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -200,6 +200,10 @@ static bool CanHandleFilter(TableFilter *filter) { switch (filter->filter_type) { case TableFilterType::CONSTANT_COMPARISON: return true; + case TableFilterType::IS_NULL: + return true; + case TableFilterType::IS_NOT_NULL: + return true; case TableFilterType::CONJUNCTION_AND: { auto &conjunction = static_cast(*filter); bool can_handle = true; @@ -258,28 +262,28 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co case LogicalType::BIGINT: right = visit_expression_literal_long(state, BigIntValue::Get(value)); break; - // case LogicalType::INTEGER: - // right = visit_expression_literal_int(state, IntegerValue::Get(value)); - // break; - // case LogicalType::SMALLINT: - // right = visit_expression_literal_short(state, SmallIntValue::Get(value)); - // break; - // case LogicalType::TINYINT: - // right = visit_expression_literal_byte(state, TinyIntValue::Get(value)); - // break; - // case LogicalType::FLOAT: - // right = visit_expression_literal_float(state, FloatValue::Get(value)); - // break; - // case LogicalType::DOUBLE: - // right = visit_expression_literal_double(state, DoubleValue::Get(value)); - // break; - // case LogicalType::BOOLEAN: - // right = visit_expression_literal_bool(state, BooleanValue::Get(value)); - // break; + case LogicalType::INTEGER: + right = visit_expression_literal_int(state, IntegerValue::Get(value)); + break; + case LogicalType::SMALLINT: + right = visit_expression_literal_short(state, SmallIntValue::Get(value)); + break; + case LogicalType::TINYINT: + right = visit_expression_literal_byte(state, TinyIntValue::Get(value)); + break; + case LogicalType::FLOAT: + right = visit_expression_literal_float(state, FloatValue::Get(value)); + break; + case LogicalType::DOUBLE: + right = visit_expression_literal_double(state, DoubleValue::Get(value)); + break; + case LogicalType::BOOLEAN: + right = visit_expression_literal_bool(state, BooleanValue::Get(value)); + break; case LogicalType::VARCHAR: { // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str()) auto str = StringValue::Get(value); - auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), DuckDBEngineError::AllocateError); right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); break; } @@ -315,6 +319,7 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const Conjunc return 0; } auto &child_filter = *it++; + return VisitFilter(col_name, *child_filter, state); }; auto eit = EngineIteratorFromCallable(get_next); @@ -322,9 +327,9 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const Conjunc } uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { - auto maybe_left = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); - uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitIsNull failed to visit_expression_column"); - return ffi::visit_expression_is_null(state, left); + auto maybe_inner = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column"); + return ffi::visit_expression_is_null(state, inner); } uintptr_t PredicateVisitor::VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { @@ -337,10 +342,10 @@ uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilte return VisitConstantFilter(col_name, static_cast(filter), state); case TableFilterType::CONJUNCTION_AND: return VisitAndFilter(col_name, static_cast(filter), state); - // case TableFilterType::IS_NULL: - // return VisitIsNull(col_name, state); - // case TableFilterType::IS_NOT_NULL: - // return VisitIsNotNull(col_name, state); + case TableFilterType::IS_NULL: + return VisitIsNull(col_name, state); + case TableFilterType::IS_NOT_NULL: + return VisitIsNotNull(col_name, state); default: return ~0; } diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index d4320e5..ed968a2 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -31,8 +31,6 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel StringUtil::RTrim(path_string, "/"); path_string += "/" + KernelUtils::FromDeltaString(path); - printf("Got File %s\n", path_string.c_str()); - // First we append the file to our resolved files context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string)); context->metadata.emplace_back(make_uniq()); @@ -589,7 +587,6 @@ TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance function.deserialize = nullptr; function.statistics = nullptr; function.table_scan_progress = nullptr; - function.cardinality = nullptr; function.get_bind_info = nullptr; // Schema param is just confusing here From 7291aa51e970107095eefd25936e6dfc547d7610 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Sat, 15 Jun 2024 13:04:53 +0200 Subject: [PATCH 5/5] add tests for pushdown all types --- scripts/generate_test_data.py | 5 +++ .../generated/file_skipping_all_types.test | 44 +++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 test/sql/generated/file_skipping_all_types.test diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index e7bf588..eaf9d30 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -116,6 +116,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);" generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part"); +## Partitioned table with all types we can file skip on +for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]: + query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)" + generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part"); + ## Simple table with deletion vector con = duckdb.connect() con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'") diff --git a/test/sql/generated/file_skipping_all_types.test b/test/sql/generated/file_skipping_all_types.test new file mode 100644 index 0000000..e4348e8 --- /dev/null +++ b/test/sql/generated/file_skipping_all_types.test @@ -0,0 +1,44 @@ +# name: test/sql/generated/file_skipping_all_types.test +# description: Test filter pushdown succeeds on all file types we can push down +# group: [delta_generated] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +# TODO: this doesn't appear to skip files yet +# TODO: add tests once https://github.com/duckdb/duckdb/pull/12488 is available + +query I +select value +from delta_scan('./data/generated/test_file_skipping/bool/delta_lake') +where part != false +order by value +---- +true + +foreach type bool int tinyint smallint bigint varchar + +query I +select value +from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake') +where part != 0 +order by value +---- +1 + +endloop + +foreach type float double + +query I +select value +from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake') +where part > 0.5 +order by value +---- +1.0 + +endloop