Skip to content

Commit

Permalink
Convert UTF-16 and UTF-32 without copy in FillColumnString
Browse files Browse the repository at this point in the history
  • Loading branch information
auxten committed Jun 17, 2024
1 parent 1437cc2 commit eeb6b68
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 21 deletions.
137 changes: 130 additions & 7 deletions src/Common/PythonUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#include <cstddef>

#include <pybind11/gil.h>
#include <pybind11/pytypes.h>
#include <unicode/bytestream.h>
#include "Common/logger_useful.h"
#include <unicode/unistr.h>
#include <Common/PythonUtils.h>
#include "pybind11/gil.h"
#include "pybind11/pytypes.h"
#include <Common/logger_useful.h>
#include "Columns/ColumnString.h"

namespace DB
{
Expand Down Expand Up @@ -79,6 +82,119 @@ const char * ConvertPyUnicodeToUtf8(const void * input, int kind, size_t codepoi
return output_buffer;
}

size_t
ConvertPyUnicodeToUtf8(const void * input, int kind, size_t codepoint_cnt, ColumnString::Offsets & offsets, ColumnString::Chars & chars)
{
if (input == nullptr)
return 0;

size_t estimated_size = codepoint_cnt * 4 + 1; // Allocate buffer for UTF-8 output
size_t chars_cursor = chars.size();
size_t target_size = chars_cursor + estimated_size;
chars.resize(target_size);

switch (kind)
{
case 1: { // Handle 1-byte characters (Latin1/ASCII equivalent in ICU)
const char * start = (const char *)input;
const char * end = start + codepoint_cnt;
char code_unit;
int32_t append_size = 0;

while (start < end)
{
code_unit = *start++;
U8_APPEND_UNSAFE(chars.data(), chars_cursor, code_unit);
}
break;
}
case 2: { // Handle 2-byte characters (UTF-16 equivalent)
const UChar * start = (const UChar *)input;
const UChar * end = start + codepoint_cnt;
UChar code_unit;
int32_t append_size = 0;

while (start < end)
{
code_unit = *start++;
U8_APPEND_UNSAFE(chars.data(), chars_cursor, code_unit);
}
break;
}
case 4: { // Handle 4-byte characters (Assume UCS-4/UTF-32)
const UInt32 * start = (const UInt32 *)input;
const UInt32 * end = start + codepoint_cnt;
UInt32 code_unit;
int32_t append_size = 0;

while (start < end)
{
code_unit = *start++;
U8_APPEND_UNSAFE(chars.data(), chars_cursor, code_unit);
}
break;
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported unicode kind {}", kind);
}

chars[chars_cursor++] = '\0'; // Null terminate the output string and increase the cursor
offsets.push_back(chars_cursor);
chars.resize_assume_reserved(chars_cursor);

return chars_cursor;
}

void FillColumnString(PyObject * obj, ColumnString * column)
{
ColumnString::Offsets & offsets = column->getOffsets();
ColumnString::Chars & chars = column->getChars();
if (PyUnicode_IS_COMPACT_ASCII(obj))
{
const char * data = reinterpret_cast<const char *>(PyUnicode_1BYTE_DATA(obj));
size_t unicode_len = PyUnicode_GET_LENGTH(obj);
column->insertData(data, unicode_len);
}
else
{
PyCompactUnicodeObject * unicode = reinterpret_cast<PyCompactUnicodeObject *>(obj);
if (unicode->utf8 != nullptr)
{
// It's utf8 string, treat it like ASCII
const char * data = reinterpret_cast<const char *>(unicode->utf8);
column->insertData(data, unicode->utf8_length);
}
else if (PyUnicode_IS_COMPACT(obj))
{
auto kind = PyUnicode_KIND(obj);
const char * data;
size_t codepoint_cnt;

if (kind == PyUnicode_1BYTE_KIND)
data = reinterpret_cast<const char *>(PyUnicode_1BYTE_DATA(obj));
else if (kind == PyUnicode_2BYTE_KIND)
data = reinterpret_cast<const char *>(PyUnicode_2BYTE_DATA(obj));
else if (kind == PyUnicode_4BYTE_KIND)
data = reinterpret_cast<const char *>(PyUnicode_4BYTE_DATA(obj));
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported unicode kind {}", kind);
codepoint_cnt = PyUnicode_GET_LENGTH(obj);
ConvertPyUnicodeToUtf8(data, kind, codepoint_cnt, offsets, chars);
}
else
{
// always convert it to utf8, but this case is rare, here goes the slow path
py::gil_scoped_acquire acquire;
Py_ssize_t bytes_size = -1;
const char * data = PyUnicode_AsUTF8AndSize(obj, &bytes_size);
if (bytes_size < 0)
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Failed to convert Python unicode object to UTF-8");
column->insertData(data, bytes_size);
}
}
}


const char * GetPyUtf8StrData(PyObject * obj, size_t & buf_len)
{
// See: https://github.com/python/cpython/blob/3.9/Include/cpython/unicodeobject.h#L81
Expand All @@ -101,6 +217,7 @@ const char * GetPyUtf8StrData(PyObject * obj, size_t & buf_len)
else if (PyUnicode_IS_COMPACT(obj))
{
auto kind = PyUnicode_KIND(obj);
/// We could not use the implementation provided by CPython like below because it requires GIL holded by the caller
// if (kind == PyUnicode_1BYTE_KIND || kind == PyUnicode_2BYTE_KIND || kind == PyUnicode_4BYTE_KIND)
// {
// // always convert it to utf8
Expand All @@ -125,6 +242,8 @@ const char * GetPyUtf8StrData(PyObject * obj, size_t & buf_len)
// holded by the caller. So we have to do it manually with libicu
codepoint_cnt = PyUnicode_GET_LENGTH(obj);
data = ConvertPyUnicodeToUtf8(data, kind, codepoint_cnt, buf_len);
// set the utf8 buffer back like PyUnicode_AsUTF8AndSize does, so that we can reuse it
// and also we can avoid the memory leak
unicode->utf8 = const_cast<char *>(data);
unicode->utf8_length = buf_len;
return data;
Expand All @@ -133,10 +252,14 @@ const char * GetPyUtf8StrData(PyObject * obj, size_t & buf_len)
{
// always convert it to utf8, but this case is rare, here goes the slow path
py::gil_scoped_acquire acquire;
const char * data = PyUnicode_AsUTF8AndSize(obj, &unicode->utf8_length);
buf_len = unicode->utf8_length;
// set the utf8 buffer back
unicode->utf8 = const_cast<char *>(data);
// PyUnicode_AsUTF8AndSize caches the UTF-8 encoded string in the unicodeobject
// and subsequent calls will return the same string. The memory is released
// when the unicodeobject is deallocated.
Py_ssize_t bytes_size = -1;
const char * data = PyUnicode_AsUTF8AndSize(obj, &bytes_size);
if (bytes_size < 0)
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Failed to convert Python unicode object to UTF-8");
buf_len = bytes_size;
return data;
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/Common/PythonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <cstddef>
#include <stdexcept>
// #include <unicodeobject.h>
#include <Columns/ColumnString.h>
#include <Columns/IColumn.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
Expand All @@ -15,13 +17,13 @@
#include <unicode/utypes.h>
#include <Common/Exception.h>


namespace DB
{

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PY_EXCEPTION_OCCURED;
}

namespace py = pybind11;
Expand Down Expand Up @@ -58,8 +60,12 @@ auto execWithGIL(Func func, Args &&... args) -> decltype(func(std::forward<Args>
// 4 for 4-byte characters (Assume UCS-4/UTF-32)
const char * ConvertPyUnicodeToUtf8(const void * input, int kind, size_t codepoint_cnt, size_t & output_size);

size_t
ConvertPyUnicodeToUtf8(const void * input, int kind, size_t codepoint_cnt, ColumnString::Offsets & offsets, ColumnString::Chars & chars);

const char * GetPyUtf8StrData(PyObject * obj, size_t & buf_len);

void FillColumnString(PyObject * obj, ColumnString * column);

inline const char * GetPyUtf8StrDataWithGIL(PyObject * obj, size_t & buf_len)
{
Expand Down
56 changes: 44 additions & 12 deletions src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
#include <memory>
#include <vector>
#include <Columns/ColumnDecimal.h>
#include <pybind11/numpy.h>
// #include <Columns/ColumnPyObject.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVectorHelper.h>
#include <Columns/IColumn.h>
Expand All @@ -19,13 +17,15 @@
#include <base/Decimal_fwd.h>
#include <base/types.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <Poco/Logger.h>
#include <Common/COW.h>
#include <Common/Exception.h>
#include <Common/PythonUtils.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>


namespace DB
Expand Down Expand Up @@ -61,14 +61,14 @@ PythonSource::PythonSource(
}

template <typename T>
void insert_from_list(const py::list & obj, const MutableColumnPtr & column)
void PythonSource::insert_from_list(const py::list & obj, const MutableColumnPtr & column)
{
py::gil_scoped_acquire acquire;
for (auto && item : obj)
column->insert(item.cast<T>());
}

void insert_string_from_array(const py::handle obj, const MutableColumnPtr & column)
void PythonSource::insert_string_from_array(const py::handle obj, const MutableColumnPtr & column)
{
auto array = castToPyHandleVector(obj);
for (auto && item : array)
Expand All @@ -79,7 +79,8 @@ void insert_string_from_array(const py::handle obj, const MutableColumnPtr & col
}
}

void insert_string_from_array_raw(PyObject ** buf, const MutableColumnPtr & column, const size_t offset, const size_t row_count)
void PythonSource::insert_string_from_array_raw(
PyObject ** buf, const MutableColumnPtr & column, const size_t offset, const size_t row_count)
{
column->reserve(row_count);
for (size_t i = offset; i < offset + row_count; ++i)
Expand All @@ -90,8 +91,37 @@ void insert_string_from_array_raw(PyObject ** buf, const MutableColumnPtr & colu
}
}

void PythonSource::convert_string_array_to_block(
PyObject ** buf, const MutableColumnPtr & column, const size_t offset, const size_t row_count)
{
ColumnString * string_column = typeid_cast<ColumnString *>(column.get());
if (string_column == nullptr)
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Column is not a string column");
ColumnString::Chars & data = string_column->getChars();
ColumnString::Offsets & offsets = string_column->getOffsets();
offsets.reserve(row_count);
for (size_t i = offset; i < offset + row_count; ++i)
{
FillColumnString(buf[i], string_column);
// Try to help reserve memory for the string column data every 100 rows to avoid frequent reallocations
// Check the avg size of the string column data and reserve memory accordingly
if ((i - offset) % 100 == 99)
{
size_t data_size = data.size();
size_t counter = i - offset + 1;
size_t avg_size = data_size / counter;
size_t reserve_size = avg_size * row_count;
if (reserve_size > data.capacity())
{
LOG_DEBUG(logger, "Reserving memory for string column data: {} bytes", reserve_size);
data.reserve(reserve_size);
}
}
}
}

template <typename T>
void insert_from_ptr(const void * ptr, const MutableColumnPtr & column, const size_t offset, const size_t row_count)
void PythonSource::insert_from_ptr(const void * ptr, const MutableColumnPtr & column, const size_t offset, const size_t row_count)
{
column->reserve(row_count);
// get the raw data from the array and memcpy it into the column
Expand All @@ -102,7 +132,7 @@ void insert_from_ptr(const void * ptr, const MutableColumnPtr & column, const si


template <typename T>
ColumnPtr convert_and_insert(const py::object & obj, UInt32 scale = 0)
ColumnPtr PythonSource::convert_and_insert(const py::object & obj, UInt32 scale)
{
MutableColumnPtr column;
if constexpr (std::is_same_v<T, DateTime64> || std::is_same_v<T, Decimal128> || std::is_same_v<T, Decimal256>)
Expand Down Expand Up @@ -138,7 +168,7 @@ ColumnPtr convert_and_insert(const py::object & obj, UInt32 scale = 0)


template <typename T>
ColumnPtr convert_and_insert_array(const ColumnWrapper & col_wrap, size_t & cursor, const size_t count, UInt32 scale = 0)
ColumnPtr PythonSource::convert_and_insert_array(const ColumnWrapper & col_wrap, size_t & cursor, const size_t count, UInt32 scale)
{
MutableColumnPtr column;
if constexpr (std::is_same_v<T, DateTime64> || std::is_same_v<T, Decimal128> || std::is_same_v<T, Decimal256>)
Expand All @@ -152,7 +182,7 @@ ColumnPtr convert_and_insert_array(const ColumnWrapper & col_wrap, size_t & curs
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Column data is None");

if constexpr (std::is_same_v<T, String>)
insert_string_from_array_raw(static_cast<PyObject **>(col_wrap.buf), column, cursor, count);
convert_string_array_to_block(static_cast<PyObject **>(col_wrap.buf), column, cursor, count);
else
insert_from_ptr<T>(col_wrap.buf, column, cursor, count);

Expand Down Expand Up @@ -354,13 +384,15 @@ Chunk PythonSource::scanDataToChunk()
if (logger->debug())
{
// log first 10 rows of the column
std::stringstream ss;
LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
for (size_t j = 0; j < std::min(count, static_cast<size_t>(10)); ++j)
{
std::stringstream ss;

LOG_DEBUG(logger, "Column {} row {}: {}", col.name, j, columns[i]->getDataAt(j).toString());
Field value;
columns[i]->get(j, value);
ss << toString(value) << ", ";
}
LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
}
}
catch (const Exception & e)
Expand Down
16 changes: 16 additions & 0 deletions src/Processors/Sources/PythonSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ class PythonSource : public ISource
ExternalResultDescription description;

PyObjectVecPtr scanData(const py::object & data, const std::vector<std::string> & col_names, size_t & cursor, size_t count);
template <typename T>
ColumnPtr convert_and_insert_array(const ColumnWrapper & col_wrap, size_t & cursor, size_t count, UInt32 scale = 0);
template <typename T>
ColumnPtr convert_and_insert(const py::object & obj, UInt32 scale = 0);
template <typename T>
void insert_from_ptr(const void * ptr, const MutableColumnPtr & column, size_t offset, size_t row_count);

void convert_string_array_to_block(PyObject ** buf, const MutableColumnPtr & column, size_t offset, size_t row_count);


template <typename T>
void insert_from_list(const py::list & obj, const MutableColumnPtr & column);

void insert_string_from_array(py::handle obj, const MutableColumnPtr & column);

void insert_string_from_array_raw(PyObject ** buf, const MutableColumnPtr & column, size_t offset, size_t row_count);
void prepareColumnCache(Names & names, Columns & columns);
Chunk scanDataToChunk();
void destory(PyObjectVecPtr & data);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Pipe StoragePython::read(

Block sample_block = prepareSampleBlock(column_names, storage_snapshot);

// num_streams = 3; // for testing
// num_streams = 3; // for chdb testing

prepareColumnCache(column_names, sample_block.getColumns(), sample_block);

Expand Down

0 comments on commit eeb6b68

Please sign in to comment.