Bump delta extension #32

Merged · 5 commits · Jun 17, 2024
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -59,8 +59,8 @@
 # Add rust_example as a CMake target
 ExternalProject_Add(
         ${KERNEL_NAME}
-        GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs"
-        GIT_TAG 08f0764a00e89f42136fd478823d28278adc7ee8
+        GIT_REPOSITORY "https://github.com/nicklan/delta-kernel-rs"
+        GIT_TAG 181232a45562ca78be763c2f5fb46b88a2463b5c
         CONFIGURE_COMMAND ""
         UPDATE_COMMAND ""
         BUILD_IN_SOURCE 1
5 changes: 0 additions & 5 deletions Makefile
@@ -14,11 +14,6 @@ test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests
 # Include the Makefile from extension-ci-tools
 include extension-ci-tools/makefiles/duckdb_extension.Makefile
 
-reldebug:
-	mkdir -p build/reldebug && \
-	cmake $(GENERATOR) $(BUILD_FLAGS) $(EXT_RELEASE_FLAGS) -DCMAKE_BUILD_TYPE=RelWithDebInfo -S ./duckdb/ -B build/reldebug && \
-	cmake --build build/reldebug --config RelWithDebInfo
-
 # Generate some test data to test with
 generate-data:
 	python3 -m pip install delta-spark duckdb pandas deltalake pyspark delta
16 changes: 14 additions & 2 deletions scripts/generate_test_data.py
@@ -78,7 +78,8 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 
     ## CREATE
     ## CONFIGURE USAGE OF DELETION VECTORS
-    spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
+    if (delete_predicate):
+        spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
 
     ## ADDING DELETES
     deltaTable = DeltaTable.forPath(spark, delta_table_path)
@@ -115,6 +116,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
 generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");
 
+## Partitioned table with all types we can file skip on
+for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]:
+    query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)"
+    generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part");
+
 ## Simple table with deletion vector
 con = duckdb.connect()
 con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
@@ -136,8 +142,14 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
     generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet')
 
+## TPCH SF1 full dataset
+con = duckdb.connect()
+con.query(f"call dbgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpch_sf1_export' (FORMAT parquet)")
+for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
+    generate_test_data_pyspark(f"tpch_sf1_{table}", f'tpch_sf1/{table}', f'{TMP_PATH}/tpch_sf1_export/{table}.parquet')
+
 ## TPCDS SF0.01 full dataset
 con = duckdb.connect()
 con.query(f"call dsdgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpcds_sf0_01_export' (FORMAT parquet)")
 for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]:
-    generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet')
\ No newline at end of file
+    generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet')
47 changes: 41 additions & 6 deletions src/delta_utils.cpp
@@ -3,6 +3,7 @@
 #include "duckdb.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include <duckdb/parser/parsed_data/create_scalar_function_info.hpp>
+#include <duckdb/planner/filter/null_filter.hpp>
 
 namespace duckdb {
 
@@ -199,6 +200,10 @@ static bool CanHandleFilter(TableFilter *filter) {
     switch (filter->filter_type) {
     case TableFilterType::CONSTANT_COMPARISON:
         return true;
+    case TableFilterType::IS_NULL:
+        return true;
+    case TableFilterType::IS_NOT_NULL:
+        return true;
     case TableFilterType::CONJUNCTION_AND: {
         auto &conjunction = static_cast<const ConjunctionAndFilter&>(*filter);
         bool can_handle = true;
@@ -226,7 +231,7 @@ static unordered_map<string, TableFilter*> PrunePredicates(unordered_map<string,
 }
 
 uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) {
-    auto filters = PrunePredicates(predicate->column_filters);
+    auto filters = predicate->column_filters;
 
     auto it = filters.begin();
     auto end = filters.end();
@@ -257,16 +262,31 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) {
     case LogicalType::BIGINT:
         right = visit_expression_literal_long(state, BigIntValue::Get(value));
        break;
-
-
+    case LogicalType::INTEGER:
+        right = visit_expression_literal_int(state, IntegerValue::Get(value));
+        break;
+    case LogicalType::SMALLINT:
+        right = visit_expression_literal_short(state, SmallIntValue::Get(value));
+        break;
+    case LogicalType::TINYINT:
+        right = visit_expression_literal_byte(state, TinyIntValue::Get(value));
+        break;
+    case LogicalType::FLOAT:
+        right = visit_expression_literal_float(state, FloatValue::Get(value));
+        break;
+    case LogicalType::DOUBLE:
+        right = visit_expression_literal_double(state, DoubleValue::Get(value));
+        break;
+    case LogicalType::BOOLEAN:
+        right = visit_expression_literal_bool(state, BooleanValue::Get(value));
+        break;
     case LogicalType::VARCHAR: {
         // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str())
         auto str = StringValue::Get(value);
-        auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError);
+        auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), DuckDBEngineError::AllocateError);
         right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string");
         break;
     }
-
     default:
         break; // unsupported type
     }
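
Note on the VARCHAR case above: two things changed. The literal visitor now receives the filter value (`str`) instead of the column name, and the WARNING documents why that value is bound to a named local before crossing the FFI boundary: a slice taken from a temporary `std::string` dangles once the full expression ends. A minimal sketch of the pitfall, with a hypothetical `StringSlice` standing in for the kernel's FFI slice type:

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for the FFI string slice handed to delta-kernel-rs;
// it borrows the string's buffer rather than copying it.
struct StringSlice {
    const char *ptr;
    std::size_t len;
};

static StringSlice ToSlice(const std::string &s) {
    return {s.data(), s.size()};
}

int main() {
    // Dangling: the temporary std::string is destroyed at the end of this
    // full expression, so `bad.ptr` points at freed memory afterwards.
    StringSlice bad = ToSlice(std::string("hello"));
    (void)bad; // reading bad.ptr past this statement would be undefined behaviour

    // Safe: bind the string to a named variable first, as the diff does with
    // `auto str = StringValue::Get(value);` before ToDeltaString(str).
    std::string str = "hello";
    StringSlice good = ToSlice(str); // valid for as long as `str` lives
    (void)good;
    return 0;
}
```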
@@ -299,20 +319,35 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state) {
             return 0;
         }
         auto &child_filter = *it++;
-
         return VisitFilter(col_name, *child_filter, state);
     };
     auto eit = EngineIteratorFromCallable(get_next);
     return visit_expression_and(state, &eit);
 }
 
+uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) {
+    auto maybe_inner = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError);
+    uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column");
+    return ffi::visit_expression_is_null(state, inner);
+}
+
+uintptr_t PredicateVisitor::VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state) {
+    return ffi::visit_expression_not(state, VisitIsNull(col_name, state));
+}
+
 uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) {
     switch (filter.filter_type) {
     case TableFilterType::CONSTANT_COMPARISON:
         return VisitConstantFilter(col_name, static_cast<const ConstantFilter&>(filter), state);
     case TableFilterType::CONJUNCTION_AND:
         return VisitAndFilter(col_name, static_cast<const ConjunctionAndFilter&>(filter), state);
+    case TableFilterType::IS_NULL:
+        return VisitIsNull(col_name, state);
+    case TableFilterType::IS_NOT_NULL:
+        return VisitIsNotNull(col_name, state);
     default:
         throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type));
         return ~0;
     }
 }
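
One design note on the new visitors: the kernel expression API used here exposes `is_null` and `not` as building blocks, so IS NOT NULL is composed as NOT(IS NULL(column)) rather than visited as its own primitive. A rough sketch of that shape, with stub functions standing in for the real FFI calls (which additionally thread a `KernelExpressionVisitorState` and an error allocator through every call):

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Stubs standing in for the delta-kernel-rs FFI; the real functions return
// handles to sub-expressions registered in the visitor state.
static uintptr_t visit_expression_column(const std::string &name) {
    std::cout << "column(" << name << ") -> #1\n";
    return 1;
}
static uintptr_t visit_expression_is_null(uintptr_t expr) {
    std::cout << "is_null(#" << expr << ") -> #2\n";
    return 2;
}
static uintptr_t visit_expression_not(uintptr_t expr) {
    std::cout << "not(#" << expr << ") -> #3\n";
    return 3;
}

// Mirrors PredicateVisitor::VisitIsNotNull: NOT(IS NULL(column)).
static uintptr_t VisitIsNotNull(const std::string &col_name) {
    return visit_expression_not(visit_expression_is_null(visit_expression_column(col_name)));
}

int main() {
    VisitIsNotNull("part"); // builds the composed expression bottom-up
    return 0;
}
```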
1 change: 0 additions & 1 deletion src/functions/delta_scan.cpp
@@ -587,7 +587,6 @@ TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance
     function.deserialize = nullptr;
     function.statistics = nullptr;
     function.table_scan_progress = nullptr;
-    function.cardinality = nullptr;
     function.get_bind_info = nullptr;
 
     // Schema param is just confusing here
5 changes: 5 additions & 0 deletions src/include/delta_utils.hpp
@@ -5,6 +5,7 @@
 #include "duckdb/planner/filter/conjunction_filter.hpp"
 #include "duckdb/common/enum_util.hpp"
 #include <iostream>
+#include <duckdb/planner/filter/null_filter.hpp>
 
 // TODO: clean up this file as we go
 
@@ -140,6 +141,10 @@ class PredicateVisitor : public ffi::EnginePredicate {
 
     uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state);
     uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state);
+
+    uintptr_t VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState* state);
+    uintptr_t VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState* state);
+
     uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state);
 };
 
7 changes: 0 additions & 7 deletions test/sql/dat/basic_append.test
@@ -56,9 +56,6 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta')
 2
 3
 
-# TODO: Figure out what's wrong here
-mode skip
-
 # Now we add a filter that filters out one of the files
 query II
 SELECT letter, number
@@ -67,8 +64,6 @@ WHERE number < 2
 ----
 a 1
 
-mode unskip
-
 # Now we add a filter that filters out the other file
 query III
 SELECT a_float, letter, number,
@@ -77,8 +72,6 @@ WHERE number > 4
 ----
 5.5 e 5
 
-mode skip
-
 # Now we add a filter that filters out all columns
 query III
 SELECT a_float, number, letter
44 changes: 44 additions & 0 deletions test/sql/generated/file_skipping_all_types.test
@@ -0,0 +1,44 @@
+# name: test/sql/generated/file_skipping_all_types.test
+# description: Test filter pushdown succeeds on all file types we can push down
+# group: [delta_generated]
+
+require parquet
+
+require delta
+
+require-env GENERATED_DATA_AVAILABLE
+
+# TODO: this doesn't appear to skip files yet
+# TODO: add tests once https://github.com/duckdb/duckdb/pull/12488 is available
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/bool/delta_lake')
+where part != false
+order by value
+----
+true
+
+foreach type bool int tinyint smallint bigint varchar
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
+where part != 0
+order by value
+----
+1
+
+endloop
+
+foreach type float double
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
+where part > 0.5
+order by value
+----
+1.0
+
+endloop