Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read in process Python objects like Dataframe, Numpy or dict #211

Merged
merged 26 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b085bbb
Fix Python ide index issue
auxten Apr 12, 2024
27d17ad
Add simple StoragePython to fix compile flags
auxten Apr 12, 2024
5e917f3
Add PyReader and PyWriter ABC
auxten Apr 22, 2024
614d392
Prototype works
auxten May 3, 2024
1e789dc
GetSchema in PyReader works on dict
auxten May 7, 2024
0f58c45
Merge all convert_and_insert and getTableStructureFromData v1
auxten May 7, 2024
209646a
Refactor convert_and_insert
auxten May 7, 2024
b79246f
Fix prototype of PyReader.read
auxten May 8, 2024
bc6603b
Remove trampoline class
auxten May 8, 2024
81da2aa
Fix gc for read returned data
auxten May 8, 2024
da85c29
Fix reader type to py::object
auxten May 8, 2024
7461e83
Fix gil cross threads between C++ and Python
auxten May 8, 2024
623b322
Use inspect.current_frame and f_back to find py obj
auxten May 9, 2024
68b6729
Add append_raw in PODArray
auxten May 13, 2024
656bccf
Add appendRawData in ColumnVectorHelper
auxten May 13, 2024
b4923dc
Treat binary[pyarrow] as string
auxten May 13, 2024
f050319
Fix pandas arrow dtype
auxten May 16, 2024
451f8d0
Benchmark on clickbench data
auxten May 16, 2024
093d9dc
Add pybind headers for ColumnPyObject
auxten May 24, 2024
ce37839
2x faster on Q23 with better getPyUtf8StrData
auxten May 24, 2024
9db0a0a
Add PythonUtils
auxten May 30, 2024
fef6cda
Support SQL on data objects without PyReader
auxten May 30, 2024
0e53dcb
Do things with GIL in batch
auxten May 30, 2024
187edc4
GIL less scanDataToChunk
auxten May 31, 2024
22c0bb8
Move prepareColumnCache to StoragePython
auxten Jun 3, 2024
4725983
Convert UTF-16 and UTF-32 without copy in FillColumnString
auxten Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ def query(sql, output_format="CSV", path="", udf_path=""):
# alias for query
sql = query

PyReader = _chdb.PyReader

__all__ = [
"PyReader",
"ChdbError",
"query",
"sql",
Expand Down
65 changes: 65 additions & 0 deletions chdb/rwabc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from abc import ABC, abstractmethod
from typing import List, Any


# Abstract reader: subclasses supply `read`, which returns column data taken from `self.data`.
class PyReader(ABC):
    def __init__(self, data: Any):
        """
        Keep a reference to the object this reader will read from.

        Args:
            data (Any): Backing object for the reader. No particular type or layout is required.
        """
        self.data = data

    @abstractmethod
    def read(self, col_names: List[str], count: int) -> List[Any]:
        """
        Fetch at most `count` rows for the requested columns.

        Args:
            col_names (List[str]): Names of the columns to read.
            count (int): Upper bound on the number of rows to return.

        Returns:
            List[Any]: One sequence of values per requested column.
        """


# Abstract writer: subclasses implement `write` (store columns) and `finalize` (produce bytes).
class PyWriter(ABC):
    def __init__(self, col_names: List[str], types: List[type], data: Any):
        """
        Record the column layout and initial data, and start with no blocks.

        Args:
            col_names (List[str]): Column names.
            types (List[type]): Type of each column, in the same order as `col_names`.
            data (Any): Initial data for the writer. No particular type or layout is required.
        """
        self.col_names, self.types, self.data = col_names, types, data
        self.blocks = list()

    @abstractmethod
    def write(self, col_names: List[str], columns: List[List[Any]]) -> None:
        """
        Store the given columns as blocks. Subclasses must implement this.

        Args:
            col_names (List[str]): Names of the columns being written.
            columns (List[List[Any]]): Column data; each column is a list of values.
        """

    @abstractmethod
    def finalize(self) -> bytes:
        """
        Build the final output from the stored blocks. Subclasses must implement this.

        Returns:
            bytes: The serialized result.
        """
33 changes: 31 additions & 2 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "LocalChdb.h"

#include <iostream>
#include <Storages/StoragePython.h>
#include <pybind11/gil.h>


// Definition of the global flag. A namespace-scope non-const variable already has
// external linkage, so `extern` on an initialized definition is redundant and only
// triggers a compiler warning ("initialized and declared 'extern'").
bool inside_main = true;
Expand Down Expand Up @@ -51,6 +51,7 @@ local_result_v2 * queryToBuffer(
for (auto & arg : argv)
argv_char.push_back(const_cast<char *>(arg.c_str()));

py::gil_scoped_release release;
return query_stable_v2(argv_char.size(), argv_char.data());
}

Expand Down Expand Up @@ -147,6 +148,34 @@ PYBIND11_MODULE(_chdb, m)
.def("has_error", &query_result::has_error)
.def("error_message", &query_result::error_message);

// Register DB::PyReader with the Python module as `PyReader`, held by std::shared_ptr.
// Exposes the constructor plus the `read` and `get_schema` methods; the string arguments
// are the docstrings shown to Python users.
py::class_<DB::PyReader, std::shared_ptr<DB::PyReader>>(m, "PyReader")
    .def(
        py::init<const py::object &>(),
        // Adjacent literals are concatenated verbatim, so the separating space must be
        // part of the first literal; otherwise the text reads "can vary.you must hold".
        "Initialize the reader with data. The exact type and structure of `data` can vary. "
        "You must hold the data with `self.data` in your inherit class\n\n"
        "Args:\n"
        "    data (Any): The data with which to initialize the reader, format and type are not strictly defined.")
    .def(
        "read",
        [](DB::PyReader & self, const std::vector<std::string> & col_names, int count)
        {
            // GIL is held when called from Python code. Release it to avoid deadlock
            py::gil_scoped_release release;
            // Return the call result directly: wrapping a temporary in std::move adds
            // nothing and prevents copy elision.
            return self.read(col_names, count);
        },
        "Read a specified number of rows from the given columns and return a list of objects, "
        "where each object is a sequence of values for a column.\n\n"
        "Args:\n"
        "    col_names (List[str]): List of column names to read.\n"
        "    count (int): Maximum number of rows to read.\n\n"
        "Returns:\n"
        "    List[Any]: List of sequences, one for each column.")
    .def(
        "get_schema",
        &DB::PyReader::getSchema,
        "Return a list of column names and their types.\n\n"
        "Returns:\n"
        "    List[str, str]: List of column name and type pairs.");

m.def(
"query",
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ build-backend = "setuptools.build_meta"

[tool.cibuildwheel]
build-frontend = "pip"

[tool.pyright]
include = ["chdb"]
exclude = ["src", "contrib", "programs", "build", "buildlib", "dist", "venv", ".venv", ".vscode", ".git", "__pycache__", ".mypy_cache", ".pytest_cache"]

79 changes: 78 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::jemalloc)
endif()

target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::sparsehash ch_contrib::incbin)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::sparsehash ch_contrib::incbin ch_contrib::icu)

add_subdirectory(Access/Common)
add_subdirectory(Common/ZooKeeper)
Expand Down Expand Up @@ -264,6 +264,83 @@ target_link_libraries (dbms PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (dbms PRIVATE ch_contrib::jemalloc)
endif()

# Sources that include pybind11 / Python headers. Every property below is applied
# to all of them, so the list is kept in one place.
set(CHDB_PYBIND11_SOURCES
    Storages/StoragePython.cpp
    Processors/Sources/PythonSource.cpp
    Columns/ColumnPyObject.cpp
    Common/PythonUtils.cpp
)

# Include path from shell cmd "python3 -m pybind11 --includes"
execute_process(COMMAND python3 -m pybind11 --includes
    OUTPUT_VARIABLE PYBIND11_INCLUDES
    OUTPUT_STRIP_TRAILING_WHITESPACE
)

# An empty result means python3 or pybind11 is not usable. Stop here with a readable
# message instead of an argument-count error from string() further down.
if ("${PYBIND11_INCLUDES}" STREQUAL "")
    message(FATAL_ERROR "Cannot get include flags from `python3 -m pybind11 --includes`; is pybind11 installed for python3?")
endif()

# Extract and set include directories specifically for source using pybind11
string(REGEX MATCHALL "-I([^ ]+)" INCLUDE_DIRS_MATCHES "${PYBIND11_INCLUDES}")
set(PYTHON_INCLUDE_DIRS "")
foreach(INCLUDE_DIR_MATCH ${INCLUDE_DIRS_MATCHES})
    # Strip only the leading flag; an unanchored pattern would also remove "-I" inside the path
    string(REGEX REPLACE "^-I" "" INCLUDE_DIR "${INCLUDE_DIR_MATCH}")
    # Accumulate all include directories (list(APPEND) avoids a leading empty element)
    list(APPEND PYTHON_INCLUDE_DIRS "${INCLUDE_DIR}")
endforeach()

# Apply the include directories to every pybind11-dependent source
set_source_files_properties(${CHDB_PYBIND11_SOURCES} PROPERTIES INCLUDE_DIRECTORIES "${PYTHON_INCLUDE_DIRS}")

# get python version, something like python3.x
execute_process(COMMAND python3 -c "import sys; print('python3.'+str(sys.version_info[1]))"
    OUTPUT_VARIABLE PYTHON_VERSION
    OUTPUT_STRIP_TRAILING_WHITESPACE
)

# remove all warnings, because pybind11 will generate a lot of warnings
if (OS_LINUX)
    # pybind11 will try to find x86_64-linux-gnu/${PYTHON_VERSION}/pyconfig.h
    # use -idirafter to make it find the right one and not pollute the include path
    # set_source_files_properties(Storages/StoragePython.cpp PROPERTIES COMPILE_FLAGS
    #     "-w -idirafter /usr/include -include x86_64-linux-gnu/${PYTHON_VERSION}/pyconfig.h"
    # )
    if (PYTHON_VERSION STREQUAL "python3.6" OR PYTHON_VERSION STREQUAL "python3.7" OR PYTHON_VERSION STREQUAL "python3.8")
        set(CHDB_PYBIND11_COMPILE_FLAGS "-w -idirafter /usr/include -include crypt.h")
    else()
        set(CHDB_PYBIND11_COMPILE_FLAGS "-w")
    endif()
    set_source_files_properties(${CHDB_PYBIND11_SOURCES} PROPERTIES COMPILE_FLAGS "${CHDB_PYBIND11_COMPILE_FLAGS}")
elseif (OS_DARWIN)
    set_source_files_properties(${CHDB_PYBIND11_SOURCES} PROPERTIES COMPILE_FLAGS "-w")
endif()

set (all_modules dbms)

macro (dbms_target_include_directories)
Expand Down
4 changes: 2 additions & 2 deletions src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <cstring>
#include <cassert>
#include <cstddef>
#include <cstring>

#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
Expand Down Expand Up @@ -154,7 +155,6 @@ class ColumnString final : public COWHelper<IColumn, ColumnString>
{
const size_t old_size = chars.size();
const size_t new_size = old_size + length + 1;

chars.resize(new_size);
if (length)
memcpy(chars.data() + old_size, pos, length);
Expand Down
8 changes: 8 additions & 0 deletions src/Columns/ColumnVectorHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class ColumnVectorHelper : public IColumn
reinterpret_cast<char *>(this) + sizeof(*this))
->push_back_raw(ptr);
}

template <size_t ELEMENT_SIZE>
void appendRawData(const char * ptr, size_t count)
{
return reinterpret_cast<PODArrayBase<ELEMENT_SIZE, 4096, Allocator<false>, PADDING_FOR_SIMD - 1, PADDING_FOR_SIMD> *>(
reinterpret_cast<char *>(this) + sizeof(*this))
->append_raw(ptr, count);
}
};

}
2 changes: 2 additions & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@
M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
M(706, LIBSSH_ERROR) \
M(707, PY_EXCEPTION_OCCURED) \
M(708, PY_OBJECT_NOT_FOUND) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
Expand Down
12 changes: 12 additions & 0 deletions src/Common/PODArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ class PODArrayBase : private boost::noncopyable, private TAllocator /// empty
c_end += ELEMENT_SIZE;
}

/// Appends `count` elements (ELEMENT_SIZE bytes each) copied from `ptr` to the end of the array,
/// growing the storage first if needed. Extra arguments are forwarded to reserve().
template <typename... TAllocatorParams>
void append_raw(const void * ptr, size_t count, TAllocatorParams &&... allocator_params) /// NOLINT
{
    /// Nothing to copy; also avoids calling memcpy with a possibly null source.
    if (!count)
        return;

    /// size(), capacity() and reserve() are expressed in elements, so the requirement must be
    /// an element count as well. Adding the byte count here would over-reserve by a factor of
    /// ELEMENT_SIZE.
    size_t required_capacity = size() + count;
    if (unlikely(required_capacity > capacity()))
        reserve(required_capacity, std::forward<TAllocatorParams>(allocator_params)...);

    size_t bytes_to_copy = byte_size(count);
    memcpy(c_end, ptr, bytes_to_copy);
    c_end += bytes_to_copy;
}

void protect()
{
#ifndef NDEBUG
Expand Down
Loading
Loading