diff --git a/CMakeLists.txt b/CMakeLists.txt index 3fc459e..65c13ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ set(EXTENSION_SOURCES src/delta_functions.cpp src/delta_utils.cpp src/functions/delta_scan.cpp + src/functions/expression_functions.cpp src/storage/delta_catalog.cpp src/storage/delta_schema_entry.cpp src/storage/delta_table_entry.cpp @@ -149,7 +150,8 @@ ExternalProject_Add( # Build debug build BUILD_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build - --package delta_kernel_ffi --workspace $<$:--release> --all-features ${RUST_PLATFORM_PARAM} + --package delta_kernel_ffi --workspace $<$:--release> + --all-features ${RUST_PLATFORM_PARAM} # Build DATs COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build @@ -176,13 +178,13 @@ set(CMAKE_OSX_DEPLOYMENT_TARGET add_compile_definitions(DEFINE_DEFAULT_ENGINE) # Link delta-kernal-rs to static lib -target_link_libraries( - ${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS}) +target_link_libraries(${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} + ${PLATFORM_LIBS}) add_dependencies(${EXTENSION_NAME} delta_kernel) # Link delta-kernal-rs to dynamic lib -target_link_libraries( - ${LOADABLE_EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS}) +target_link_libraries(${LOADABLE_EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} + ${PLATFORM_LIBS}) add_dependencies(${LOADABLE_EXTENSION_NAME} delta_kernel) install( diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 0c21ade..50ce93d 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -44,11 +44,16 @@ class DeltaStorageExtension : public StorageExtension { }; static void LoadInternal(DatabaseInstance &instance) { - // Load functions + // Load Table functions for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { ExtensionUtil::RegisterFunction(instance, function); } + // Load Scalar functions + for (const auto &function : 
DeltaFunctions::GetScalarFunctions(instance)) { + ExtensionUtil::RegisterFunction(instance, function); + } + // Register the "single table" delta catalog (to ATTACH a single delta table) auto &config = DBConfig::GetConfig(instance); config.storage_extensions["delta"] = make_uniq(); diff --git a/src/delta_functions.cpp b/src/delta_functions.cpp index e79894b..922d0d2 100644 --- a/src/delta_functions.cpp +++ b/src/delta_functions.cpp @@ -2,8 +2,7 @@ #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" - -#include +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" namespace duckdb { @@ -15,4 +14,12 @@ vector DeltaFunctions::GetTableFunctions(DatabaseInstance &ins return functions; } +vector DeltaFunctions::GetScalarFunctions(DatabaseInstance &instance) { + vector functions; + + functions.push_back(GetExpressionFunction(instance)); + + return functions; +} + }; // namespace duckdb diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 35a3a3d..d47d99f 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -1,13 +1,318 @@ #include "delta_utils.hpp" +#include + #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" #include #include +#include "duckdb/parser/expression/conjunction_expression.hpp" +#include "duckdb/parser/expression/comparison_expression.hpp" +#include "duckdb/parser/expression/function_expression.hpp" +#include "duckdb/parser/expression/operator_expression.hpp" +#include "duckdb/common/types/decimal.hpp" namespace duckdb { +void ExpressionVisitor::VisitComparisonExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + + auto &lhs = children->at(0); + auto &rhs = children->at(1); + unique_ptr expression = + make_uniq(ExpressionType::COMPARE_LESSTHAN, std::move(lhs), std::move(rhs)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); 
+} + +unique_ptr>> +ExpressionVisitor::VisitKernelExpression(const ffi::Handle *expression) { + ExpressionVisitor state; + ffi::EngineExpressionVisitor visitor; + + visitor.data = &state; + visitor.make_field_list = (uintptr_t(*)(void *, uintptr_t)) & MakeFieldList; + + // Templated primitive functions + visitor.visit_literal_bool = VisitPrimitiveLiteralBool; + visitor.visit_literal_byte = VisitPrimitiveLiteralByte; + visitor.visit_literal_short = VisitPrimitiveLiteralShort; + visitor.visit_literal_int = VisitPrimitiveLiteralInt; + visitor.visit_literal_long = VisitPrimitiveLiteralLong; + visitor.visit_literal_float = VisitPrimitiveLiteralFloat; + visitor.visit_literal_double = VisitPrimitiveLiteralDouble; + + visitor.visit_literal_decimal = VisitDecimalLiteral; + + // Custom Implementations + visitor.visit_literal_timestamp = &VisitTimestampLiteral; + visitor.visit_literal_timestamp_ntz = &VisitTimestampNtzLiteral; + visitor.visit_literal_date = &VisitDateLiteral; + + visitor.visit_literal_string = &VisitStringLiteral; + + visitor.visit_literal_binary = &VisitBinaryLiteral; + visitor.visit_literal_null = &VisitNullLiteral; + visitor.visit_literal_array = &VisitArrayLiteral; + + visitor.visit_and = VisitVariadicExpression(); + visitor.visit_or = VisitVariadicExpression(); + + visitor.visit_lt = VisitBinaryExpression(); + visitor.visit_le = VisitBinaryExpression(); + visitor.visit_gt = VisitBinaryExpression(); + visitor.visit_ge = VisitBinaryExpression(); + + visitor.visit_eq = VisitBinaryExpression(); + visitor.visit_ne = VisitBinaryExpression(); + visitor.visit_distinct = VisitBinaryExpression(); + + visitor.visit_in = VisitVariadicExpression(); + visitor.visit_not_in = VisitVariadicExpression(); + + visitor.visit_add = VisitAdditionExpression; + visitor.visit_minus = VisitSubctractionExpression; + visitor.visit_multiply = VisitMultiplyExpression; + visitor.visit_divide = VisitDivideExpression; + + visitor.visit_column = &VisitColumnExpression; + 
visitor.visit_struct_expr = &VisitStructExpression; + + visitor.visit_literal_struct = &VisitStructLiteral; + + visitor.visit_not = &VisitNotExpression; + visitor.visit_is_null = &VisitIsNullExpression; + + uintptr_t result = ffi::visit_expression(expression, &visitor); + + if (state.error.HasError()) { + state.error.Throw(); + } + + return state.TakeFieldList(result); +} + +void ExpressionVisitor::VisitAdditionExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + unique_ptr expression = + make_uniq("+", std::move(*children), nullptr, nullptr, false, true); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitSubctractionExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + unique_ptr expression = + make_uniq("-", std::move(*children), nullptr, nullptr, false, true); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitDivideExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + unique_ptr expression = + make_uniq("/", std::move(*children), nullptr, nullptr, false, true); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitMultiplyExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + unique_ptr expression = + make_uniq("*", std::move(*children), nullptr, nullptr, false, true); + 
state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitPrimitiveLiteralBool(void *state, uintptr_t sibling_list_id, bool value) { + auto expression = make_uniq(Value::BOOLEAN(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralByte(void *state, uintptr_t sibling_list_id, int8_t value) { + auto expression = make_uniq(Value::TINYINT(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralShort(void *state, uintptr_t sibling_list_id, int16_t value) { + auto expression = make_uniq(Value::SMALLINT(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralInt(void *state, uintptr_t sibling_list_id, int32_t value) { + auto expression = make_uniq(Value::INTEGER(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralLong(void *state, uintptr_t sibling_list_id, int64_t value) { + auto expression = make_uniq(Value::BIGINT(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralFloat(void *state, uintptr_t sibling_list_id, float value) { + auto expression = make_uniq(Value::FLOAT(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitPrimitiveLiteralDouble(void *state, uintptr_t sibling_list_id, double value) { + auto expression = make_uniq(Value::DOUBLE(value)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitTimestampLiteral(void *state, uintptr_t sibling_list_id, int64_t value) { + auto expression = make_uniq(Value::TIMESTAMPTZ(static_cast(value))); + static_cast(state)->AppendToList(sibling_list_id, 
std::move(expression)); +} + +void ExpressionVisitor::VisitTimestampNtzLiteral(void *state, uintptr_t sibling_list_id, int64_t value) { + auto expression = make_uniq(Value::TIMESTAMP(static_cast(value))); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitDateLiteral(void *state, uintptr_t sibling_list_id, int32_t value) { + auto expression = make_uniq(Value::DATE(static_cast(value))); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitStringLiteral(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice value) { + auto expression = make_uniq(Value(string(value.ptr, value.len))); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitBinaryLiteral(void *state, uintptr_t sibling_list_id, const uint8_t *buffer, + uintptr_t len) { + auto expression = make_uniq(Value::BLOB(buffer, len)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitNullLiteral(void *state, uintptr_t sibling_list_id) { + auto expression = make_uniq(Value()); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitArrayLiteral(void *state, uintptr_t sibling_list_id, uintptr_t child_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_id); + if (!children) { + return; + } + unique_ptr expression = make_uniq("list_value", std::move(*children)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitStructLiteral(void *state, uintptr_t sibling_list_id, uintptr_t child_field_list_value, + uintptr_t child_value_list_id) { + auto state_cast = static_cast(state); + + auto children_keys = state_cast->TakeFieldList(child_field_list_value); + auto children_values = state_cast->TakeFieldList(child_value_list_id); + if 
(!children_values || !children_keys) { + return; + } + + if (children_values->size() != children_keys->size()) { + state_cast->error = + ErrorData("Size of Keys and Values vector do not match in ExpressionVisitor::VisitStructLiteral"); + return; + } + + for (idx_t i = 0; i < children_keys->size(); i++) { + (*children_values)[i]->alias = (*children_keys)[i]->ToString(); + } + + unique_ptr expression = make_uniq("struct_pack", std::move(*children_values)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitNotExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + unique_ptr expression = + make_uniq("NOT", std::move(*children), nullptr, nullptr, false, true); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +void ExpressionVisitor::VisitIsNullExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + return; + } + + children->push_back(make_uniq(Value())); + unique_ptr expression = + make_uniq("IS", std::move(*children), nullptr, nullptr, false, true); + state_cast->AppendToList(sibling_list_id, std::move(expression)); +} + +// FIXME: this is not 100% correct yet: value_ms is ignored +void ExpressionVisitor::VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, uint64_t value_ms, + uint64_t value_ls, uint8_t precision, uint8_t scale) { + try { + if (precision >= Decimal::MAX_WIDTH_INT64 || value_ls > (uint64_t)NumericLimits::Maximum()) { + throw NotImplementedException("ExpressionVisitor::VisitDecimalLiteral HugeInt decimals"); + } + auto expression = make_uniq(Value::DECIMAL(42, 18, 10)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); + } catch (Exception &e) { + 
static_cast(state)->error = ErrorData(e); + } +} + +void ExpressionVisitor::VisitColumnExpression(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name) { + auto expression = make_uniq(string(name.ptr, name.len)); + static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); +} +void ExpressionVisitor::VisitStructExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + static_cast(state)->AppendToList(sibling_list_id, + std::move(make_uniq(Value(42)))); +} + +uintptr_t ExpressionVisitor::MakeFieldList(ExpressionVisitor *state, uintptr_t capacity_hint) { + return state->MakeFieldListImpl(capacity_hint); +} +uintptr_t ExpressionVisitor::MakeFieldListImpl(uintptr_t capacity_hint) { + uintptr_t id = next_id++; + auto list = make_uniq(); + if (capacity_hint > 0) { + list->reserve(capacity_hint); + } + inflight_lists.emplace(id, std::move(list)); + return id; +} + +void ExpressionVisitor::AppendToList(uintptr_t id, unique_ptr child) { + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + error = ErrorData("ExpressionVisitor::AppendToList could not find " + Value::UBIGINT(id).ToString()); + return; + } + + it->second->emplace_back(std::move(child)); +} + +unique_ptr ExpressionVisitor::TakeFieldList(uintptr_t id) { + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + error = ErrorData("ExpressionVisitor::TakeFieldList could not find " + Value::UBIGINT(id).ToString()); + return nullptr; + } + auto rval = std::move(it->second); + inflight_lists.erase(it); + return rval; +} + unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot *snapshot) { SchemaVisitor state; ffi::EngineSchemaVisitor visitor; @@ -102,50 +407,48 @@ ffi::EngineError *DuckDBEngineError::AllocateError(ffi::KernelError etype, ffi:: } string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) { - const char *KERNEL_ERROR_ENUM_STRINGS[] = { - "UnknownError", - "FFIError", - "ArrowError", - 
"EngineDataTypeError", - "ExtractError", - "GenericError", - "IOErrorError", - "ParquetError", - "ObjectStoreError", - "ObjectStorePathError", - "ReqwestError", - "FileNotFoundError", - "MissingColumnError", - "UnexpectedColumnTypeError", - "MissingDataError", - "MissingVersionError", - "DeletionVectorError", - "InvalidUrlError", - "MalformedJsonError", - "MissingMetadataError", - "MissingProtocolError", - "InvalidProtocolError", - "MissingMetadataAndProtocolError", - "ParseError", - "JoinFailureError", - "Utf8Error", - "ParseIntError", - "InvalidColumnMappingModeError", - "InvalidTableLocationError", - "InvalidDecimalError", - "InvalidStructDataError", - "InternalError", - "InvalidExpression", - "InvalidLogPath", - "InvalidCommitInfo", - "FileAlreadyExists", - "MissingCommitInfo", - "UnsupportedError", - "ParseIntervalError", - "ChangeDataFeedUnsupported", - "ChangeDataFeedIncompatibleSchema", - "InvalidCheckpoint" - }; + const char *KERNEL_ERROR_ENUM_STRINGS[] = {"UnknownError", + "FFIError", + "ArrowError", + "EngineDataTypeError", + "ExtractError", + "GenericError", + "IOErrorError", + "ParquetError", + "ObjectStoreError", + "ObjectStorePathError", + "ReqwestError", + "FileNotFoundError", + "MissingColumnError", + "UnexpectedColumnTypeError", + "MissingDataError", + "MissingVersionError", + "DeletionVectorError", + "InvalidUrlError", + "MalformedJsonError", + "MissingMetadataError", + "MissingProtocolError", + "InvalidProtocolError", + "MissingMetadataAndProtocolError", + "ParseError", + "JoinFailureError", + "Utf8Error", + "ParseIntError", + "InvalidColumnMappingModeError", + "InvalidTableLocationError", + "InvalidDecimalError", + "InvalidStructDataError", + "InternalError", + "InvalidExpression", + "InvalidLogPath", + "InvalidCommitInfo", + "FileAlreadyExists", + "MissingCommitInfo", + "UnsupportedError", + "ParseIntervalError", + "ChangeDataFeedUnsupported", + "ChangeDataFeedIncompatibleSchema", + "InvalidCheckpoint"}; 
static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::InvalidCheckpoint, "KernelErrorEnumStrings mismatched with kernel"); diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 0018923..6836abb 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -47,8 +47,8 @@ string url_decode(string input) { } void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, - const ffi::Stats *stats, const ffi::DvInfo *dv_info, - const struct ffi::CStringMap *partition_values) { + const ffi::Stats *stats, const ffi::DvInfo *dv_info, + const struct ffi::CStringMap *partition_values) { auto context = (DeltaSnapshot *)engine_context; auto path_string = context->GetPath(); StringUtil::RTrim(path_string, "/"); @@ -92,7 +92,7 @@ void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi: } void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, - const struct ffi::KernelBoolSlice selection_vec) { + const struct ffi::KernelBoolSlice selection_vec) { ffi::visit_scan_data(engine_data, selection_vec, engine_context, VisitCallback); } @@ -238,52 +238,52 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p // Here you would need to add the logic for setting the builder options for Azure // This is just a placeholder and will need to be replaced with the actual logic if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") { - string key_id, secret, session_token, region, endpoint, url_style; - bool use_ssl = true; - secret_reader.TryGetSecretKey("key_id", key_id); - secret_reader.TryGetSecretKey("secret", secret); - secret_reader.TryGetSecretKey("session_token", session_token); - secret_reader.TryGetSecretKey("region", region); - secret_reader.TryGetSecretKey("endpoint", endpoint); - secret_reader.TryGetSecretKey("url_style", url_style); - 
secret_reader.TryGetSecretKey("use_ssl", use_ssl); - - if (key_id.empty() && secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), - KernelUtils::ToDeltaString("true")); - } - - if (!key_id.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), - KernelUtils::ToDeltaString(key_id)); - } - if (!secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), - KernelUtils::ToDeltaString(secret)); - } - if (!session_token.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), - KernelUtils::ToDeltaString(session_token)); - } - if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { - if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { - if (use_ssl) { - endpoint = "https://" + endpoint; - } else { - endpoint = "http://" + endpoint; - } - } - - if (StringUtil::StartsWith(endpoint, "http://")) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), - KernelUtils::ToDeltaString("true")); - } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), - KernelUtils::ToDeltaString(endpoint)); - } else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), - KernelUtils::ToDeltaString("https://storage.googleapis.com")); - } + string key_id, secret, session_token, region, endpoint, url_style; + bool use_ssl = true; + secret_reader.TryGetSecretKey("key_id", key_id); + secret_reader.TryGetSecretKey("secret", secret); + secret_reader.TryGetSecretKey("session_token", session_token); + secret_reader.TryGetSecretKey("region", region); + secret_reader.TryGetSecretKey("endpoint", endpoint); + secret_reader.TryGetSecretKey("url_style", url_style); + secret_reader.TryGetSecretKey("use_ssl", use_ssl); + + if 
(key_id.empty() && secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), + KernelUtils::ToDeltaString("true")); + } + + if (!key_id.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), + KernelUtils::ToDeltaString(key_id)); + } + if (!secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), + KernelUtils::ToDeltaString(secret)); + } + if (!session_token.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), + KernelUtils::ToDeltaString(session_token)); + } + if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { + if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { + if (use_ssl) { + endpoint = "https://" + endpoint; + } else { + endpoint = "http://" + endpoint; + } + } + + if (StringUtil::StartsWith(endpoint, "http://")) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), + KernelUtils::ToDeltaString("true")); + } + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), + KernelUtils::ToDeltaString(endpoint)); + } else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), + KernelUtils::ToDeltaString("https://storage.googleapis.com")); + } ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region)); @@ -415,7 +415,7 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) { } void DeltaSnapshot::Bind(vector &return_types, vector &names) { - unique_lock lck(lock); + unique_lock lck(lock); if (have_bound) { names = this->names; @@ -478,9 +478,9 @@ string DeltaSnapshot::GetFileInternal(idx_t i) { } string DeltaSnapshot::GetFile(idx_t i) { - // TODO: profile this: we should be able to use atomics here to optimize - unique_lock 
lck(lock); - return GetFileInternal(i); + // TODO: profile this: we should be able to use atomics here to optimize + unique_lock lck(lock); + return GetFileInternal(i); } void DeltaSnapshot::InitializeSnapshot() { @@ -537,17 +537,17 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co filtered_list->names = names; // Copy over the snapshot, this avoids reparsing metadata - { - unique_lock lck(lock); - filtered_list->snapshot = snapshot; - } + { + unique_lock lck(lock); + filtered_list->snapshot = snapshot; + } auto &profiler = QueryProfiler::Get(context); // Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both // file lists Therefore this is only done when profile is enabled. This is enable by default in debug mode or for // EXPLAIN ANALYZE queries - // TODO: check locking behaviour below + // TODO: check locking behaviour below if (profiler.IsEnabled()) { Value result; if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) { @@ -595,7 +595,7 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co } vector DeltaSnapshot::GetAllFiles() { - unique_lock lck(lock); + unique_lock lck(lock); idx_t i = resolved_files.size(); // TODO: this can probably be improved while (!GetFileInternal(i).empty()) { @@ -613,7 +613,7 @@ FileExpandResult DeltaSnapshot::GetExpandResult() { } idx_t DeltaSnapshot::GetTotalFileCount() { - unique_lock lck(lock); + unique_lock lck(lock); idx_t i = resolved_files.size(); while (!GetFileInternal(i).empty()) { i++; @@ -625,8 +625,8 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) // This also ensures all files are expanded auto total_file_count = DeltaSnapshot::GetTotalFileCount(); - // TODO: internalize above - unique_lock lck(lock); + // TODO: internalize above + unique_lock lck(lock); if (total_file_count == 0) { return make_uniq(0, 0); @@ -648,15 +648,14 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext 
&context) return nullptr; } - idx_t DeltaSnapshot::GetVersion() { - unique_lock lck(lock); - return version; + unique_lock lck(lock); + return version; } DeltaFileMetaData &DeltaSnapshot::GetMetaData(idx_t index) const { - unique_lock lck(lock); - return *metadata[index]; + unique_lock lck(lock); + return *metadata[index]; } unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) { @@ -737,7 +736,7 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio // Get the metadata for this file D_ASSERT(global_state->file_list); const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); + auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); if (!file_metadata.partition_map.empty()) { for (idx_t i = 0; i < global_column_ids.size(); i++) { @@ -997,7 +996,7 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile // Get the metadata for this file const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); + auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); if (metadata.selection_vector.ptr && chunk.size() != 0) { D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); @@ -1005,7 +1004,7 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile // Construct the selection vector using the file_row_number column and the raw selection vector from delta idx_t select_count; - auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count); + auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count); chunk.Slice(sv, select_count); } diff --git a/src/include/delta_functions.hpp b/src/include/delta_functions.hpp index 4f819cb..0753551 100644 
--- a/src/include/delta_functions.hpp +++ b/src/include/delta_functions.hpp @@ -15,8 +15,13 @@ namespace duckdb { class DeltaFunctions { public: static vector GetTableFunctions(DatabaseInstance &instance); + static vector GetScalarFunctions(DatabaseInstance &instance); private: + //! Table Functions static TableFunctionSet GetDeltaScanFunction(DatabaseInstance &instance); + + //! Scalar Functions + static ScalarFunctionSet GetExpressionFunction(DatabaseInstance &instance); }; } // namespace duckdb diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index ec9db0c..5bb6cae 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -9,99 +9,100 @@ namespace ffi { enum class KernelError { - UnknownError, - FFIError, + UnknownError, + FFIError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ArrowError, + ArrowError, #endif - EngineDataTypeError, - ExtractError, - GenericError, - IOErrorError, + EngineDataTypeError, + ExtractError, + GenericError, + IOErrorError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ParquetError, + ParquetError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStoreError, + ObjectStoreError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStorePathError, + ObjectStorePathError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ReqwestError, + ReqwestError, #endif - FileNotFoundError, - MissingColumnError, - UnexpectedColumnTypeError, - MissingDataError, - MissingVersionError, - DeletionVectorError, - InvalidUrlError, - MalformedJsonError, - MissingMetadataError, - MissingProtocolError, - InvalidProtocolError, - MissingMetadataAndProtocolError, - ParseError, - JoinFailureError, - Utf8Error, - ParseIntError, - InvalidColumnMappingModeError, - InvalidTableLocationError, - InvalidDecimalError, - InvalidStructDataError, - InternalError, - InvalidExpression, - InvalidLogPath, - InvalidCommitInfo, - FileAlreadyExists, - MissingCommitInfo, - 
UnsupportedError, - ParseIntervalError, - ChangeDataFeedUnsupported, - ChangeDataFeedIncompatibleSchema, - InvalidCheckpoint, + FileNotFoundError, + MissingColumnError, + UnexpectedColumnTypeError, + MissingDataError, + MissingVersionError, + DeletionVectorError, + InvalidUrlError, + MalformedJsonError, + MissingMetadataError, + MissingProtocolError, + InvalidProtocolError, + MissingMetadataAndProtocolError, + ParseError, + JoinFailureError, + Utf8Error, + ParseIntError, + InvalidColumnMappingModeError, + InvalidTableLocationError, + InvalidDecimalError, + InvalidStructDataError, + InternalError, + InvalidExpression, + InvalidLogPath, + InvalidCommitInfo, + FileAlreadyExists, + MissingCommitInfo, + UnsupportedError, + ParseIntervalError, + ChangeDataFeedUnsupported, + ChangeDataFeedIncompatibleSchema, + InvalidCheckpoint, }; /// Definitions of level verbosity. Verbose Levels are "greater than" less verbose ones. So /// Level::ERROR is the lowest, and Level::TRACE the highest. enum class Level { - ERROR = 0, - WARN = 1, - INFO = 2, - DEBUGGING = 3, - TRACE = 4, + ERROR = 0, + WARN = 1, + INFO = 2, + DEBUGGING = 3, + TRACE = 4, }; /// Format to use for log lines. These correspond to the formats from [`tracing_subscriber` /// formats](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/format/index.html). enum class LogLineFormat { - /// The default formatter. This emits human-readable, single-line logs for each event that - /// occurs, with the context displayed before the formatted representation of the event. - /// Example: - /// `2022-02-15T18:40:14.289898Z INFO fmt: preparing to shave yaks number_of_yaks=3` - FULL, - /// A variant of the FULL formatter, optimized for short line lengths. Fields from the context - /// are appended to the fields of the formatted event, and targets are not shown. 
- /// Example: - /// `2022-02-17T19:51:05.809287Z INFO fmt_compact: preparing to shave yaks number_of_yaks=3` - COMPACT, - /// Emits excessively pretty, multi-line logs, optimized for human readability. This is - /// primarily intended to be used in local development and debugging, or for command-line - /// applications, where automated analysis and compact storage of logs is less of a priority - /// than readability and visual appeal. - /// Example: - /// ```ignore - /// 2022-02-15T18:44:24.535324Z INFO fmt_pretty: preparing to shave yaks, number_of_yaks: 3 - /// at examples/examples/fmt-pretty.rs:16 on main - /// ``` - PRETTY, - /// Outputs newline-delimited JSON logs. This is intended for production use with systems where - /// structured logs are consumed as JSON by analysis and viewing tools. The JSON output is not - /// optimized for human readability. - /// Example: - /// `{"timestamp":"2022-02-15T18:47:10.821315Z","level":"INFO","fields":{"message":"preparing to shave yaks","number_of_yaks":3},"target":"fmt_json"}` - JSON, + /// The default formatter. This emits human-readable, single-line logs for each event that + /// occurs, with the context displayed before the formatted representation of the event. + /// Example: + /// `2022-02-15T18:40:14.289898Z INFO fmt: preparing to shave yaks number_of_yaks=3` + FULL, + /// A variant of the FULL formatter, optimized for short line lengths. Fields from the context + /// are appended to the fields of the formatted event, and targets are not shown. + /// Example: + /// `2022-02-17T19:51:05.809287Z INFO fmt_compact: preparing to shave yaks number_of_yaks=3` + COMPACT, + /// Emits excessively pretty, multi-line logs, optimized for human readability. This is + /// primarily intended to be used in local development and debugging, or for command-line + /// applications, where automated analysis and compact storage of logs is less of a priority + /// than readability and visual appeal. 
+ /// Example: + /// ```ignore + /// 2022-02-15T18:44:24.535324Z INFO fmt_pretty: preparing to shave yaks, number_of_yaks: 3 + /// at examples/examples/fmt-pretty.rs:16 on main + /// ``` + PRETTY, + /// Outputs newline-delimited JSON logs. This is intended for production use with systems where + /// structured logs are consumed as JSON by analysis and viewing tools. The JSON output is not + /// optimized for human readability. + /// Example: + /// `{"timestamp":"2022-02-15T18:47:10.821315Z","level":"INFO","fields":{"message":"preparing to shave + /// yaks","number_of_yaks":3},"target":"fmt_json"}` + JSON, }; struct CStringMap; @@ -143,15 +144,15 @@ struct StringSliceIterator; /// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible /// to free that slice, by calling [super::free_bool_slice] exactly once. struct KernelBoolSlice { - bool *ptr; - uintptr_t len; + bool *ptr; + uintptr_t len; }; /// An owned slice of u64 row indexes allocated by the kernel. The engine is responsible for /// freeing this slice by calling [super::free_row_indexes] once. struct KernelRowIndexArray { - uint64_t *ptr; - uintptr_t len; + uint64_t *ptr; + uintptr_t len; }; /// Represents an object that crosses the FFI boundary and which outlives the scope that created @@ -186,8 +187,8 @@ struct KernelRowIndexArray { /// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can /// freely access shared (non-mutable) handles. /// -template -using Handle = H*; +template +using Handle = H *; /// An error that can be returned to the engine. Engines that wish to associate additional /// information can define and use any type that is [pointer @@ -196,31 +197,31 @@ using Handle = H*; /// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout) /// class. 
struct EngineError { - KernelError etype; + KernelError etype; }; /// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it /// allocated one at all), and engine is responsible for freeing it. -template +template struct ExternResult { - enum class Tag { - Ok, - Err, - }; - - struct Ok_Body { - T _0; - }; - - struct Err_Body { - EngineError *_0; - }; - - Tag tag; - union { - Ok_Body ok; - Err_Body err; - }; + enum class Tag { + Ok, + Err, + }; + + struct Ok_Body { + T _0; + }; + + struct Err_Body { + EngineError *_0; + }; + + Tag tag; + union { + Ok_Body ok; + Err_Body err; + }; }; /// A non-owned slice of a UTF8 string, intended for arg-passing between kernel and engine. The @@ -246,17 +247,17 @@ struct ExternResult { /// Meanwhile, the callee must assume that the slice is only valid until the function returns, and /// must not retain any references to the slice or its data that might outlive the function call. struct KernelStringSlice { - const char *ptr; - uintptr_t len; + const char *ptr; + uintptr_t len; }; -using AllocateErrorFn = EngineError*(*)(KernelError etype, KernelStringSlice msg); +using AllocateErrorFn = EngineError *(*)(KernelError etype, KernelStringSlice msg); -using NullableCvoid = void*; +using NullableCvoid = void *; /// Allow engines to allocate strings of their own type. 
the contract of calling a passed allocate /// function is that `kernel_str` is _only_ valid until the return from this function -using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); +using AllocateStringFn = NullableCvoid (*)(KernelStringSlice kernel_str); /// ABI-compatible struct for ArrowArray from C Data Interface /// See @@ -269,16 +270,16 @@ using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); /// } /// ``` struct FFI_ArrowArray { - int64_t length; - int64_t null_count; - int64_t offset; - int64_t n_buffers; - int64_t n_children; - const void **buffers; - FFI_ArrowArray **children; - FFI_ArrowArray *dictionary; - void (*release)(FFI_ArrowArray *arg1); - void *private_data; + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + FFI_ArrowArray **children; + FFI_ArrowArray *dictionary; + void (*release)(FFI_ArrowArray *arg1); + void *private_data; }; /// ABI-compatible struct for `ArrowSchema` from C Data Interface @@ -293,16 +294,16 @@ struct FFI_ArrowArray { /// ``` /// struct FFI_ArrowSchema { - const char *format; - const char *name; - const char *metadata; - /// Refer to [Arrow Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) - int64_t flags; - int64_t n_children; - FFI_ArrowSchema **children; - FFI_ArrowSchema *dictionary; - void (*release)(FFI_ArrowSchema *arg1); - void *private_data; + const char *format; + const char *name; + const char *metadata; + /// Refer to [Arrow Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) + int64_t flags; + int64_t n_children; + FFI_ArrowSchema **children; + FFI_ArrowSchema *dictionary; + void (*release)(FFI_ArrowSchema *arg1); + void *private_data; }; #if defined(DEFINE_DEFAULT_ENGINE) @@ -310,35 +311,35 @@ struct FFI_ArrowSchema { /// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). 
This includes the data and /// the schema. struct ArrowFFIData { - FFI_ArrowArray array; - FFI_ArrowSchema schema; + FFI_ArrowArray array; + FFI_ArrowSchema schema; }; #endif struct FileMeta { - KernelStringSlice path; - int64_t last_modified; - uintptr_t size; + KernelStringSlice path; + int64_t last_modified; + uintptr_t size; }; /// Model iterators. This allows an engine to specify iteration however it likes, and we simply wrap /// the engine functions. The engine retains ownership of the iterator. struct EngineIterator { - void *data; - /// A function that should advance the iterator and return the next time from the data - /// If the iterator is complete, it should return null. It should be safe to - /// call `get_next()` multiple times if it returns null. - const void *(*get_next)(void *data); + void *data; + /// A function that should advance the iterator and return the next time from the data + /// If the iterator is complete, it should return null. It should be safe to + /// call `get_next()` multiple times if it returns null. 
+ const void *(*get_next)(void *data); }; -template -using VisitLiteralFn = void(*)(void *data, uintptr_t sibling_list_id, T value); +template +using VisitLiteralFn = void (*)(void *data, uintptr_t sibling_list_id, T value); -using VisitVariadicFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitVariadicFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); -using VisitUnaryFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitUnaryFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); -using VisitBinaryOpFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitBinaryOpFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); /// The [`EngineExpressionVisitor`] defines a visitor system to allow engines to build their own /// representation of a kernel expression. @@ -371,153 +372,144 @@ using VisitBinaryOpFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t /// visitor. Note that struct literals are currently in flux, and may change significantly. Here is the relevant /// issue: https://github.com/delta-io/delta-kernel-rs/issues/412 struct EngineExpressionVisitor { - /// An opaque engine state pointer - void *data; - /// Creates a new expression list, optionally reserving capacity up front - uintptr_t (*make_field_list)(void *data, uintptr_t reserve); - /// Visit a 32bit `integer` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_int; - /// Visit a 64bit `long` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_long; - /// Visit a 16bit `short` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_short; - /// Visit an 8bit `byte` belonging to the list identified by `sibling_list_id`. 
- VisitLiteralFn visit_literal_byte; - /// Visit a 32bit `float` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_float; - /// Visit a 64bit `double` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_double; - /// Visit a `string` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_string; - /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_bool; - /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. - /// The timestamp is microsecond precision and adjusted to UTC. - VisitLiteralFn visit_literal_timestamp; - /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. - /// The timestamp is microsecond precision with no timezone. - VisitLiteralFn visit_literal_timestamp_ntz; - /// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs - /// to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_date; - /// Visit binary data at the `buffer` with length `len` belonging to the list identified by - /// `sibling_list_id`. - void (*visit_literal_binary)(void *data, - uintptr_t sibling_list_id, - const uint8_t *buffer, - uintptr_t len); - /// Visit a 128bit `decimal` value with the given precision and scale. The 128bit integer - /// is split into the most significant 64 bits in `value_ms`, and the least significant 64 - /// bits in `value_ls`. The `decimal` belongs to the list identified by `sibling_list_id`. - void (*visit_literal_decimal)(void *data, - uintptr_t sibling_list_id, - uint64_t value_ms, - uint64_t value_ls, - uint8_t precision, - uint8_t scale); - /// Visit a struct literal belonging to the list identified by `sibling_list_id`. - /// The field names of the struct are in a list identified by `child_field_list_id`. 
- /// The values of the struct are in a list identified by `child_value_list_id`. - void (*visit_literal_struct)(void *data, - uintptr_t sibling_list_id, - uintptr_t child_field_list_id, - uintptr_t child_value_list_id); - /// Visit an array literal belonging to the list identified by `sibling_list_id`. - /// The values of the array are in a list identified by `child_list_id`. - void (*visit_literal_array)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); - /// Visits a null value belonging to the list identified by `sibling_list_id. - void (*visit_literal_null)(void *data, uintptr_t sibling_list_id); - /// Visits an `and` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the array are in a list identified by `child_list_id` - VisitVariadicFn visit_and; - /// Visits an `or` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the array are in a list identified by `child_list_id` - VisitVariadicFn visit_or; - /// Visits a `not` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expression will be in a _one_ item list identified by `child_list_id` - VisitUnaryFn visit_not; - /// Visits a `is_null` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expression will be in a _one_ item list identified by `child_list_id` - VisitUnaryFn visit_is_null; - /// Visits the `LessThan` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_lt; - /// Visits the `LessThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_le; - /// Visits the `GreaterThan` binary operator belonging to the list identified by `sibling_list_id`. 
- /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_gt; - /// Visits the `GreaterThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_ge; - /// Visits the `Equal` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_eq; - /// Visits the `NotEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_ne; - /// Visits the `Distinct` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_distinct; - /// Visits the `In` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_in; - /// Visits the `NotIn` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_not_in; - /// Visits the `Add` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_add; - /// Visits the `Minus` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_minus; - /// Visits the `Multiply` binary operator belonging to the list identified by `sibling_list_id`. 
- /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_multiply; - /// Visits the `Divide` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_divide; - /// Visits the `column` belonging to the list identified by `sibling_list_id`. - void (*visit_column)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visits a `StructExpression` belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the `StructExpression` are in a list identified by `child_list_id` - void (*visit_struct_expr)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); + /// An opaque engine state pointer + void *data; + /// Creates a new expression list, optionally reserving capacity up front + uintptr_t (*make_field_list)(void *data, uintptr_t reserve); + /// Visit a 32bit `integer` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_int; + /// Visit a 64bit `long` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_long; + /// Visit a 16bit `short` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_short; + /// Visit an 8bit `byte` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_byte; + /// Visit a 32bit `float` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_float; + /// Visit a 64bit `double` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_double; + /// Visit a `string` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_string; + /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_bool; + /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. 
+ /// The timestamp is microsecond precision and adjusted to UTC. + VisitLiteralFn visit_literal_timestamp; + /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. + /// The timestamp is microsecond precision with no timezone. + VisitLiteralFn visit_literal_timestamp_ntz; + /// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs + /// to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_date; + /// Visit binary data at the `buffer` with length `len` belonging to the list identified by + /// `sibling_list_id`. + void (*visit_literal_binary)(void *data, uintptr_t sibling_list_id, const uint8_t *buffer, uintptr_t len); + /// Visit a 128bit `decimal` value with the given precision and scale. The 128bit integer + /// is split into the most significant 64 bits in `value_ms`, and the least significant 64 + /// bits in `value_ls`. The `decimal` belongs to the list identified by `sibling_list_id`. + void (*visit_literal_decimal)(void *data, uintptr_t sibling_list_id, uint64_t value_ms, uint64_t value_ls, + uint8_t precision, uint8_t scale); + /// Visit a struct literal belonging to the list identified by `sibling_list_id`. + /// The field names of the struct are in a list identified by `child_field_list_id`. + /// The values of the struct are in a list identified by `child_value_list_id`. + void (*visit_literal_struct)(void *data, uintptr_t sibling_list_id, uintptr_t child_field_list_id, + uintptr_t child_value_list_id); + /// Visit an array literal belonging to the list identified by `sibling_list_id`. + /// The values of the array are in a list identified by `child_list_id`. + void (*visit_literal_array)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); + /// Visits a null value belonging to the list identified by `sibling_list_id. 
+ void (*visit_literal_null)(void *data, uintptr_t sibling_list_id); + /// Visits an `and` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the array are in a list identified by `child_list_id` + VisitVariadicFn visit_and; + /// Visits an `or` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the array are in a list identified by `child_list_id` + VisitVariadicFn visit_or; + /// Visits a `not` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expression will be in a _one_ item list identified by `child_list_id` + VisitUnaryFn visit_not; + /// Visits a `is_null` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expression will be in a _one_ item list identified by `child_list_id` + VisitUnaryFn visit_is_null; + /// Visits the `LessThan` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_lt; + /// Visits the `LessThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_le; + /// Visits the `GreaterThan` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_gt; + /// Visits the `GreaterThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_ge; + /// Visits the `Equal` binary operator belonging to the list identified by `sibling_list_id`. 
+ /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_eq; + /// Visits the `NotEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_ne; + /// Visits the `Distinct` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_distinct; + /// Visits the `In` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_in; + /// Visits the `NotIn` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_not_in; + /// Visits the `Add` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_add; + /// Visits the `Minus` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_minus; + /// Visits the `Multiply` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_multiply; + /// Visits the `Divide` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_divide; + /// Visits the `column` belonging to the list identified by `sibling_list_id`. 
+ void (*visit_column)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visits a `StructExpression` belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the `StructExpression` are in a list identified by `child_list_id` + void (*visit_struct_expr)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); }; // This trickery is from https://github.com/mozilla/cbindgen/issues/402#issuecomment-578680163 struct im_an_unused_struct_that_tricks_msvc_into_compilation { - ExternResult field; - ExternResult field2; - ExternResult field3; - ExternResult> field4; - ExternResult> field5; - ExternResult field6; - ExternResult field7; - ExternResult> field8; - ExternResult> field9; - ExternResult> field10; - ExternResult field11; + ExternResult field; + ExternResult field2; + ExternResult field3; + ExternResult> field4; + ExternResult> field5; + ExternResult field6; + ExternResult field7; + ExternResult> field8; + ExternResult> field9; + ExternResult> field10; + ExternResult field11; }; /// An `Event` can generally be thought of a "log message". It contains all the relevant bits such /// that an engine can generate a log message in its format struct Event { - /// The log message associated with the event - KernelStringSlice message; - /// Level that the event was emitted at - Level level; - /// A string that specifies in what part of the system the event occurred - KernelStringSlice target; - /// source file line number where the event occurred, or 0 (zero) if unknown - uint32_t line; - /// file where the event occurred. 
If unknown the slice `ptr` will be null and the len will be 0 - KernelStringSlice file; + /// The log message associated with the event + KernelStringSlice message; + /// Level that the event was emitted at + Level level; + /// A string that specifies in what part of the system the event occurred + KernelStringSlice target; + /// source file line number where the event occurred, or 0 (zero) if unknown + uint32_t line; + /// file where the event occurred. If unknown the slice `ptr` will be null and the len will be 0 + KernelStringSlice file; }; -using TracingEventFn = void(*)(Event event); +using TracingEventFn = void (*)(Event event); -using TracingLogLineFn = void(*)(KernelStringSlice line); +using TracingLogLineFn = void (*)(KernelStringSlice line); /// A predicate that can be used to skip data when scanning. /// @@ -530,25 +522,21 @@ using TracingLogLineFn = void(*)(KernelStringSlice line); /// kernel each retain ownership of their respective objects, with no need to coordinate memory /// lifetimes with the other. struct EnginePredicate { - void *predicate; - uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); + void *predicate; + uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); }; /// Give engines an easy way to consume stats struct Stats { - /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the - /// `num_records` statistic must be present and accurate, and must equal the number of records - /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat - /// outdated, i.e. not reflecting deleted rows yet. - uint64_t num_records; + /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the + /// `num_records` statistic must be present and accurate, and must equal the number of records + /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat + /// outdated, i.e. 
not reflecting deleted rows yet. + uint64_t num_records; }; -using CScanCallback = void(*)(NullableCvoid engine_context, - KernelStringSlice path, - int64_t size, - const Stats *stats, - const DvInfo *dv_info, - const CStringMap *partition_map); +using CScanCallback = void (*)(NullableCvoid engine_context, KernelStringSlice path, int64_t size, const Stats *stats, + const DvInfo *dv_info, const CStringMap *partition_map); /// The `EngineSchemaVisitor` defines a visitor system to allow engines to build their own /// representation of a schema from a particular schema within kernel. @@ -576,61 +564,49 @@ using CScanCallback = void(*)(NullableCvoid engine_context, /// that element's (already-visited) children. /// 4. The [`visit_schema`] method returns the id of the list of top-level columns struct EngineSchemaVisitor { - /// opaque state pointer - void *data; - /// Creates a new field list, optionally reserving capacity up front - uintptr_t (*make_field_list)(void *data, uintptr_t reserve); - /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a - /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. - void (*visit_struct)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - uintptr_t child_list_id); - /// Indicate that the schema contains an Array type. `child_list_id` will be a _one_ item list - /// with the array's element type - void (*visit_array)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - bool contains_null, - uintptr_t child_list_id); - /// Indicate that the schema contains an Map type. 
`child_list_id` will be a _two_ item list - /// where the first element is the map's key type and the second element is the - /// map's value type - void (*visit_map)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - bool value_contains_null, - uintptr_t child_list_id); - /// visit a `decimal` with the specified `precision` and `scale` - void (*visit_decimal)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - uint8_t precision, - uint8_t scale); - /// Visit a `string` belonging to the list identified by `sibling_list_id`. - void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `long` belonging to the list identified by `sibling_list_id`. - void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit an `integer` belonging to the list identified by `sibling_list_id`. - void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `short` belonging to the list identified by `sibling_list_id`. - void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `byte` belonging to the list identified by `sibling_list_id`. - void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `float` belonging to the list identified by `sibling_list_id`. - void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `double` belonging to the list identified by `sibling_list_id`. - void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. - void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit `binary` belonging to the list identified by `sibling_list_id`. 
- void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `date` belonging to the list identified by `sibling_list_id`. - void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// opaque state pointer + void *data; + /// Creates a new field list, optionally reserving capacity up front + uintptr_t (*make_field_list)(void *data, uintptr_t reserve); + /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a + /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. + void (*visit_struct)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uintptr_t child_list_id); + /// Indicate that the schema contains an Array type. `child_list_id` will be a _one_ item list + /// with the array's element type + void (*visit_array)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool contains_null, + uintptr_t child_list_id); + /// Indicate that the schema contains an Map type. `child_list_id` will be a _two_ item list + /// where the first element is the map's key type and the second element is the + /// map's value type + void (*visit_map)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool value_contains_null, + uintptr_t child_list_id); + /// visit a `decimal` with the specified `precision` and `scale` + void (*visit_decimal)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uint8_t precision, + uint8_t scale); + /// Visit a `string` belonging to the list identified by `sibling_list_id`. 
+ void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `long` belonging to the list identified by `sibling_list_id`. + void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit an `integer` belonging to the list identified by `sibling_list_id`. + void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `short` belonging to the list identified by `sibling_list_id`. + void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `byte` belonging to the list identified by `sibling_list_id`. + void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `float` belonging to the list identified by `sibling_list_id`. + void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `double` belonging to the list identified by `sibling_list_id`. + void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. + void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit `binary` belonging to the list identified by `sibling_list_id`. + void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `date` belonging to the list identified by `sibling_list_id`. + void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. + void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. 
+ void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); }; extern "C" { @@ -659,8 +635,7 @@ void free_engine_data(Handle engine_data); /// /// # Safety /// Caller is responsible for passing a valid path pointer. -ExternResult get_engine_builder(KernelStringSlice path, - AllocateErrorFn allocate_error); +ExternResult get_engine_builder(KernelStringSlice path, AllocateErrorFn allocate_error); #endif #if defined(DEFINE_DEFAULT_ENGINE) @@ -687,8 +662,7 @@ ExternResult> builder_build(EngineBuilder *builder); /// # Safety /// /// Caller is responsible for passing a valid path pointer. -ExternResult> get_default_engine(KernelStringSlice path, - AllocateErrorFn allocate_error); +ExternResult> get_default_engine(KernelStringSlice path, AllocateErrorFn allocate_error); #endif #if defined(DEFINE_SYNC_ENGINE) @@ -708,8 +682,7 @@ void free_engine(Handle engine); /// # Safety /// /// Caller is responsible for passing valid handles and path pointer. -ExternResult> snapshot(KernelStringSlice path, - Handle engine); +ExternResult> snapshot(KernelStringSlice path, Handle engine); /// # Safety /// @@ -735,8 +708,7 @@ NullableCvoid snapshot_table_root(Handle snapshot, AllocateStrin /// /// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by /// [kernel_scan_data_free]. The visitor function pointer must be non-null. -bool string_slice_next(Handle data, - NullableCvoid engine_context, +bool string_slice_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, KernelStringSlice slice)); /// # Safety @@ -767,8 +739,7 @@ void *get_raw_engine_data(Handle data); /// # Safety /// data_handle must be a valid ExclusiveEngineData as read by the /// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. 
-ExternResult get_raw_arrow_data(Handle data, - Handle engine); +ExternResult get_raw_arrow_data(Handle data, Handle engine); #endif /// Call the engine back with the next `EngineData` batch read by Parquet/Json handler. The @@ -780,8 +751,7 @@ ExternResult get_raw_arrow_data(Handle data, /// /// The iterator must be valid (returned by [`read_parquet_file`]) and not yet freed by /// [`free_read_result_iter`]. The visitor function pointer must be non-null. -ExternResult read_result_next(Handle data, - NullableCvoid engine_context, +ExternResult read_result_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data)); @@ -796,9 +766,8 @@ void free_read_result_iter(Handle data); /// /// # Safety /// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta` -ExternResult> read_parquet_file(Handle engine, - const FileMeta *file, - Handle physical_schema); +ExternResult> +read_parquet_file(Handle engine, const FileMeta *file, Handle physical_schema); uintptr_t visit_expression_and(KernelExpressionVisitorState *state, EngineIterator *children); @@ -814,8 +783,7 @@ uintptr_t visit_expression_eq(KernelExpressionVisitorState *state, uintptr_t a, /// # Safety /// The string slice must be valid -ExternResult visit_expression_column(KernelExpressionVisitorState *state, - KernelStringSlice name, +ExternResult visit_expression_column(KernelExpressionVisitorState *state, KernelStringSlice name, AllocateErrorFn allocate_error); uintptr_t visit_expression_not(KernelExpressionVisitorState *state, uintptr_t inner_expr); @@ -824,8 +792,7 @@ uintptr_t visit_expression_is_null(KernelExpressionVisitorState *state, uintptr_ /// # Safety /// The string slice must be valid -ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, - KernelStringSlice value, +ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, KernelStringSlice value, AllocateErrorFn
allocate_error); uintptr_t visit_expression_literal_int(KernelExpressionVisitorState *state, int32_t value); @@ -857,8 +824,7 @@ void free_kernel_predicate(Handle data); /// # Safety /// /// The caller must pass a valid SharedExpression Handle and expression visitor -uintptr_t visit_expression(const Handle *expression, - EngineExpressionVisitor *visitor); +uintptr_t visit_expression(const Handle *expression, EngineExpressionVisitor *visitor); /// Enable getting called back for tracing (logging) events in the kernel. `max_level` specifies /// that only events `<=` to the specified level should be reported. More verbose Levels are "greater @@ -877,8 +843,7 @@ uintptr_t visit_expression(const Handle *expression, /// /// # Safety /// Caller must pass a valid function pointer for the callback -bool enable_event_tracing(TracingEventFn callback, - Level max_level); +bool enable_event_tracing(TracingEventFn callback, Level max_level); /// Enable getting called back with log lines in the kernel using default settings: /// - FULL format @@ -928,13 +893,8 @@ bool enable_log_line_tracing(TracingLogLineFn callback, Level max_level); /// /// # Safety /// Caller must pass a valid function pointer for the callback -bool enable_formatted_log_line_tracing(TracingLogLineFn callback, - Level max_level, - LogLineFormat format, - bool ansi, - bool with_time, - bool with_level, - bool with_target); +bool enable_formatted_log_line_tracing(TracingLogLineFn callback, Level max_level, LogLineFormat format, bool ansi, + bool with_time, bool with_level, bool with_target); /// Drops a scan. /// # Safety @@ -947,8 +907,7 @@ void free_scan(Handle scan); /// # Safety /// /// Caller is responsible for passing a valid snapshot pointer, and engine pointer -ExternResult> scan(Handle snapshot, - Handle engine, +ExternResult> scan(Handle snapshot, Handle engine, EnginePredicate *predicate); /// Get the global state for a scan. 
See the docs for [`delta_kernel::scan::state::GlobalScanState`] @@ -1006,8 +965,7 @@ ExternResult> kernel_scan_data_init(Handle kernel_scan_data_next(Handle data, - NullableCvoid engine_context, +ExternResult kernel_scan_data_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data, KernelBoolSlice selection_vector)); @@ -1025,24 +983,20 @@ void free_kernel_scan_data(Handle data); /// # Safety /// /// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`] -NullableCvoid get_from_map(const CStringMap *map, - KernelStringSlice key, - AllocateStringFn allocate_fn); +NullableCvoid get_from_map(const CStringMap *map, KernelStringSlice key, AllocateStringFn allocate_fn); /// Get a selection vector out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult selection_vector_from_dv(const DvInfo *dv_info, - Handle engine, +ExternResult selection_vector_from_dv(const DvInfo *dv_info, Handle engine, Handle state); /// Get a vector of row indexes out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult row_indexes_from_dv(const DvInfo *dv_info, - Handle engine, +ExternResult row_indexes_from_dv(const DvInfo *dv_info, Handle engine, Handle state); /// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan @@ -1050,9 +1004,7 @@ ExternResult row_indexes_from_dv(const DvInfo *dv_info, /// /// # Safety /// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector. -void visit_scan_data(Handle data, - KernelBoolSlice selection_vec, - NullableCvoid engine_context, +void visit_scan_data(Handle data, KernelBoolSlice selection_vec, NullableCvoid engine_context, CScanCallback callback); /// Visit the schema of the passed `SnapshotHandle`, using the provided `visitor`.
See the @@ -1073,6 +1025,6 @@ uintptr_t visit_schema(Handle snapshot, EngineSchemaVisitor *vis /// [`free_kernel_predicate`], or [`Handle::drop_handle`] Handle get_testing_kernel_expression(); -} // extern "C" +} // extern "C" -} // namespace ffi +} // namespace ffi diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 4e8b670..540bdb6 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -4,7 +4,11 @@ #include "duckdb/common/enum_util.hpp" #include "duckdb/planner/filter/conjunction_filter.hpp" #include "duckdb/planner/filter/constant_filter.hpp" - +#include "duckdb/planner/expression.hpp" +#include "duckdb/parser/expression/constant_expression.hpp" +#include "duckdb/parser/expression/conjunction_expression.hpp" +#include "duckdb/common/error_data.hpp" +#include "duckdb/parser/expression/comparison_expression.hpp" #include #include @@ -12,6 +16,118 @@ namespace duckdb { +class ExpressionVisitor : public ffi::EngineExpressionVisitor { + using FieldList = vector>; + +public: + unique_ptr>> + VisitKernelExpression(const ffi::Handle *expression); + +private: + unordered_map> inflight_lists; + uintptr_t next_id = 1; + + ErrorData error; + + // Literals + template + static ffi::VisitLiteralFn VisitPrimitiveLiteral() { + return (ffi::VisitLiteralFn)&VisitPrimitiveLiteral; + } + template + static void VisitPrimitiveLiteral(void *state, uintptr_t sibling_list_id, CPP_TYPE value) { + auto state_cast = static_cast(state); + auto duckdb_value = CREATE_VALUE_FUN(value); + auto expression = make_uniq(duckdb_value); + state_cast->AppendToList(sibling_list_id, std::move(expression)); + } + + static void VisitPrimitiveLiteralBool(void *state, uintptr_t sibling_list_id, bool value); + static void VisitPrimitiveLiteralByte(void *state, uintptr_t sibling_list_id, int8_t value); + static void VisitPrimitiveLiteralShort(void *state, uintptr_t sibling_list_id, int16_t value); + static void VisitPrimitiveLiteralInt(void *state, uintptr_t 
sibling_list_id, int32_t value); + static void VisitPrimitiveLiteralLong(void *state, uintptr_t sibling_list_id, int64_t value); + static void VisitPrimitiveLiteralFloat(void *state, uintptr_t sibling_list_id, float value); + static void VisitPrimitiveLiteralDouble(void *state, uintptr_t sibling_list_id, double value); + + static void VisitTimestampLiteral(void *state, uintptr_t sibling_list_id, int64_t value); + static void VisitTimestampNtzLiteral(void *state, uintptr_t sibling_list_id, int64_t value); + static void VisitDateLiteral(void *state, uintptr_t sibling_list_id, int32_t value); + static void VisitStringLiteral(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice value); + static void VisitBinaryLiteral(void *state, uintptr_t sibling_list_id, const uint8_t *buffer, uintptr_t len); + static void VisitNullLiteral(void *state, uintptr_t sibling_list_id); + static void VisitArrayLiteral(void *state, uintptr_t sibling_list_id, uintptr_t child_id); + static void VisitStructLiteral(void *data, uintptr_t sibling_list_id, uintptr_t child_field_list_value, + uintptr_t child_value_list_id); + static void VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, uint64_t value_ms, uint64_t value_ls, + uint8_t precision, uint8_t scale); + static void VisitColumnExpression(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name); + static void VisitStructExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + static void VisitNotExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + static void VisitIsNullExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + + template + static ffi::VisitVariadicFn VisitUnaryExpression() { + return &VisitVariadicExpression; + } + template + static ffi::VisitVariadicFn VisitBinaryExpression() { + return &VisitBinaryExpression; + } + template + static ffi::VisitVariadicFn VisitVariadicExpression() { + return &VisitVariadicExpression; + } + + 
template + static void VisitVariadicExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + state_cast->AppendToList(sibling_list_id, std::move(make_uniq(Value(42)))); + return; + } + unique_ptr expression = make_uniq(EXPRESSION_TYPE, std::move(*children)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); + } + + static void VisitAdditionExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + static void VisitSubctractionExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + static void VisitDivideExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + static void VisitMultiplyExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + + template + static void VisitBinaryExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { + auto state_cast = static_cast(state); + auto children = state_cast->TakeFieldList(child_list_id); + if (!children) { + state_cast->AppendToList(sibling_list_id, std::move(make_uniq(Value(42)))); + return; + } + + if (children->size() != 2) { + state_cast->AppendToList(sibling_list_id, std::move(make_uniq(Value(42)))); + state_cast->error = + ErrorData("INCORRECT SIZE IN VISIT_BINARY_EXPRESSION" + EnumUtil::ToString(EXPRESSION_TYPE)); + return; + } + + auto &lhs = children->at(0); + auto &rhs = children->at(1); + unique_ptr expression = + make_uniq(EXPRESSION_TYPE, std::move(lhs), std::move(rhs)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); + } + + static void VisitComparisonExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); + + // List functions + static uintptr_t MakeFieldList(ExpressionVisitor *state, uintptr_t capacity_hint); + void AppendToList(uintptr_t id, unique_ptr child); + uintptr_t MakeFieldListImpl(uintptr_t 
capacity_hint); + unique_ptr TakeFieldList(uintptr_t id); +}; + // SchemaVisitor is used to parse the schema of a Delta table from the Kernel class SchemaVisitor { public: @@ -191,9 +307,9 @@ struct KernelUtils { error_cast->Throw(from_where); } throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", - from_where.c_str()); + from_where.c_str()); } - if (result.tag == ffi::ExternResult::Tag::Ok) { + if (result.tag == ffi::ExternResult::Tag::Ok) { return result.ok._0; } throw IOException("Invalid error ExternResult tag found!"); diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index fe842d3..86edd2e 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -58,15 +58,15 @@ struct DeltaSnapshot : public MultiFileList { idx_t GetTotalFileCount() override; unique_ptr GetCardinality(ClientContext &context) override; - idx_t GetVersion(); - DeltaFileMetaData &GetMetaData(idx_t index) const; + idx_t GetVersion(); + DeltaFileMetaData &GetMetaData(idx_t index) const; protected: //! 
Get the i-th expanded file string GetFile(idx_t i) override; protected: - string GetFileInternal(idx_t i); + string GetFileInternal(idx_t i); void InitializeSnapshot(); void InitializeScan(); @@ -76,14 +76,14 @@ struct DeltaSnapshot : public MultiFileList { result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); } - static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, - const struct ffi::KernelBoolSlice selection_vec); - static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, - const ffi::Stats *stats, const ffi::DvInfo *dv_info, - const struct ffi::CStringMap *partition_values); + static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, + const struct ffi::KernelBoolSlice selection_vec); + static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, + const ffi::Stats *stats, const ffi::DvInfo *dv_info, + const struct ffi::CStringMap *partition_values); protected: - mutable mutex lock; + mutable mutex lock; idx_t version; diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 44e03e7..5ecba54 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -60,19 +60,24 @@ bool DeltaCatalog::UseCachedSnapshot() { optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) { auto &delta_transaction = DeltaTransaction::Get(context, *this); + idx_t version = DConstants::INVALID_INDEX; // Option 1: snapshot is cached table-wide auto cached_snapshot = main_schema->GetCachedTable(); if (cached_snapshot) { - return cached_snapshot->snapshot->GetVersion(); + version = cached_snapshot->snapshot->GetVersion(); } // Option 2: snapshot is cached in transaction if (delta_transaction.table_entry) { - return delta_transaction.table_entry->snapshot->GetVersion(); + version = delta_transaction.table_entry->snapshot->GetVersion(); } - return 
{}; + if (version != DConstants::INVALID_INDEX) { + return version; + } + + return optional_idx::Invalid(); } DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { diff --git a/test/sql/main/test_expression.test b/test/sql/main/test_expression.test new file mode 100644 index 0000000..8eca5d4 --- /dev/null +++ b/test/sql/main/test_expression.test @@ -0,0 +1,50 @@ +# name: test/sql/main/test_expression.test +# description: Test the get_delta_test_expression function +# group: [delta_generated] + +require parquet + +require delta + +# TODO still broken: +# - Decimal +# - StructExpression +query I +SELECT unnest(get_delta_test_expression()) +---- +127 +-128 +3.4028235e+38 +-3.4028235e+38 +1.7976931348623157e+308 +-1.7976931348623157e+308 +2147483647 +-2147483648 +9223372036854775807 +-9223372036854775808 +'hello expressions' +true +false +'1970-01-01 00:00:00.00005+00'::TIMESTAMP WITH TIME ZONE +'1970-01-01 00:00:00.0001'::TIMESTAMP +'1970-02-02'::DATE +'\x00\x00\xDE\xAD\xBE\xEF\xCA\xFE'::BLOB +0.0000000042 +NULL +struct_pack("'top'" := struct_pack("'a'" := 500, "'b'" := list_value(5, 0))) +list_value(5, 0) +42 +not((col is NULL)) +(0 IN (0)) +(0 + 0) +(0 - 0) +(0 = 0) +(0 != 0) +(0 NOT IN (0)) +(0 / 0) +(0 * 0) +(0 < 0) +(0 <= 0) +(0 > 0) +(0 >= 0) +(0 IS DISTINCT FROM 0) \ No newline at end of file