diff --git a/CMakeLists.txt b/CMakeLists.txt index 351f307..50df657 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 2.8.12) include(ExternalProject) -### Core config +# Core config set(TARGET_NAME delta) set(EXTENSION_NAME ${TARGET_NAME}_extension) @@ -12,30 +12,43 @@ project(${TARGET_NAME}) include_directories(src/include) set(EXTENSION_SOURCES - src/delta_extension.cpp - src/delta_functions.cpp - src/delta_utils.cpp - src/functions/delta_scan.cpp - src/storage/delta_catalog.cpp - src/storage/delta_schema_entry.cpp - src/storage/delta_table_entry.cpp - src/storage/delta_transaction.cpp - src/storage/delta_transaction_manager.cpp -) - -### Custom config -# TODO: figure out if we really need this? + src/delta_extension.cpp + src/delta_functions.cpp + src/delta_utils.cpp + src/functions/delta_scan.cpp + src/storage/delta_catalog.cpp + src/storage/delta_schema_entry.cpp + src/storage/delta_table_entry.cpp + src/storage/delta_transaction.cpp + src/storage/delta_transaction_manager.cpp) + +# Custom config TODO: figure out if we really need this? if(APPLE) - set(PLATFORM_LIBS m c System resolv "-framework Corefoundation -framework SystemConfiguration -framework Security") + set(PLATFORM_LIBS + m + c + System + resolv + "-framework Corefoundation -framework SystemConfiguration -framework Security" + ) elseif(UNIX) - set(PLATFORM_LIBS m c resolv) + set(PLATFORM_LIBS m c resolv) elseif(WIN32) - set(PLATFORM_LIBS ntdll ncrypt secur32 ws2_32 userenv bcrypt msvcrt advapi32 RuntimeObject) + set(PLATFORM_LIBS + ntdll + ncrypt + secur32 + ws2_32 + userenv + bcrypt + msvcrt + advapi32 + RuntimeObject) else() - message(STATUS "UNKNOWN OS") + message(STATUS "UNKNOWN OS") endif() -### Setup delta-kernel-rs dependency +# Setup delta-kernel-rs dependency set(KERNEL_NAME delta_kernel) # Set default ExternalProject root directory @@ -46,40 +59,50 @@ set(RUST_ENV_VARS "") # Propagate arch to rust build for CI set(RUST_PLATFORM_TARGET "") if("${OS_NAME}" STREQUAL "linux") - if ("${OS_ARCH}" STREQUAL "arm64") - set(RUST_PLATFORM_TARGET "aarch64-unknown-linux-gnu") - elseif("${CMAKE_CXX_COMPILER}" MATCHES "aarch64") - set(RUST_ENV_VARS ${RUST_ENV_VARS} CFLAGS_aarch64_unknown_linux_gnu=--sysroot=/usr/aarch64-linux-gnu) - set(RUST_ENV_VARS ${RUST_ENV_VARS} CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER=aarch64-linux-gnu-gcc) - set(RUST_ENV_VARS ${RUST_ENV_VARS} OPENSSL_LIB_DIR=${CMAKE_BINARY_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/lib) - set(RUST_ENV_VARS ${RUST_ENV_VARS} OPENSSL_INCLUDE_DIR=${CMAKE_BINARY_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/include) - set(RUST_PLATFORM_TARGET "aarch64-unknown-linux-gnu") - else() - set(RUST_PLATFORM_TARGET "x86_64-unknown-linux-gnu") - endif() + if("${OS_ARCH}" STREQUAL "arm64") + set(RUST_PLATFORM_TARGET "aarch64-unknown-linux-gnu") + elseif("${CMAKE_CXX_COMPILER}" MATCHES "aarch64") + set(RUST_ENV_VARS + ${RUST_ENV_VARS} + CFLAGS_aarch64_unknown_linux_gnu=--sysroot=/usr/aarch64-linux-gnu) + set(RUST_ENV_VARS + ${RUST_ENV_VARS} + CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER=aarch64-linux-gnu-gcc) + set(RUST_ENV_VARS + ${RUST_ENV_VARS} + OPENSSL_LIB_DIR=${CMAKE_BINARY_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/lib + ) + set(RUST_ENV_VARS + ${RUST_ENV_VARS} + OPENSSL_INCLUDE_DIR=${CMAKE_BINARY_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/include + ) + set(RUST_PLATFORM_TARGET "aarch64-unknown-linux-gnu") + else() + set(RUST_PLATFORM_TARGET "x86_64-unknown-linux-gnu") + endif() elseif("${OS_NAME}" STREQUAL "osx") - if ("${OSX_BUILD_ARCH}" STREQUAL "arm64") - set(RUST_PLATFORM_TARGET "aarch64-apple-darwin") - elseif ("${OSX_BUILD_ARCH}" STREQUAL "x86_64") - set(RUST_PLATFORM_TARGET "x86_64-apple-darwin") - elseif ("${OS_ARCH}" STREQUAL "arm64") - set(RUST_PLATFORM_TARGET "aarch64-apple-darwin") - endif() + if("${OSX_BUILD_ARCH}" STREQUAL "arm64") + set(RUST_PLATFORM_TARGET "aarch64-apple-darwin") + elseif("${OSX_BUILD_ARCH}" STREQUAL "x86_64") + set(RUST_PLATFORM_TARGET "x86_64-apple-darwin") + elseif("${OS_ARCH}" STREQUAL "arm64") + set(RUST_PLATFORM_TARGET "aarch64-apple-darwin") + endif() elseif(WIN32) - if (MINGW AND "${OS_ARCH}" STREQUAL "arm64") - set(RUST_PLATFORM_TARGET "aarch64-pc-windows-gnu") - elseif (MINGW AND "${OS_ARCH}" STREQUAL "amd64") - set(RUST_PLATFORM_TARGET "x86_64-pc-windows-gnu") - elseif (MSVC AND "${OS_ARCH}" STREQUAL "arm64") - set(RUST_PLATFORM_TARGET "aarch64-pc-windows-msvc") - elseif (MSVC AND "${OS_ARCH}" STREQUAL "amd64") - set(RUST_PLATFORM_TARGET "x86_64-pc-windows-msvc") - endif() + if(MINGW AND "${OS_ARCH}" STREQUAL "arm64") + set(RUST_PLATFORM_TARGET "aarch64-pc-windows-gnu") + elseif(MINGW AND "${OS_ARCH}" STREQUAL "amd64") + set(RUST_PLATFORM_TARGET "x86_64-pc-windows-gnu") + elseif(MSVC AND "${OS_ARCH}" STREQUAL "arm64") + set(RUST_PLATFORM_TARGET "aarch64-pc-windows-msvc") + elseif(MSVC AND "${OS_ARCH}" STREQUAL "amd64") + set(RUST_PLATFORM_TARGET "x86_64-pc-windows-msvc") + endif() endif() # We currently only support the predefined targets. -if ("${RUST_PLATFORM_TARGET}" STREQUAL "") - message(FATAL_ERROR "Failed to detect the correct platform") +if("${RUST_PLATFORM_TARGET}" STREQUAL "") + message(FATAL_ERROR "Failed to detect the correct platform") endif() set(RUST_PLATFORM_PARAM "--target=${RUST_PLATFORM_TARGET}") @@ -92,69 +115,84 @@ string(STRIP "${RUST_ENV_VARS}" RUST_ENV_VARS) set(RUST_UNSET_ENV_VARS --unset=CC --unset=CXX --unset=LD) # Define all the relevant delta-kernel-rs paths/names -set(DELTA_KERNEL_LIBNAME "${CMAKE_STATIC_LIBRARY_PREFIX}delta_kernel_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}") -set(DELTA_KERNEL_LIBPATH_DEBUG "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/debug/${DELTA_KERNEL_LIBNAME}") -set(DELTA_KERNEL_LIBPATH_RELEASE "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/release/${DELTA_KERNEL_LIBNAME}") -set(DELTA_KERNEL_FFI_HEADER_PATH "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers") -set(DELTA_KERNEL_FFI_HEADER_C "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers/delta_kernel_ffi.h") -set(DELTA_KERNEL_FFI_HEADER_CXX "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers/delta_kernel_ffi.hpp") +set(DELTA_KERNEL_LIBNAME + "${CMAKE_STATIC_LIBRARY_PREFIX}delta_kernel_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}" +) +set(DELTA_KERNEL_LIBPATH_DEBUG + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/debug/${DELTA_KERNEL_LIBNAME}" +) +set(DELTA_KERNEL_LIBPATH_RELEASE + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/release/${DELTA_KERNEL_LIBNAME}" +) +set(DELTA_KERNEL_FFI_HEADER_PATH + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers") +set(DELTA_KERNEL_FFI_HEADER_C + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers/delta_kernel_ffi.h" +) +set(DELTA_KERNEL_FFI_HEADER_CXX + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers/delta_kernel_ffi.hpp" +) # Add rust_example as a CMake target ExternalProject_Add( - ${KERNEL_NAME} - GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" - # WARNING: the FFI headers are currently pinned due to the C linkage issue of the c++ headers. Currently, when bumping - # the kernel version, the produced header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying the fix - GIT_TAG v0.4.0 - # Prints the env variables passed to the cargo build to the terminal, useful in debugging because passing them - # through CMake is an error-prone mess - CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} env - UPDATE_COMMAND "" - BUILD_IN_SOURCE 1 - # Build debug build - BUILD_COMMAND - ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} - cargo build --package delta_kernel_ffi --workspace --all-features ${RUST_PLATFORM_PARAM} - # Build release build - COMMAND - ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} - cargo build --package delta_kernel_ffi --workspace --all-features --release ${RUST_PLATFORM_PARAM} - # Build DATs - COMMAND - ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} - cargo build --manifest-path=${CMAKE_BINARY_DIR}/rust/src/delta_kernel/acceptance/Cargo.toml - # Define the byproducts, required for building with Ninja - BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_DEBUG}" - BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_RELEASE}" - BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_C}" - BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_CXX}" - INSTALL_COMMAND "" - LOG_BUILD ON) + ${KERNEL_NAME} + GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" + # WARNING: the FFI headers are currently pinned due to the C linkage issue of + # the c++ headers. Currently, when bumping the kernel version, the produced + # header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying + # the fix + GIT_TAG v0.4.0 + # Prints the env variables passed to the cargo build to the terminal, useful + # in debugging because passing them through CMake is an error-prone mess + CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} + ${RUST_ENV_VARS} env + UPDATE_COMMAND "" + BUILD_IN_SOURCE 1 + # Build debug build + BUILD_COMMAND + ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build + --package delta_kernel_ffi --workspace --all-features ${RUST_PLATFORM_PARAM} + # Build release build + COMMAND + ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build + --package delta_kernel_ffi --workspace --all-features --release + ${RUST_PLATFORM_PARAM} + # Build DATs + COMMAND + ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build + --manifest-path=${CMAKE_BINARY_DIR}/rust/src/delta_kernel/acceptance/Cargo.toml + # Define the byproducts, required for building with Ninja + BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_DEBUG}" + BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_RELEASE}" + BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_C}" + BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_CXX}" + INSTALL_COMMAND "" + LOG_BUILD ON) build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES}) build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES}) -# TODO: when C linkage issue is resolved, we should switch back to using the generated headers -#include_directories(${DELTA_KERNEL_FFI_HEADER_PATH}) +# TODO: when C linkage issue is resolved, we should switch back to using the +# generated headers include_directories(${DELTA_KERNEL_FFI_HEADER_PATH}) # Hides annoying linker warnings -set(CMAKE_OSX_DEPLOYMENT_TARGET 13.3 CACHE STRING "Minimum OS X deployment version" FORCE) +set(CMAKE_OSX_DEPLOYMENT_TARGET + 13.3 + CACHE STRING "Minimum OS X deployment version" FORCE) # Add the default client add_compile_definitions(DEFINE_DEFAULT_ENGINE) # Link delta-kernal-rs to static lib -target_link_libraries(${EXTENSION_NAME} - debug ${DELTA_KERNEL_LIBPATH_DEBUG} - optimized ${DELTA_KERNEL_LIBPATH_RELEASE} - ${PLATFORM_LIBS}) +target_link_libraries( + ${EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized + ${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS}) add_dependencies(${EXTENSION_NAME} delta_kernel) # Link delta-kernal-rs to dynamic lib -target_link_libraries(${LOADABLE_EXTENSION_NAME} - debug ${DELTA_KERNEL_LIBPATH_DEBUG} - optimized ${DELTA_KERNEL_LIBPATH_RELEASE} - ${PLATFORM_LIBS}) +target_link_libraries( + ${LOADABLE_EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized + ${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS}) add_dependencies(${LOADABLE_EXTENSION_NAME} delta_kernel) install( diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 97d1b53..36003a3 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -13,18 +13,18 @@ namespace duckdb { static unique_ptr DeltaCatalogAttach(StorageExtensionInfo *storage_info, ClientContext &context, - AttachedDatabase &db, const string &name, AttachInfo &info, - AccessMode access_mode) { + AttachedDatabase &db, const string &name, AttachInfo &info, + AccessMode access_mode) { - auto res = make_uniq(db, info.path, access_mode); + auto res = make_uniq(db, info.path, access_mode); - for (const auto& option : info.options) { - if (StringUtil::Lower(option.first) == "pin_snapshot") { - res->use_cache = option.second.GetValue(); - } - } + for (const auto &option : info.options) { + if (StringUtil::Lower(option.first) == "pin_snapshot") { + res->use_cache = option.second.GetValue(); + } + } - res->SetDefaultTable(DEFAULT_SCHEMA, DEFAULT_DELTA_TABLE); + res->SetDefaultTable(DEFAULT_SCHEMA, DEFAULT_DELTA_TABLE); return std::move(res); } @@ -44,14 +44,14 @@ class DeltaStorageExtension : public StorageExtension { }; static void LoadInternal(DatabaseInstance &instance) { - // Load functions - for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { - ExtensionUtil::RegisterFunction(instance, function); - } - - // Register the "single table" delta catalog (to ATTACH a single delta table) - auto &config = DBConfig::GetConfig(instance); - config.storage_extensions["delta"] = make_uniq(); + // Load functions + for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { + ExtensionUtil::RegisterFunction(instance, function); + } + + // Register the "single table" delta catalog (to ATTACH a single delta table) + auto &config = DBConfig::GetConfig(instance); + config.storage_extensions["delta"] = make_uniq(); } void DeltaExtension::Load(DuckDB &db) { diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 035d300..1a8ff04 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -13,11 +13,11 @@ unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::Sha ffi::EngineSchemaVisitor visitor; visitor.data = &state; - visitor.make_field_list = (uintptr_t (*)(void *, uintptr_t))&MakeFieldList; - visitor.visit_struct = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uintptr_t))&VisitStruct; - visitor.visit_array = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t))&VisitArray; - visitor.visit_map = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t))&VisitMap; - visitor.visit_decimal = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uint8_t, uint8_t))&VisitDecimal; + visitor.make_field_list = (uintptr_t(*)(void *, uintptr_t)) & MakeFieldList; + visitor.visit_struct = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uintptr_t)) & VisitStruct; + visitor.visit_array = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) & VisitArray; + visitor.visit_map = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) & VisitMap; + visitor.visit_decimal = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uint8_t, uint8_t)) & VisitDecimal; visitor.visit_string = VisitSimpleType(); visitor.visit_long = VisitSimpleType(); visitor.visit_integer = VisitSimpleType(); @@ -176,7 +176,7 @@ vector KernelUtils::FromDeltaBoolSlice(const struct ffi::KernelBoolSlice s PredicateVisitor::PredicateVisitor(const vector &column_names, optional_ptr filters) { predicate = this; - visitor = (uintptr_t (*)(void *, ffi::KernelExpressionVisitorState *))&VisitPredicate; + visitor = (uintptr_t(*)(void *, ffi::KernelExpressionVisitorState *)) & VisitPredicate; if (filters) { for (auto &filter : filters->filters) { diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index d88d597..4e35b17 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -278,7 +278,8 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p } if (StringUtil::StartsWith(endpoint, "http://")) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), KernelUtils::ToDeltaString("true")); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), + KernelUtils::ToDeltaString("true")); } ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), KernelUtils::ToDeltaString(endpoint)); @@ -363,7 +364,8 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p } // Set the use_emulator option for when the azurite test server is used if (account_name == "devstoreaccount1" || connection_string.find("devstoreaccount1") != string::npos) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_emulator"), KernelUtils::ToDeltaString("true")); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_emulator"), + KernelUtils::ToDeltaString("true")); } if (!account_name.empty()) { ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_name"), @@ -373,7 +375,8 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_endpoint"), KernelUtils::ToDeltaString(endpoint)); } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("container_name"), KernelUtils::ToDeltaString(bucket)); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("container_name"), + KernelUtils::ToDeltaString(bucket)); } return builder; } @@ -412,46 +415,46 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) { } void DeltaSnapshot::Bind(vector &return_types, vector &names) { - if (have_bound) { - names = this->names; - return_types = this->types; - return; - } - - if (!initialized_snapshot) { - InitializeSnapshot(); - } - - unique_ptr schema; - - { - auto snapshot_ref = snapshot->GetLockingRef(); - schema = SchemaVisitor::VisitSnapshotSchema(snapshot_ref.GetPtr()); - } - - for (const auto &field: *schema) { - names.push_back(field.first); - return_types.push_back(field.second); - } - // Store the bound names for resolving the complex filter pushdown later - have_bound = true; - this->names = names; - this->types = return_types; + if (have_bound) { + names = this->names; + return_types = this->types; + return; + } + + if (!initialized_snapshot) { + InitializeSnapshot(); + } + + unique_ptr schema; + + { + auto snapshot_ref = snapshot->GetLockingRef(); + schema = SchemaVisitor::VisitSnapshotSchema(snapshot_ref.GetPtr()); + } + + for (const auto &field : *schema) { + names.push_back(field.first); + return_types.push_back(field.second); + } + // Store the bound names for resolving the complex filter pushdown later + have_bound = true; + this->names = names; + this->types = return_types; } string DeltaSnapshot::GetFile(idx_t i) { - if (!initialized_snapshot) { - InitializeSnapshot(); - } + if (!initialized_snapshot) { + InitializeSnapshot(); + } - if(!initialized_scan) { - InitializeScan(); - } + if (!initialized_scan) { + InitializeScan(); + } - // We already have this file - if (i < resolved_files.size()) { - return resolved_files[i]; - } + // We already have this file + if (i < resolved_files.size()) { + return resolved_files[i]; + } if (files_exhausted) { return ""; @@ -478,59 +481,62 @@ string DeltaSnapshot::GetFile(idx_t i) { } void DeltaSnapshot::InitializeSnapshot() { - auto path_slice = KernelUtils::ToDeltaString(paths[0]); + auto path_slice = KernelUtils::ToDeltaString(paths[0]); - auto interface_builder = CreateBuilder(context, paths[0]); - extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder)); + auto interface_builder = CreateBuilder(context, paths[0]); + extern_engine = TryUnpackKernelResult(ffi::builder_build(interface_builder)); - if (!snapshot) { - snapshot = make_shared_ptr(TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get()))); - } + if (!snapshot) { + snapshot = make_shared_ptr( + TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get()))); + } - initialized_snapshot = true; + initialized_snapshot = true; } void DeltaSnapshot::InitializeScan() { - auto snapshot_ref = snapshot->GetLockingRef(); + auto snapshot_ref = snapshot->GetLockingRef(); - // Create Scan - PredicateVisitor visitor(names, &table_filters); - scan = TryUnpackKernelResult(ffi::scan(snapshot_ref.GetPtr(), extern_engine.get(), &visitor)); + // Create Scan + PredicateVisitor visitor(names, &table_filters); + scan = TryUnpackKernelResult(ffi::scan(snapshot_ref.GetPtr(), extern_engine.get(), &visitor)); // Create GlobalState global_state = ffi::get_global_scan_state(scan.get()); - // Set version - this->version = ffi::version(snapshot_ref.GetPtr()); + // Set version + this->version = ffi::version(snapshot_ref.GetPtr()); // Create scan data iterator scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); - initialized_scan = true; + initialized_scan = true; } -unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, MultiFilePushdownInfo &info, - vector> &filters) { - FilterCombiner combiner(context); +unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, + const MultiFileReaderOptions &options, + MultiFilePushdownInfo &info, + vector> &filters) { + FilterCombiner combiner(context); - if (filters.empty()) { - return nullptr; - } + if (filters.empty()) { + return nullptr; + } - for (const auto &filter : filters) { - combiner.AddFilter(filter->Copy()); - } - auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids); + for (const auto &filter : filters) { + combiner.AddFilter(filter->Copy()); + } + auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids); // TODO: can/should we figure out if this filtered anything? auto filtered_list = make_uniq(context, paths[0]); filtered_list->table_filters = std::move(filterstmp); filtered_list->names = names; - // Copy over the snapshot, this avoids reparsing metadata - filtered_list->snapshot = snapshot; + // Copy over the snapshot, this avoids reparsing metadata + filtered_list->snapshot = snapshot; - return std::move(filtered_list); + return std::move(filtered_list); } vector DeltaSnapshot::GetAllFiles() { @@ -543,11 +549,11 @@ vector DeltaSnapshot::GetAllFiles() { } FileExpandResult DeltaSnapshot::GetExpandResult() { - // We avoid exposing the ExpandResult to DuckDB here because we want to materialize the Snapshot as late as possible: - // materializing too early (GetExpandResult is called *before* filter pushdown by the Parquet scanner), will lead into - // needing to create 2 scans of the snapshot TODO: we need to investigate if this is actually a sensible decision with - // some benchmarking, its currently based on intuition. - return FileExpandResult::MULTIPLE_FILES; + // We avoid exposing the ExpandResult to DuckDB here because we want to materialize the Snapshot as late as + // possible: materializing too early (GetExpandResult is called *before* filter pushdown by the Parquet scanner), + // will lead into needing to create 2 scans of the snapshot TODO: we need to investigate if this is actually a + // sensible decision with some benchmarking, its currently based on intuition. + return FileExpandResult::MULTIPLE_FILES; } idx_t DeltaSnapshot::GetTotalFileCount() { @@ -584,13 +590,13 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) } unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) { - auto result = make_uniq(); + auto result = make_uniq(); - if (table_function.function_info) { - result->snapshot = table_function.function_info->Cast().snapshot; - } + if (table_function.function_info) { + result->snapshot = table_function.function_info->Cast().snapshot; + } - return std::move(result); + return std::move(result); } bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &files, @@ -683,21 +689,21 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio } } -shared_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector& paths, FileGlobOptions options) { - if (paths.size() != 1) { - throw BinderException("'delta_scan' only supports single path as input"); - } - +shared_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) { + if (paths.size() != 1) { + throw BinderException("'delta_scan' only supports single path as input"); + } - if (snapshot) { - // TODO: assert that we are querying the same path as this injected snapshot - // This takes the kernel snapshot from the delta snapshot and ensures we use that snapshot for reading - if (snapshot) { - return snapshot; - } - } + if (snapshot) { + // TODO: assert that we are querying the same path as this injected snapshot + // This takes the kernel snapshot from the delta snapshot and ensures we use that snapshot for reading + if (snapshot) { + return snapshot; + } + } - return make_uniq(context, paths[0]); + return make_uniq(context, paths[0]); } // Generate the correct Selection Vector Based on the Raw delta KernelBoolSlice dv and the row_id_column diff --git a/src/functions/expression_functions.cpp b/src/functions/expression_functions.cpp new file mode 100644 index 0000000..373e42f --- /dev/null +++ b/src/functions/expression_functions.cpp @@ -0,0 +1,46 @@ +#include +#include + +#include "duckdb/function/scalar_function.hpp" +#include "duckdb/planner/expression/bound_constant_expression.hpp" + +#include "delta_utils.hpp" +#include "delta_functions.hpp" + +namespace duckdb { + +static void GetDeltaTestExpression(DataChunk &input, ExpressionState &state, Vector &output) { + output.SetVectorType(VectorType::CONSTANT_VECTOR); + + auto test_expression = ffi::get_testing_kernel_expression(); + ExpressionVisitor visitor; + + auto result = visitor.VisitKernelExpression(&test_expression); + if (result->size() != 1) { + throw InternalException("Unexpected result: expected single expression"); + } + + auto &expr = result->back(); + if (expr->GetExpressionType() != ExpressionType::CONJUNCTION_AND) { + throw InternalException("Unexpected result: expected single top level Conjuntion"); + } + + vector result_to_string; + for (auto &expr : expr->Cast().children) { + result_to_string.push_back(expr->ToString()); + } + + output.SetValue(0, Value::LIST(result_to_string)); +}; + +ScalarFunctionSet DeltaFunctions::GetExpressionFunction(DatabaseInstance &instance) { + ScalarFunctionSet result; + result.name = "get_delta_test_expression"; + + ScalarFunction getvar({}, LogicalType::LIST(LogicalType::VARCHAR), GetDeltaTestExpression, nullptr, nullptr); + result.AddFunction(getvar); + + return result; +} + +} // namespace duckdb \ No newline at end of file diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 24806a4..8760862 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -115,57 +115,64 @@ typedef TemplatedUniqueKernelPointer KernelScan typedef TemplatedUniqueKernelPointer KernelGlobalScanState; typedef TemplatedUniqueKernelPointer KernelScanDataIterator; -template +template struct SharedKernelPointer; // A reference to a SharedKernelPointer, only 1 can be handed out at the same time -template +template struct SharedKernelRef { - friend struct SharedKernelPointer; + friend struct SharedKernelPointer; + public: - KernelType* GetPtr() { - return owning_pointer.kernel_ptr.get(); - } - ~SharedKernelRef() { - owning_pointer.lock.unlock(); - } + KernelType *GetPtr() { + return owning_pointer.kernel_ptr.get(); + } + ~SharedKernelRef() { + owning_pointer.lock.unlock(); + } protected: - SharedKernelRef(SharedKernelPointer& owning_pointer_p) : owning_pointer(owning_pointer_p) { - owning_pointer.lock.lock(); - } + SharedKernelRef(SharedKernelPointer &owning_pointer_p) + : owning_pointer(owning_pointer_p) { + owning_pointer.lock.lock(); + } protected: - // The pointer that owns this ref - SharedKernelPointer& owning_pointer; + // The pointer that owns this ref + SharedKernelPointer &owning_pointer; }; // Wrapper around ffi objects to share between threads -template +template struct SharedKernelPointer { - friend struct SharedKernelRef; + friend struct SharedKernelRef; + public: - SharedKernelPointer(TemplatedUniqueKernelPointer unique_kernel_ptr) : kernel_ptr(unique_kernel_ptr) {} - SharedKernelPointer(KernelType* ptr) : kernel_ptr(ptr){} - SharedKernelPointer(){} - - SharedKernelPointer(SharedKernelPointer&& other) : SharedKernelPointer() { - other.lock.lock(); - lock.lock(); - kernel_ptr = std::move(other.kernel_ptr); - lock.lock(); - other.lock.lock(); - } - - // Returns a reference to the underlying kernel object. The SharedKernelPointer to this object will be locked for the - // lifetime of this reference - SharedKernelRef GetLockingRef() { - return SharedKernelRef(*this); - } + SharedKernelPointer(TemplatedUniqueKernelPointer unique_kernel_ptr) + : kernel_ptr(unique_kernel_ptr) { + } + SharedKernelPointer(KernelType *ptr) : kernel_ptr(ptr) { + } + SharedKernelPointer() { + } + + SharedKernelPointer(SharedKernelPointer &&other) : SharedKernelPointer() { + other.lock.lock(); + lock.lock(); + kernel_ptr = std::move(other.kernel_ptr); + lock.lock(); + other.lock.lock(); + } + + // Returns a reference to the underlying kernel object. The SharedKernelPointer to this object will be locked for + // the lifetime of this reference + SharedKernelRef GetLockingRef() { + return SharedKernelRef(*this); + } protected: - TemplatedUniqueKernelPointer kernel_ptr; - mutex lock; + TemplatedUniqueKernelPointer kernel_ptr; + mutex lock; }; typedef SharedKernelPointer SharedKernelSnapshot; diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index eb2de6e..32662a2 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -15,8 +15,8 @@ namespace duckdb { struct DeltaSnapshot; struct DeltaFunctionInfo : public TableFunctionInfo { - shared_ptr snapshot; - string expected_path; + shared_ptr snapshot; + string expected_path; }; struct DeltaFileMetaData { @@ -63,8 +63,8 @@ struct DeltaSnapshot : public MultiFileList { string GetFile(idx_t i) override; protected: - void InitializeSnapshot(); - void InitializeScan(); + void InitializeSnapshot(); + void InitializeScan(); template T TryUnpackKernelResult(ffi::ExternResult result) { @@ -76,29 +76,29 @@ struct DeltaSnapshot : public MultiFileList { public: idx_t version; - //! Delta Kernel Structures - shared_ptr snapshot; + //! Delta Kernel Structures + shared_ptr snapshot; - KernelExternEngine extern_engine; - KernelScan scan; - KernelGlobalScanState global_state; - KernelScanDataIterator scan_data_iterator; + KernelExternEngine extern_engine; + KernelScan scan; + KernelGlobalScanState global_state; + KernelScanDataIterator scan_data_iterator; - //! Names - vector names; - vector types; - bool have_bound = false; + //! Names + vector names; + vector types; + bool have_bound = false; //! Metadata map for files vector> metadata; - //! Current file list resolution state - bool initialized_snapshot = false; - bool initialized_scan = false; + //! Current file list resolution state + bool initialized_snapshot = false; + bool initialized_scan = false; - bool files_exhausted = false; - vector resolved_files; - TableFilterSet table_filters; + bool files_exhausted = false; + vector resolved_files; + TableFilterSet table_filters; ClientContext &context; }; @@ -116,10 +116,10 @@ struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState { }; struct DeltaMultiFileReader : public MultiFileReader { - static unique_ptr CreateInstance(const TableFunction &table_function); - //! Return a DeltaSnapshot - shared_ptr CreateFileList(ClientContext &context, const vector &paths, - FileGlobOptions options) override; + static unique_ptr CreateInstance(const TableFunction &table_function); + //! Return a DeltaSnapshot + shared_ptr CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) override; //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file //! readers will try read @@ -153,13 +153,13 @@ struct DeltaMultiFileReader : public MultiFileReader { const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr global_state) override; - //! Override the ParseOption call to parse delta_scan specific options - bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, - ClientContext &context) override; + //! Override the ParseOption call to parse delta_scan specific options + bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, + ClientContext &context) override; - // A snapshot can be injected into the multifilereader, this ensures the GetMultiFileList can return this snapshot - // (note that the path should match the one passed to CreateFileList) - shared_ptr snapshot; + // A snapshot can be injected into the multifilereader, this ensures the GetMultiFileList can return this snapshot + // (note that the path should match the one passed to CreateFileList) + shared_ptr snapshot; }; } // namespace duckdb diff --git a/src/include/functions/expression_functions.hpp b/src/include/functions/expression_functions.hpp new file mode 100644 index 0000000..e69de29 diff --git a/src/include/storage/delta_catalog.hpp b/src/include/storage/delta_catalog.hpp index faeb00c..17549dd 100644 --- a/src/include/storage/delta_catalog.hpp +++ b/src/include/storage/delta_catalog.hpp @@ -30,7 +30,7 @@ class DeltaCatalog : public Catalog { string path; AccessMode access_mode; - bool use_cache; + bool use_cache; public: void Initialize(bool load_builtin) override; @@ -59,22 +59,22 @@ class DeltaCatalog : public Catalog { DatabaseSize GetDatabaseSize(ClientContext &context) override; - optional_idx GetCatalogVersion(ClientContext &context) override; + optional_idx GetCatalogVersion(ClientContext &context) override; bool InMemory() override; string GetDBPath() override; - bool UseCachedSnapshot(); + bool UseCachedSnapshot(); - DeltaSchemaEntry& GetMainSchema() { - return *main_schema; - } + DeltaSchemaEntry &GetMainSchema() { + return *main_schema; + } private: void DropSchema(ClientContext &context, DropInfo &info) override; private: - unique_ptr main_schema; + unique_ptr main_schema; string default_schema; }; diff --git a/src/include/storage/delta_schema_entry.hpp b/src/include/storage/delta_schema_entry.hpp index c8a8d09..dc41a4c 100644 --- a/src/include/storage/delta_schema_entry.hpp +++ b/src/include/storage/delta_schema_entry.hpp @@ -40,13 +40,13 @@ class DeltaSchemaEntry : public SchemaCatalogEntry { void DropEntry(ClientContext &context, DropInfo &info) override; optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; - optional_ptr GetCachedTable(); + optional_ptr GetCachedTable(); private: - //! Delta tables may be cached in the SchemaEntry. Since the TableEntry holds the snapshot, this allows sharing a snapshot - //! between different scans. - unique_ptr cached_table; - mutex lock; + //! Delta tables may be cached in the SchemaEntry. Since the TableEntry holds the snapshot, this allows sharing a + //! snapshot between different scans. + unique_ptr cached_table; + mutex lock; }; } // namespace duckdb diff --git a/src/include/storage/delta_table_entry.hpp b/src/include/storage/delta_table_entry.hpp index c131694..5263e88 100644 --- a/src/include/storage/delta_table_entry.hpp +++ b/src/include/storage/delta_table_entry.hpp @@ -17,7 +17,7 @@ struct DeltaSnapshot; class DeltaTableEntry : public TableCatalogEntry { public: DeltaTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info); - ~DeltaTableEntry(); + ~DeltaTableEntry(); public: unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; @@ -30,7 +30,7 @@ class DeltaTableEntry : public TableCatalogEntry { ClientContext &context) override; public: - shared_ptr snapshot; + shared_ptr snapshot; }; } // namespace duckdb diff --git a/src/include/storage/delta_transaction.hpp b/src/include/storage/delta_transaction.hpp index 3a004ef..b9d369c 100644 --- a/src/include/storage/delta_transaction.hpp +++ b/src/include/storage/delta_transaction.hpp @@ -30,11 +30,12 @@ class DeltaTransaction : public Transaction { static DeltaTransaction &Get(ClientContext &context, Catalog &catalog); AccessMode GetAccessMode() const; - void SetReadWrite() override { - throw NotImplementedException("Can not start read-write transaction"); - }; + void SetReadWrite() override { + throw NotImplementedException("Can not start read-write transaction"); + }; + public: - unique_ptr table_entry; + unique_ptr table_entry; private: // DeltaConnection connection; diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 1e8ac4e..53b1195 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -18,31 +18,32 @@ DeltaCatalog::DeltaCatalog(AttachedDatabase &db_p, const string &path, AccessMod DeltaCatalog::~DeltaCatalog() = default; void DeltaCatalog::Initialize(bool load_builtin) { - CreateSchemaInfo info; - main_schema = make_uniq(*this, info); + CreateSchemaInfo info; + main_schema = make_uniq(*this, info); } optional_ptr DeltaCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { - throw BinderException("Delta tables do not support creating new schemas"); + throw BinderException("Delta tables do not support creating new schemas"); } void DeltaCatalog::DropSchema(ClientContext &context, DropInfo &info) { - throw BinderException("Delta tables do not support dropping schemas"); + throw BinderException("Delta tables do not support dropping schemas"); } void DeltaCatalog::ScanSchemas(ClientContext &context, std::function callback) { - callback(*main_schema); + callback(*main_schema); } optional_ptr DeltaCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, - OnEntryNotFound if_not_found, QueryErrorContext error_context) { - if (schema_name == DEFAULT_SCHEMA || schema_name == INVALID_SCHEMA) { - return main_schema.get(); - } - if (if_not_found == OnEntryNotFound::RETURN_NULL) { - return nullptr; - } - return nullptr; + OnEntryNotFound if_not_found, + QueryErrorContext error_context) { + if (schema_name == DEFAULT_SCHEMA || schema_name == INVALID_SCHEMA) { + return main_schema.get(); + } + if (if_not_found == OnEntryNotFound::RETURN_NULL) { + return nullptr; + } + return nullptr; } bool DeltaCatalog::InMemory() { @@ -54,24 +55,24 @@ string DeltaCatalog::GetDBPath() { } bool DeltaCatalog::UseCachedSnapshot() { - return use_cache; + return use_cache; } optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) { - auto &delta_transaction = DeltaTransaction::Get(context, *this); + auto &delta_transaction = DeltaTransaction::Get(context, *this); - // Option 1: snapshot is cached table-wide - auto cached_snapshot = main_schema->GetCachedTable(); - if (cached_snapshot) { - return cached_snapshot->snapshot->version; - } + // Option 1: snapshot is cached table-wide + auto cached_snapshot = main_schema->GetCachedTable(); + if (cached_snapshot) { + return cached_snapshot->snapshot->version; + } - // Option 2: snapshot is cached in transaction - if (delta_transaction.table_entry) { - return delta_transaction.table_entry->snapshot->version; - } + // Option 2: snapshot is cached in transaction + if (delta_transaction.table_entry) { + return delta_transaction.table_entry->snapshot->version; + } - return {}; + return {}; } DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { @@ -84,23 +85,23 @@ DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { } unique_ptr DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, - unique_ptr plan) { + unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support inserts"); } unique_ptr DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, - unique_ptr plan) { + unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support creating new tables"); } unique_ptr DeltaCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, - unique_ptr plan) { + unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support deletes"); } unique_ptr DeltaCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, - unique_ptr plan) { + unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support updates"); } -unique_ptr DeltaCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, - unique_ptr plan) { +unique_ptr DeltaCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, + TableCatalogEntry &table, unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support creating indices"); } diff --git a/src/storage/delta_schema_entry.cpp b/src/storage/delta_schema_entry.cpp index 7e15c5b..61348d4 100644 --- a/src/storage/delta_schema_entry.cpp +++ b/src/storage/delta_schema_entry.cpp @@ -17,11 +17,9 @@ #include "duckdb/parser/parsed_data/alter_table_info.hpp" #include "duckdb/parser/parsed_expression_iterator.hpp" - namespace duckdb { -DeltaSchemaEntry::DeltaSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) - : SchemaCatalogEntry(catalog, info) { +DeltaSchemaEntry::DeltaSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) : SchemaCatalogEntry(catalog, info) { } DeltaSchemaEntry::~DeltaSchemaEntry() { @@ -35,7 +33,7 @@ DeltaTransaction &GetDeltaTransaction(CatalogTransaction transaction) { } optional_ptr DeltaSchemaEntry::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { - throw BinderException("Delta tables do not support creating tables"); + throw BinderException("Delta tables do not support creating tables"); } optional_ptr DeltaSchemaEntry::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { @@ -53,7 +51,7 @@ void DeltaUnqualifyColumnRef(ParsedExpression &expr) { } optional_ptr DeltaSchemaEntry::CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, - TableCatalogEntry &table) { + TableCatalogEntry &table) { throw NotImplementedException("CreateIndex"); } @@ -62,7 +60,7 @@ string GetDeltaCreateView(CreateViewInfo &info) { } optional_ptr DeltaSchemaEntry::CreateView(CatalogTransaction transaction, CreateViewInfo &info) { - throw BinderException("Delta tables do not support creating views"); + throw BinderException("Delta tables do not support creating views"); } optional_ptr DeltaSchemaEntry::CreateType(CatalogTransaction transaction, CreateTypeInfo &info) { @@ -74,26 +72,27 @@ optional_ptr DeltaSchemaEntry::CreateSequence(CatalogTransaction t } optional_ptr DeltaSchemaEntry::CreateTableFunction(CatalogTransaction transaction, - CreateTableFunctionInfo &info) { + CreateTableFunctionInfo &info) { throw BinderException("Delta databases do not support creating table functions"); } optional_ptr DeltaSchemaEntry::CreateCopyFunction(CatalogTransaction transaction, - CreateCopyFunctionInfo &info) { + CreateCopyFunctionInfo &info) { throw BinderException("Delta databases do not support creating copy functions"); } optional_ptr DeltaSchemaEntry::CreatePragmaFunction(CatalogTransaction transaction, - CreatePragmaFunctionInfo &info) { + CreatePragmaFunctionInfo &info) { throw BinderException("Delta databases do not support creating pragma functions"); } -optional_ptr DeltaSchemaEntry::CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) { +optional_ptr DeltaSchemaEntry::CreateCollation(CatalogTransaction transaction, + CreateCollationInfo &info) { throw BinderException("Delta databases do not support creating collations"); } void DeltaSchemaEntry::Alter(CatalogTransaction transaction, AlterInfo &info) { - throw NotImplementedException("Delta tables do not support altering"); + throw NotImplementedException("Delta tables do not support altering"); } bool CatalogTypeIsSupported(CatalogType type) { @@ -105,80 +104,80 @@ bool CatalogTypeIsSupported(CatalogType type) { } } -static unique_ptr CreateTableEntry(ClientContext &context, DeltaCatalog &delta_catalog, DeltaSchemaEntry &schema_entry) { - auto snapshot = make_shared_ptr(context, delta_catalog.GetDBPath()); +static unique_ptr CreateTableEntry(ClientContext &context, DeltaCatalog &delta_catalog, + DeltaSchemaEntry &schema_entry) { + auto snapshot = make_shared_ptr(context, delta_catalog.GetDBPath()); - // Get the names and types from the delta snapshot - vector return_types; - vector names; - snapshot->Bind(return_types, names); + // Get the names and types from the delta snapshot + vector return_types; + vector names; + snapshot->Bind(return_types, names); - CreateTableInfo table_info; - for (idx_t i = 0; i < return_types.size(); i++) { - table_info.columns.AddColumn(ColumnDefinition(names[i], return_types[i])); - } - table_info.table = DEFAULT_DELTA_TABLE; - auto table_entry = make_uniq(delta_catalog, schema_entry, table_info); - table_entry->snapshot = std::move(snapshot); + CreateTableInfo table_info; + for (idx_t i = 0; i < return_types.size(); i++) { + table_info.columns.AddColumn(ColumnDefinition(names[i], return_types[i])); + } + table_info.table = DEFAULT_DELTA_TABLE; + auto table_entry = make_uniq(delta_catalog, schema_entry, table_info); + table_entry->snapshot = std::move(snapshot); - return table_entry; + return table_entry; } void DeltaSchemaEntry::Scan(ClientContext &context, CatalogType type, - const std::function &callback) { + const std::function &callback) { if (!CatalogTypeIsSupported(type)) { - auto transaction = catalog.GetCatalogTransaction(context); + auto transaction = catalog.GetCatalogTransaction(context); auto default_table = GetEntry(transaction, type, DEFAULT_DELTA_TABLE); - if (default_table) { - callback(*default_table); - } + if (default_table) { + callback(*default_table); + } } - } void DeltaSchemaEntry::Scan(CatalogType type, const std::function &callback) { throw NotImplementedException("Scan without context not supported"); } void DeltaSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { - throw NotImplementedException("Delta tables do not support dropping"); + throw NotImplementedException("Delta tables do not support dropping"); } optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, - const string &name) { - if (!transaction.HasContext()) { - throw NotImplementedException("Can not DeltaSchemaEntry::GetEntry without context"); - } - auto &context = transaction.GetContext(); - - if (type == CatalogType::TABLE_ENTRY && name == DEFAULT_DELTA_TABLE) { - auto &delta_transaction = GetDeltaTransaction(transaction); - auto &delta_catalog = catalog.Cast(); - - if (delta_transaction.table_entry) { - return *delta_transaction.table_entry; - } - - if (delta_catalog.UseCachedSnapshot()) { - unique_lock l(lock); - if (!cached_table) { - cached_table = CreateTableEntry(context, delta_catalog, *this); - } - return *cached_table; - } - - delta_transaction.table_entry = CreateTableEntry(context, delta_catalog, *this); - return *delta_transaction.table_entry; - } + const string &name) { + if (!transaction.HasContext()) { + throw NotImplementedException("Can not DeltaSchemaEntry::GetEntry without context"); + } + auto &context = transaction.GetContext(); + + if (type == CatalogType::TABLE_ENTRY && name == DEFAULT_DELTA_TABLE) { + auto &delta_transaction = GetDeltaTransaction(transaction); + auto &delta_catalog = catalog.Cast(); + + if (delta_transaction.table_entry) { + return *delta_transaction.table_entry; + } + + if (delta_catalog.UseCachedSnapshot()) { + unique_lock l(lock); + if (!cached_table) { + cached_table = CreateTableEntry(context, delta_catalog, *this); + } + return *cached_table; + } + + delta_transaction.table_entry = CreateTableEntry(context, delta_catalog, *this); + return *delta_transaction.table_entry; + } - return nullptr; + return nullptr; } optional_ptr DeltaSchemaEntry::GetCachedTable() { - lock_guard lck(lock); - if (cached_table) { - return *cached_table; - } - return nullptr; + lock_guard lck(lock); + if (cached_table) { + return *cached_table; + } + return nullptr; } } // namespace duckdb diff --git a/src/storage/delta_table_entry.cpp b/src/storage/delta_table_entry.cpp index f82caa4..6f7f829 100644 --- a/src/storage/delta_table_entry.cpp +++ b/src/storage/delta_table_entry.cpp @@ -32,7 +32,7 @@ unique_ptr DeltaTableEntry::GetStatistics(ClientContext &context } void DeltaTableEntry::BindUpdateConstraints(Binder &binder, LogicalGet &, LogicalProjection &, LogicalUpdate &, - ClientContext &) { + ClientContext &) { throw NotImplementedException("BindUpdateConstraints for delta table"); } @@ -43,11 +43,11 @@ TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_pt auto delta_scan_function = delta_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); auto &delta_catalog = catalog.Cast(); - // Copy over the internal kernel snapshot - auto function_info = make_shared_ptr(); + // Copy over the internal kernel snapshot + auto function_info = make_shared_ptr(); - function_info->snapshot = this->snapshot; - delta_scan_function.function_info = std::move(function_info); + function_info->snapshot = this->snapshot; + delta_scan_function.function_info = std::move(function_info); vector inputs = {delta_catalog.GetDBPath()}; named_parameter_map_t param_map; @@ -55,7 +55,6 @@ TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_pt vector names; TableFunctionRef empty_ref; - TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, delta_scan_function, empty_ref); diff --git a/src/storage/delta_transaction.cpp b/src/storage/delta_transaction.cpp index 3846c47..2af1a46 100644 --- a/src/storage/delta_transaction.cpp +++ b/src/storage/delta_transaction.cpp @@ -27,7 +27,7 @@ void DeltaTransaction::Commit() { void DeltaTransaction::Rollback() { if (transaction_state == DeltaTransactionState::TRANSACTION_STARTED) { transaction_state = DeltaTransactionState::TRANSACTION_FINISHED; - // NOP: we only support read-only transactions currently + // NOP: we only support read-only transactions currently } } @@ -36,7 +36,7 @@ DeltaTransaction &DeltaTransaction::Get(ClientContext &context, Catalog &catalog } AccessMode DeltaTransaction::GetAccessMode() const { - return access_mode; + return access_mode; } } // namespace duckdb