Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a few dat tests by bumping kernel #40

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@
# Add rust_example as a CMake target
ExternalProject_Add(
${KERNEL_NAME}
GIT_REPOSITORY "https://github.com/nicklan/delta-kernel-rs"
GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs"
# WARNING: the FFI headers are currently pinned due to the C linkage issue of the c++ headers. Currently, when bumping
# the kernel version, the produced header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying the fix
GIT_TAG 181232a45562ca78be763c2f5fb46b88a2463b5c
GIT_TAG ed2b80b127984481adba8e59879f39b9e5f871d1
# Prints the env variables passed to the cargo build to the terminal, useful in debugging because passing them
# through CMake is an error-prone mess
CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} env
Expand Down
21 changes: 2 additions & 19 deletions src/delta_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,8 @@ static bool CanHandleFilter(TableFilter *filter) {
}
}

// Returns only the predicates that CanHandleFilter accepts; unsupported filters are dropped.
static unordered_map<string, TableFilter*> PrunePredicates(unordered_map<string, TableFilter*> predicates) {
	unordered_map<string, TableFilter*> supported;
	for (const auto &[column_name, filter] : predicates) {
		if (!CanHandleFilter(filter)) {
			continue;
		}
		supported.emplace(column_name, filter);
	}
	return supported;
}

uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) {
auto filters = predicate->column_filters;
auto &filters = predicate->column_filters;

auto it = filters.begin();
auto end = filters.end();
Expand All @@ -244,12 +232,7 @@ uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::Ker
};
auto eit = EngineIteratorFromCallable(get_next);

// TODO: this should be fixed upstream?
try {
return visit_expression_and(state, &eit);
} catch (...) {
return ~0;
}
return visit_expression_and(state, &eit);
}

uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) {
Expand Down
91 changes: 89 additions & 2 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,33 @@ static void* allocate_string(const struct ffi::KernelStringSlice slice) {
return new string(slice.ptr, slice.len);
}

static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) {
// Decodes URL (percent) encoding: '+' becomes a space and "%XX" hex escapes
// become the corresponding byte. Malformed or truncated escape sequences
// (e.g. "%zz" or a trailing "%4") are kept verbatim instead of reading an
// uninitialized value — the previous version never checked the sscanf result,
// which was undefined behavior when the hex parse failed.
std::string url_decode(std::string input) {
	std::string result;
	result.reserve(input.size());
	// In URL/form encoding '+' stands for a space.
	std::replace(input.begin(), input.end(), '+', ' ');
	for (size_t i = 0; i < input.size(); i++) {
		// An escape needs two characters after the '%'.
		if (input[i] == '%' && i + 2 < input.size()) {
			unsigned int value;
			auto hex = input.substr(i + 1, 2);
			// "%2x" limits the parse to exactly the two escape digits.
			if (std::sscanf(hex.c_str(), "%2x", &value) == 1) {
				result += static_cast<char>(value);
				i += 2;
				continue;
			}
		}
		result += input[i];
	}
	return result;
}

static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::Stats *, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) {
auto context = (DeltaSnapshot *) engine_context;
auto path_string = context->GetPath();
StringUtil::RTrim(path_string, "/");
path_string += "/" + KernelUtils::FromDeltaString(path);

path_string = url_decode(path_string);

// First we append the file to our resolved files
context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string));
context->metadata.emplace_back(make_uniq<DeltaFileMetaData>());
Expand Down Expand Up @@ -467,13 +488,79 @@ unique_ptr<MultiFileReaderGlobalState> DeltaMultiFileReader::InitializeGlobalSta
return std::move(res);
}

// This code is duplicated from MultiFileReader::CreateNameMapping; the difference is that for columns that are not
// found in the parquet file we insert NULL constant columns instead of raising an error.
// NOTE(review): the identifier keeps the original (typo'd) spelling "Mulfi" so the call site stays stable.
static void CustomMulfiFileNameMapping(const string &file_name, const vector<LogicalType> &local_types,
                                       const vector<string> &local_names, const vector<LogicalType> &global_types,
                                       const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                       MultiFileReaderData &reader_data, const string &initial_file,
                                       optional_ptr<MultiFileReaderGlobalState> global_state) {
	D_ASSERT(global_types.size() == global_names.size());
	D_ASSERT(local_types.size() == local_names.size());
	// We have expected types: create a map of name -> local column index for lookup.
	case_insensitive_map_t<idx_t> name_map;
	for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
		name_map[local_names[col_idx]] = col_idx;
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		// Skip columns that are already provided as constants for this file.
		bool constant = false;
		for (auto &entry : reader_data.constant_map) {
			if (entry.column_id == i) {
				constant = true;
				break;
			}
		}
		if (constant) {
			// This column is constant for this file.
			continue;
		}
		// Not constant - look up the column in the name map.
		auto global_id = global_column_ids[i];
		if (global_id >= global_types.size()) {
			throw InternalException(
			    "MultiFileReader::CreatePositionalMapping - global_id is out of range in global_types for this file");
		}
		auto &global_name = global_names[global_id];
		auto entry = name_map.find(global_name);
		if (entry == name_map.end()) {
			// Column is missing from this file: instead of erroring (as the base implementation does),
			// project a NULL constant of the expected global type.
			// FIXME: this override is pretty hacky: for missing columns we just insert NULL constants
			auto &global_type = global_types[global_id];
			Value val(global_type);
			reader_data.constant_map.push_back({i, val});
			continue;
		}
		// We found the column in the local file - add a cast when the local type differs from the global one.
		auto local_id = entry->second;
		D_ASSERT(global_id < global_types.size());
		D_ASSERT(local_id < local_types.size());
		auto &global_type = global_types[global_id];
		auto &local_type = local_types[local_id];
		if (global_type != local_type) {
			reader_data.cast_map[local_id] = global_type;
		}
		// Record the mapping from global column index to local column id.
		reader_data.column_mapping.push_back(i);
		reader_data.column_ids.push_back(local_id);
	}

	// If nothing was mapped, the reader produces only constant/generated columns.
	reader_data.empty_columns = reader_data.column_ids.empty();
}

void DeltaMultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
const vector<string> &local_names, const vector<LogicalType> &global_types,
const vector<string> &global_names, const vector<column_t> &global_column_ids,
MultiFileReaderData &reader_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// First call the base implementation to do most mapping
MultiFileReader::CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state);
CustomMulfiFileNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state);

// Then we handle delta specific mapping
D_ASSERT(global_state);
Expand Down
19 changes: 14 additions & 5 deletions src/include/delta_kernel_ffi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum class KernelError {
ObjectStorePathError,
#endif
#if defined(DEFINE_DEFAULT_ENGINE)
Reqwest,
ReqwestError,
#endif
FileNotFoundError,
MissingColumnError,
Expand All @@ -45,10 +45,10 @@ enum class KernelError {
JoinFailureError,
Utf8Error,
ParseIntError,
InvalidColumnMappingMode,
InvalidTableLocation,
InvalidColumnMappingModeError,
InvalidTableLocationError,
InvalidDecimalError,
InvalidStructData,
InvalidStructDataError,
};

#if defined(DEFINE_DEFAULT_ENGINE)
Expand Down Expand Up @@ -304,9 +304,19 @@ using NullableCvoid = void*;
/// function is that `kernel_str` is _only_ valid until the return from this function
using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str);

/// Give engines an easy way to consume stats
/// NOTE(review): this header appears to be generated from the delta-kernel-rs FFI (see the pinning
/// comment near the GIT_TAG in CMakeLists.txt) — prefer regenerating over hand-editing; TODO confirm.
struct Stats {
/// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the
/// `num_records` statistic must be present and accurate, and must equal the number of records
/// in the data file. In the presence of Deletion Vectors the statistics may be somewhat
/// outdated, i.e. not reflecting deleted rows yet.
uint64_t num_records;
};

using CScanCallback = void(*)(NullableCvoid engine_context,
KernelStringSlice path,
int64_t size,
const Stats *stats,
const DvInfo *dv_info,
const CStringMap *partition_map);

Expand All @@ -324,7 +334,6 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation {
ExternResult<Handle<SharedScan>> field10;
};


extern "C" {

/// # Safety
Expand Down
25 changes: 8 additions & 17 deletions test/sql/dat/all.test
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet')
----


### FAILING DAT TESTS

# TODO fix all of these
mode skip

# basic_partitioned
query I rowsort basic_partitioned
SELECT *
Expand All @@ -78,20 +72,12 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/delta'
----

query I rowsort multi_partitioned
SELECT *
SELECT letter, date, decode(data) as data, number
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/expected/latest/**/*.parquet')
----

# multi_partitioned
query I rowsort multi_partitioned
SELECT *
FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/delta')
----

query I rowsort multi_partitioned
SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/expected/latest/**/*.parquet')
----
# TODO: fix this
require notwindows

# multi_partitioned_2
query I rowsort multi_partitioned_2
Expand All @@ -104,6 +90,11 @@ SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned_2/expected/latest/**/*.parquet')
----

### FAILING DAT TESTS

# TODO fix all of these
mode skip

# no_replay
query I rowsort no_replay
SELECT *
Expand Down
12 changes: 7 additions & 5 deletions test/sql/delta_kernel_rs/basic_partitioned.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ require delta

require-env DELTA_KERNEL_TESTS_PATH

# FIXME: this fails due some weird error
mode skip

statement error
query III
SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned')
----
Failed to read file "/Users/sam/Development/delta-kernel-testing/delta-kernel-rs/kernel/tests/data/basic_partitioned/letter=__HIVE_DEFAULT_PARTITION__
NULL 6 6.6
a 4 4.4
e 5 5.5
a 1 1.1
b 2 2.2
c 3 3.3
Loading