diff --git a/CMakeLists.txt b/CMakeLists.txt index 81aad27e..bf6029b5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,6 +111,8 @@ set(CLP_FFI_PY_LIB_IR_SOURCES ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyMetadata.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyQuery.cpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyQuery.hpp + ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PySerializer.cpp + ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PySerializer.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/Query.cpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/Query.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/serialization_methods.cpp diff --git a/clp_ffi_py/ir/__init__.py b/clp_ffi_py/ir/__init__.py index defc702e..b89ffda4 100644 --- a/clp_ffi_py/ir/__init__.py +++ b/clp_ffi_py/ir/__init__.py @@ -18,6 +18,7 @@ "Metadata", # native "Query", # native "QueryBuilder", # query_builder + "Serializer", # native "ClpIrFileReader", # readers "ClpIrStreamReader", # readers ] diff --git a/clp_ffi_py/ir/native.pyi b/clp_ffi_py/ir/native.pyi index 16e2e9c2..7ba19f14 100644 --- a/clp_ffi_py/ir/native.pyi +++ b/clp_ffi_py/ir/native.pyi @@ -1,5 +1,8 @@ +from __future__ import annotations + from datetime import tzinfo -from typing import Any, Dict, IO, List, Optional +from types import TracebackType +from typing import Any, Dict, IO, List, Optional, Type from clp_ffi_py.wildcard_query import WildcardQuery @@ -86,4 +89,18 @@ class KeyValuePairLogEvent: def __init__(self, dictionary: Dict[Any, Any]): ... def to_dict(self) -> Dict[Any, Any]: ... +class Serializer: + def __init__(self, output_stream: IO[bytes], buffer_size_limit: int = 65536): ... + def __enter__(self) -> Serializer: ... + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: ... + def serialize_log_event_from_msgpack_map(self, msgpack_map: bytes) -> int: ... + def get_num_bytes_serialized(self) -> int: ... + def flush(self) -> None: ... + def close(self) -> None: ... + class IncompleteStreamError(Exception): ... diff --git a/src/clp_ffi_py/PyObjectCast.hpp b/src/clp_ffi_py/PyObjectCast.hpp index 909793cf..196c47f8 100644 --- a/src/clp_ffi_py/PyObjectCast.hpp +++ b/src/clp_ffi_py/PyObjectCast.hpp @@ -118,6 +118,7 @@ class PyKeyValuePairLogEvent; class PyLogEvent; class PyMetadata; class PyQuery; +class PySerializer; } // namespace ir::native CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDeserializerBuffer); @@ -126,6 +127,7 @@ CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyKeyValuePairLogEvent); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyLogEvent); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyMetadata); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyQuery); +CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PySerializer); CLP_FFI_PY_MARK_AS_PYOBJECT(PyBytesObject); CLP_FFI_PY_MARK_AS_PYOBJECT(PyDictObject); CLP_FFI_PY_MARK_AS_PYOBJECT(PyTypeObject); diff --git a/src/clp_ffi_py/PyObjectUtils.hpp b/src/clp_ffi_py/PyObjectUtils.hpp index 914e24ad..c8168792 100644 --- a/src/clp_ffi_py/PyObjectUtils.hpp +++ b/src/clp_ffi_py/PyObjectUtils.hpp @@ -5,6 +5,8 @@ #include +#include + namespace clp_ffi_py { /** * A specialized deleter for PyObjectPtr which decrements the pointed PyObject reference count when @@ -47,6 +49,32 @@ using PyObjectPtr = std::unique_ptr> */ template using PyObjectStaticPtr = std::unique_ptr>; + +/** + * A guard class for Python exceptions. In certain CPython methods, such as `tp_finalize`, + * the exception state must remain unchanged throughout execution. This class saves the current + * exception state upon initialization and restores it upon destruction, ensuring the exception + * status is preserved. + * Docs: https://docs.python.org/3/c-api/typeobj.html#c.PyTypeObject.tp_finalize + */ +class PyErrGuard { +public: + // Constructor + PyErrGuard() = default; + + // Destructor + ~PyErrGuard() { m_exception_context.restore(); } + + // Delete copy/move constructor and assignment + PyErrGuard(PyErrGuard const&) = delete; + PyErrGuard(PyErrGuard&&) = delete; + auto operator=(PyErrGuard const&) -> PyErrGuard& = delete; + auto operator=(PyErrGuard&&) -> PyErrGuard& = delete; + +private: + // Variables + PyExceptionContext m_exception_context; +}; } // namespace clp_ffi_py #endif // CLP_FFI_PY_PY_OBJECT_UTILS_HPP diff --git a/src/clp_ffi_py/Python.hpp b/src/clp_ffi_py/Python.hpp index a31eed46..8d6451bd 100644 --- a/src/clp_ffi_py/Python.hpp +++ b/src/clp_ffi_py/Python.hpp @@ -13,14 +13,18 @@ #ifdef CLP_FFI_PY_ENABLE_LINTING // Inform IWYU of the headers that we use that are exported by Python.h // IWYU pragma: begin_exports +#include #include #include +#include +#include #include #include #include #include #include #include +#include #include // IWYU pragma: end_exports #endif diff --git a/src/clp_ffi_py/ir/native/PySerializer.cpp b/src/clp_ffi_py/ir/native/PySerializer.cpp new file mode 100644 index 00000000..7b473451 --- /dev/null +++ b/src/clp_ffi_py/ir/native/PySerializer.cpp @@ -0,0 +1,575 @@ +#include // Must always be included before any other header files + +#include "PySerializer.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace clp_ffi_py::ir::native { +namespace { +/** + * Callback of `PySerializer`'s `__init__` method: + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerDoc, + "Serializer for serializing CLP key-value pair IR streams.\n" + "This class serializes log events into the CLP key-value pair IR format and writes the" + " serialized data to a specified byte stream object.\n\n" + "__init__(self, output_stream, buffer_size_limit=65536)\n\n" + "Initializes a :class:`Serializer` instance with the given output stream. Note that each" + " object should only be initialized once. Double initialization will result in a memory" + " leak.\n\n" + ":param output_stream: A writable byte output stream to which the serializer will write the" + " serialized IR byte sequences.\n" + ":type output_stream: IO[bytes]\n" + ":param buffer_size_limit: The maximum amount of serialized data to buffer before flushing" + " it to `output_stream`. Defaults to 64 KiB.\n" + ":type buffer_size_limit: int\n" +); +CLP_FFI_PY_METHOD auto +PySerializer_init(PySerializer* self, PyObject* args, PyObject* keywords) -> int; + +/** + * Callback of `PySerializer`'s `serialize_msgpack` method. + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerSerializeLogEventFromMsgpackMapDoc, + "serialize_log_event_from_msgpack_map(self, msgpack_map)\n" + "--\n\n" + "Serializes the given log event.\n\n" + ":param msgpack_map: The log event as a packed msgpack map where all keys are" + " strings.\n" + ":type msgpack_map: bytes\n" + ":return: The number of bytes serialized.\n" + ":rtype: int\n" + ":raise IOError: If the serializer has already been closed.\n" + ":raise TypeError: If `msgpack_map` is not a packed msgpack map.\n" + ":raise RuntimeError: If `msgpack_map` couldn't be unpacked or serialization into the IR" + " stream failed.\n" +); +CLP_FFI_PY_METHOD auto PySerializer_serialize_log_event_from_msgpack_map( + PySerializer* self, + PyObject* msgpack_map +) -> PyObject*; + +/** + * Callback of `PySerializer`'s `get_num_bytes_serialized` method. + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerGetNumBytesSerializedDoc, + "get_num_bytes_serialized(self)\n" + "--\n\n" + ":return: The total number of bytes serialized.\n" + ":rtype: int\n" + ":raise IOError: If the serializer has already been closed.\n" +); +CLP_FFI_PY_METHOD auto PySerializer_get_num_bytes_serialized(PySerializer* self) -> PyObject*; + +/** + * Callback of `PySerializer`'s `flush` method. + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerFlushDoc, + "flush(self)\n" + "--\n\n" + "Flushes any buffered data and the output stream.\n\n" + ":raise IOError: If the serializer has already been closed.\n" +); +CLP_FFI_PY_METHOD auto PySerializer_flush(PySerializer* self) -> PyObject*; + +/** + * Callback of `PySerializer`'s `close` method. + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerCloseDoc, + "close(self)\n" + "--\n\n" + "Closes the serializer, writing any buffered data to the output stream and appending a byte" + " sequence to mark the end of the CLP IR stream. The output stream is then flushed and" + " closed.\n" + "NOTE: This method must be called to properly terminate an IR stream. If it isn't called," + " the stream will be incomplete, and any buffered data may be lost.\n\n" + ":raise IOError: If the serializer has already been closed.\n" +); +CLP_FFI_PY_METHOD auto PySerializer_close(PySerializer* self) -> PyObject*; + +/** + * Callback of `PySerializer`'s `__enter__` method. + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPySerializerEnterDoc, + "__enter__(self)\n" + "--\n\n" + "Enters the runtime context.\n\n" + ":return: self.\n" + ":rtype: :class:`Serializer`\n" +); +CLP_FFI_PY_METHOD auto PySerializer_enter(PySerializer* self) -> PyObject*; + +/** + * Callback of `PySerializer`'s `__exit__` method. + */ +PyDoc_STRVAR( + cPySerializerExitDoc, + "__exit__(self, exc_type, exc_value, traceback)\n" + "--\n\n" + "Exits the runtime context, automatically calling :meth:`close` to flush all buffered data" + " into the output stream." + ":param exc_type: The type of the exception that caused the exit. Unused.\n" + ":param exc_value: The value of the exception that caused the exit. Unused.\n" + ":param exc_traceable: The traceback. Unused.\n" +); +CLP_FFI_PY_METHOD auto +PySerializer_exit(PySerializer* self, PyObject* args, PyObject* keywords) -> PyObject*; + +/** + * Callback of `PySerializer`'s deallocator. + */ +CLP_FFI_PY_METHOD auto PySerializer_dealloc(PySerializer* self) -> void; + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyMethodDef PySerializer_method_table[]{ + {"serialize_log_event_from_msgpack_map", + py_c_function_cast(PySerializer_serialize_log_event_from_msgpack_map), + METH_O, + static_cast(cPySerializerSerializeLogEventFromMsgpackMapDoc)}, + + {"get_num_bytes_serialized", + py_c_function_cast(PySerializer_get_num_bytes_serialized), + METH_NOARGS, + static_cast(cPySerializerGetNumBytesSerializedDoc)}, + + {"flush", + py_c_function_cast(PySerializer_flush), + METH_NOARGS, + static_cast(cPySerializerFlushDoc)}, + + {"close", + py_c_function_cast(PySerializer_close), + METH_NOARGS, + static_cast(cPySerializerCloseDoc)}, + + {"__enter__", + py_c_function_cast(PySerializer_enter), + METH_NOARGS, + static_cast(cPySerializerEnterDoc)}, + + {"__exit__", + py_c_function_cast(PySerializer_exit), + METH_VARARGS | METH_KEYWORDS, + static_cast(cPySerializerExitDoc)}, + + {nullptr} +}; + +// NOLINTBEGIN(cppcoreguidelines-avoid-c-arrays, cppcoreguidelines-pro-type-*-cast) +PyType_Slot PySerializer_slots[]{ + {Py_tp_alloc, reinterpret_cast(PyType_GenericAlloc)}, + {Py_tp_dealloc, reinterpret_cast(PySerializer_dealloc)}, + {Py_tp_new, reinterpret_cast(PyType_GenericNew)}, + {Py_tp_init, reinterpret_cast(PySerializer_init)}, + {Py_tp_methods, static_cast(PySerializer_method_table)}, + {Py_tp_doc, const_cast(static_cast(cPySerializerDoc))}, + {0, nullptr} +}; +// NOLINTEND(cppcoreguidelines-avoid-c-arrays, cppcoreguidelines-pro-type-*-cast) + +/** + * `PySerializer`'s Python type specifications. + */ +PyType_Spec PySerializer_type_spec{ + "clp_ffi_py.ir.native.Serializer", + sizeof(PySerializer), + 0, + Py_TPFLAGS_DEFAULT, + static_cast(PySerializer_slots) +}; + +CLP_FFI_PY_METHOD auto +PySerializer_init(PySerializer* self, PyObject* args, PyObject* keywords) -> int { + static char keyword_output_stream[]{"output_stream"}; + static char keyword_buffer_size_limit[]{"buffer_size_limit"}; + static char* keyword_table[]{ + static_cast(keyword_output_stream), + static_cast(keyword_buffer_size_limit), + nullptr + }; + + // If the argument parsing fails, `self` will be deallocated. We must reset all pointers to + // nullptr in advance, otherwise the deallocator might trigger segmentation fault. + self->default_init(); + + PyObject* output_stream{Py_None}; + Py_ssize_t buffer_size_limit{PySerializer::cDefaultBufferSizeLimit}; + if (false + == static_cast(PyArg_ParseTupleAndKeywords( + args, + keywords, + "O|n", + static_cast(keyword_table), + &output_stream, + &buffer_size_limit + ))) + { + return -1; + } + + // Ensure the `output_stream` has `write`, `flush`, and `close` methods + auto output_stream_has_method = [&](char const* method_name) -> bool { + PyObjectPtr const method{PyObject_GetAttrString(output_stream, method_name)}; + if (nullptr == method) { + return false; + } + if (false == static_cast(PyCallable_Check(method.get()))) { + PyErr_SetString(PyExc_TypeError, ""); + PyErr_Format( + PyExc_TypeError, + "The attribute `%s` of the given output stream object is not callable.", + method_name + ); + return false; + } + return true; + }; + + if (false == output_stream_has_method("write")) { + return -1; + } + if (false == output_stream_has_method("flush")) { + return -1; + } + if (false == output_stream_has_method("close")) { + return -1; + } + + if (0 > buffer_size_limit) { + PyErr_SetString(PyExc_ValueError, "The buffer size limit cannot be negative"); + return -1; + } + + auto serializer_result{PySerializer::ClpIrSerializer::create()}; + if (serializer_result.has_error()) { + PyErr_Format( + PyExc_RuntimeError, + cSerializerCreateErrorFormatStr.data(), + serializer_result.error().message().c_str() + ); + return -1; + } + + if (false == self->init(output_stream, std::move(serializer_result.value()), buffer_size_limit)) + { + return -1; + } + + return 0; +} + +CLP_FFI_PY_METHOD auto PySerializer_serialize_log_event_from_msgpack_map( + PySerializer* self, + PyObject* msgpack_map +) -> PyObject* { + if (false == static_cast(PyBytes_Check(msgpack_map))) { + PyErr_SetString( + PyExc_TypeError, + "`msgpack_byte_sequence` is supposed to return a `bytes` object" + ); + return nullptr; + } + + auto* py_bytes_msgpack_map{py_reinterpret_cast(msgpack_map)}; + // Since the type is already checked, we can use the macro to avoid duplicated type checking. + auto const num_byte_serialized{self->serialize_log_event_from_msgpack_map( + {PyBytes_AS_STRING(py_bytes_msgpack_map), + static_cast(PyBytes_GET_SIZE(py_bytes_msgpack_map))} + )}; + if (false == num_byte_serialized.has_value()) { + return nullptr; + } + + return PyLong_FromSsize_t(num_byte_serialized.value()); +} + +CLP_FFI_PY_METHOD auto PySerializer_get_num_bytes_serialized(PySerializer* self) -> PyObject* { + return PyLong_FromSsize_t(self->get_num_bytes_serialized()); +} + +CLP_FFI_PY_METHOD auto PySerializer_flush(PySerializer* self) -> PyObject* { + if (false == self->flush()) { + return nullptr; + } + Py_RETURN_NONE; +} + +CLP_FFI_PY_METHOD auto PySerializer_close(PySerializer* self) -> PyObject* { + if (false == self->close()) { + return nullptr; + } + Py_RETURN_NONE; +} + +CLP_FFI_PY_METHOD auto PySerializer_enter(PySerializer* self) -> PyObject* { + Py_INCREF(self); + return py_reinterpret_cast(self); +} + +CLP_FFI_PY_METHOD auto +PySerializer_exit(PySerializer* self, PyObject* args, PyObject* keywords) -> PyObject* { + static char keyword_exc_type[]{"exc_type"}; + static char keyword_exc_value[]{"exc_value"}; + static char keyword_traceback[]{"traceback"}; + static char* keyword_table[]{ + static_cast(keyword_exc_type), + static_cast(keyword_exc_value), + static_cast(keyword_traceback), + nullptr + }; + + PyObject* py_exc_type{}; + PyObject* py_exc_value{}; + PyObject* py_traceback{}; + if (false + == static_cast(PyArg_ParseTupleAndKeywords( + args, + keywords, + "|OOO", + static_cast(keyword_table), + &py_exc_type, + &py_exc_value, + &py_traceback + ))) + { + return nullptr; + } + + // We don't do anything with the given exception. It is the caller's responsibility to raise + // the exceptions: https://docs.python.org/3/reference/datamodel.html#object.__exit__ + if (false == self->close()) { + return nullptr; + } + + Py_RETURN_NONE; +} + +CLP_FFI_PY_METHOD auto PySerializer_dealloc(PySerializer* self) -> void { + PyErrGuard const err_guard; + + if (false == self->is_closed()) { + if (0 + != PyErr_WarnEx( + PyExc_ResourceWarning, + "`Serializer.close()` is not called before object destruction, which will leave" + " the stream incomplete, and potentially resulting in data" + " loss due to data buffering", + 1 + )) + { + PyErr_Clear(); + } + } + + self->clean(); + Py_TYPE(self)->tp_free(py_reinterpret_cast(self)); +} +} // namespace + +auto PySerializer::init( + PyObject* output_stream, + PySerializer::ClpIrSerializer serializer, + Py_ssize_t buffer_size_limit +) -> bool { + m_output_stream = output_stream; + Py_INCREF(output_stream); + m_serializer = new PySerializer::ClpIrSerializer{std::move(serializer)}; + m_buffer_size_limit = buffer_size_limit; + if (nullptr == m_serializer) { + PyErr_SetString(PyExc_RuntimeError, clp_ffi_py::cOutofMemoryError); + return false; + } + auto const preamble_size{get_ir_buf_size()}; + if (preamble_size > m_buffer_size_limit && false == write_ir_buf_to_output_stream()) { + return false; + } + m_num_total_bytes_serialized += preamble_size; + return true; +} + +auto PySerializer::assert_is_not_closed() const -> bool { + if (is_closed()) { + PyErr_SetString(PyExc_IOError, "Serializer has already been closed."); + return false; + } + return true; +} + +auto PySerializer::serialize_log_event_from_msgpack_map(std::span msgpack_byte_sequence +) -> std::optional { + if (false == assert_is_not_closed()) { + return std::nullopt; + } + + auto const unpack_result{unpack_msgpack(msgpack_byte_sequence)}; + if (unpack_result.has_error()) { + PyErr_SetString(PyExc_RuntimeError, unpack_result.error().c_str()); + return std::nullopt; + } + + auto const& msgpack_obj{unpack_result.value().get()}; + if (msgpack::type::MAP != msgpack_obj.type) { + PyErr_SetString(PyExc_TypeError, "Unpacked msgpack is not a map"); + return std::nullopt; + } + + auto const buffer_size_before_serialization{get_ir_buf_size()}; + if (false == m_serializer->serialize_msgpack_map(msgpack_obj.via.map)) { + PyErr_SetString(PyExc_RuntimeError, cSerializerSerializeMsgpackMapError.data()); + return std::nullopt; + } + auto const buffer_size_after_serialization{get_ir_buf_size()}; + auto const num_bytes_serialized{ + buffer_size_after_serialization - buffer_size_before_serialization + }; + m_num_total_bytes_serialized += num_bytes_serialized; + + if (buffer_size_after_serialization > m_buffer_size_limit + && false == write_ir_buf_to_output_stream()) + { + return std::nullopt; + } + return num_bytes_serialized; +} + +auto PySerializer::flush() -> bool { + if (false == assert_is_not_closed()) { + return false; + } + if (false == write_ir_buf_to_output_stream()) { + return false; + } + return flush_output_stream(); +} + +auto PySerializer::close() -> bool { + if (false == assert_is_not_closed()) { + return false; + } + + if (false == write_ir_buf_to_output_stream()) { + return false; + } + + // Write end-of-stream + constexpr std::array cEndOfStreamBuf{clp::ffi::ir_stream::cProtocol::Eof}; + if (false == write_to_output_stream({cEndOfStreamBuf.cbegin(), cEndOfStreamBuf.cend()})) { + return false; + } + m_num_total_bytes_serialized += cEndOfStreamBuf.size(); + + if (false == (flush_output_stream() && close_output_stream())) { + return false; + } + + close_serializer(); + return true; +} + +auto PySerializer::module_level_init(PyObject* py_module) -> bool { + static_assert(std::is_trivially_destructible()); + auto* type{py_reinterpret_cast(PyType_FromSpec(&PySerializer_type_spec))}; + m_py_type.reset(type); + if (nullptr == type) { + return false; + } + return add_python_type(get_py_type(), "Serializer", py_module); +} + +auto PySerializer::write_ir_buf_to_output_stream() -> bool { + if (false == assert_is_not_closed()) { + return false; + } + + auto const optional_num_bytes_written{write_to_output_stream(m_serializer->get_ir_buf_view())}; + if (false == optional_num_bytes_written.has_value()) { + return false; + } + if (optional_num_bytes_written.value() != get_ir_buf_size()) { + PyErr_SetString( + PyExc_RuntimeError, + "The number of bytes written to the output stream doesn't match the size of the " + "internal buffer" + ); + return false; + } + + m_serializer->clear_ir_buf(); + return true; +} + +auto PySerializer::write_to_output_stream(PySerializer::BufferView buf +) -> std::optional { + if (buf.empty()) { + return 0; + } + + // `PyBUF_READ` ensures the buffer is read-only, so it should be safe to cast `char const*` to + // `char*` + PyObjectPtr const ir_buf_mem_view{PyMemoryView_FromMemory( + // NOLINTNEXTLINE(bugprone-casting-through-void, cppcoreguidelines-pro-type-*-cast) + static_cast(const_cast(static_cast(buf.data()))), + static_cast(buf.size()), + PyBUF_READ + )}; + if (nullptr == ir_buf_mem_view) { + return std::nullopt; + } + + PyObjectPtr const py_num_bytes_written{ + PyObject_CallMethod(m_output_stream, "write", "O", ir_buf_mem_view.get()) + }; + if (nullptr == py_num_bytes_written) { + return std::nullopt; + } + + Py_ssize_t num_bytes_written{}; + if (false == parse_py_int(py_num_bytes_written.get(), num_bytes_written)) { + return std::nullopt; + } + return num_bytes_written; +} + +auto PySerializer::flush_output_stream() -> bool { + PyObjectPtr const ret_val{PyObject_CallMethod(m_output_stream, "flush", "")}; + if (nullptr == ret_val) { + return false; + } + return true; +} + +auto PySerializer::close_output_stream() -> bool { + PyObjectPtr const ret_val{PyObject_CallMethod(m_output_stream, "close", "")}; + if (nullptr == ret_val) { + return false; + } + return true; +} +} // namespace clp_ffi_py::ir::native diff --git a/src/clp_ffi_py/ir/native/PySerializer.hpp b/src/clp_ffi_py/ir/native/PySerializer.hpp new file mode 100644 index 00000000..c5cecb40 --- /dev/null +++ b/src/clp_ffi_py/ir/native/PySerializer.hpp @@ -0,0 +1,187 @@ +#ifndef CLP_FFI_PY_IR_NATIVE_PYSERIALIZER_HPP +#define CLP_FFI_PY_IR_NATIVE_PYSERIALIZER_HPP + +#include // Must always be included before any other header files + +#include +#include +#include + +#include +#include +#include + +#include + +namespace clp_ffi_py::ir::native { +/** + * A PyObject structure for CLP key-value pair IR format serialization (using four-byte encoding). + * The underlying serializer is pointed by `m_serializer`, and the serialized IR stream is written + * into an `IO[byte]` stream pointed by `m_output_stream`. + */ +class PySerializer { +public: + using ClpIrSerializer = clp::ffi::ir_stream::Serializer; + using BufferView = ClpIrSerializer::BufferView; + + // Delete default constructor to disable direct instantiation. + PySerializer() = delete; + + // Delete copy & move constructors and assignment operators + PySerializer(PySerializer const&) = delete; + PySerializer(PySerializer&&) = delete; + auto operator=(PySerializer const&) -> PySerializer& = delete; + auto operator=(PySerializer&&) -> PySerializer& = delete; + + // Destructor + ~PySerializer() = default; + + /** + * The default buffer size limit. Any change to the value should also be applied to `__init__`'s + * doc string and Python stub file. + */ + static constexpr size_t cDefaultBufferSizeLimit{65'536}; + + /** + * Initializes the underlying data with the given inputs. Since the memory allocation of + * `PySerializer` is handled by CPython's allocator, cpp constructors will not be explicitly + * called. This function serves as the default constructor initialize the underlying serializer. + * It has to be called manually to create a `PySerializer` object through CPython APIs. + * @param output_stream + * @param serializer + * @param buffer_size_limit + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto + init(PyObject* output_stream, ClpIrSerializer serializer, Py_ssize_t buffer_size_limit) -> bool; + + /** + * Initializes the pointers to nullptr by default. Should be called once the object is + * allocated. + */ + auto default_init() -> void { + m_output_stream = nullptr; + m_serializer = nullptr; + m_num_total_bytes_serialized = 0; + m_buffer_size_limit = 0; + } + + /** + * Releases the memory allocated for underlying data fields. + */ + auto clean() -> void { + close_serializer(); + Py_XDECREF(m_output_stream); + } + + [[nodiscard]] auto is_closed() const -> bool { return nullptr == m_serializer; } + + /** + * Serializes the log event from the given msgpack map into IR format. + * @param msgpack_byte_sequence + * @return the number of bytes serialized on success. + * @return std::nullptr on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto serialize_log_event_from_msgpack_map( + std::span msgpack_byte_sequence + ) -> std::optional; + + [[nodiscard]] auto get_num_bytes_serialized() const -> Py_ssize_t { + return m_num_total_bytes_serialized; + } + + /** + * Flushes the underlying IR buffer and `m_output_stream`. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto flush() -> bool; + + /** + * Closes the serializer by writing the buffered results into the output stream with + * end-of-stream IR Unit appended in the end. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto close() -> bool; + + /** + * Gets the `PyTypeObject` that represents `PySerializer`'s Python type. This type is + * dynamically created and initialized during the execution of `module_level_init`. + * @return Python type object associated with `PySerializer`. + */ + [[nodiscard]] static auto get_py_type() -> PyTypeObject* { return m_py_type.get(); } + + /** + * Creates and initializes `PySerializer` as a Python type, and then incorporates this + * type as a Python object into the py_module module. + * @param py_module The Python module where the initialized `PySerializer` will be incorporated. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] static auto module_level_init(PyObject* py_module) -> bool; + +private: + /** + * Asserts the serializer has not been closed. + * @return true on success, false if it's already been closed with `IOError` set. + */ + [[nodiscard]] auto assert_is_not_closed() const -> bool; + + [[nodiscard]] auto get_ir_buf_size() const -> Py_ssize_t { + return static_cast(m_serializer->get_ir_buf_view().size()); + } + + /** + * Writes the underlying IR buffer into `m_output_stream`. + * NOTE: the serializer must not be closed to call this method. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto write_ir_buf_to_output_stream() -> bool; + + /** + * Closes `m_serializer` by releasing the allocated memory. + * NOTE: it is safe to call this method more than once as it resets `m_serializer` to nullptr. + */ + auto close_serializer() -> void { + delete m_serializer; + m_serializer = nullptr; + } + + /** + * Wrapper of `output_stream`'s `write` method. + * @param buf + * @return The number of bytes written on success. + * @return std::nullopt on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto write_to_output_stream(BufferView buf) -> std::optional; + + /** + * Wrapper of `output_stream`'s `flush` method. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto flush_output_stream() -> bool; + + /** + * Wrapper of `output_stream`'s `close` method. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto close_output_stream() -> bool; + + // Variables + PyObject_HEAD; + PyObject* m_output_stream; + // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) + gsl::owner m_serializer; + Py_ssize_t m_num_total_bytes_serialized; + Py_ssize_t m_buffer_size_limit; + + static inline PyObjectStaticPtr m_py_type{nullptr}; +}; +} // namespace clp_ffi_py::ir::native + +#endif // CLP_FFI_PY_IR_NATIVE_PYSERIALIZER_HPP diff --git a/src/clp_ffi_py/modules/ir_native.cpp b/src/clp_ffi_py/modules/ir_native.cpp index c362d7be..c2b802b4 100644 --- a/src/clp_ffi_py/modules/ir_native.cpp +++ b/src/clp_ffi_py/modules/ir_native.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace { @@ -75,5 +76,10 @@ PyMODINIT_FUNC PyInit_native() { return nullptr; } + if (false == clp_ffi_py::ir::native::PySerializer::module_level_init(new_module)) { + Py_DECREF(new_module); + return nullptr; + } + return new_module; } diff --git a/tests/test_ir/test_serializer.py b/tests/test_ir/test_serializer.py index beb4b92f..9a2fb3f3 100644 --- a/tests/test_ir/test_serializer.py +++ b/tests/test_ir/test_serializer.py @@ -1,6 +1,11 @@ -from test_ir.test_utils import TestCLPBase +from io import BytesIO +from pathlib import Path +from typing import List, Optional -from clp_ffi_py.ir import FourByteSerializer +from test_ir.test_utils import JsonLinesFileReader, TestCLPBase + +from clp_ffi_py.ir import FourByteSerializer, Serializer +from clp_ffi_py.utils import serialize_dict_to_msgpack class TestCaseFourByteSerializer(TestCLPBase): @@ -38,3 +43,127 @@ def test_serialization_methods_consistency(self) -> None: timestamp_delta ) self.assertEqual(serialized_message_and_ts_delta, serialized_message + serialized_ts_delta) + + +class TestCaseSerializer(TestCLPBase): + """ + Class for testing `clp_ffi_py.ir.Serializer`. + """ + + jsonl_test_data_dir: Path = Path("test_data") / "jsonl" + current_dir: Path = Path(__file__).resolve().parent + test_data_dir: Path = current_dir / jsonl_test_data_dir + + def test_serialize_json(self) -> None: + """ + Tests serializing JSON files. + + The JSON parser will parse the file into Python dictionaries, + and then convert them into msgpack and feed into `clp_ffi_py.ir.Serializer`. + """ + + test_files: List[Path] = self.__get_test_files() + byte_buffer: BytesIO + num_bytes_serialized: int + serializer: Serializer + + # Test with context manager + for file_path in test_files: + byte_buffer = BytesIO() + with Serializer(byte_buffer) as serializer: + num_bytes_serialized = serializer.get_num_bytes_serialized() + for json_obj in JsonLinesFileReader(file_path).read_lines(): + num_bytes_serialized += serializer.serialize_log_event_from_msgpack_map( + serialize_dict_to_msgpack(json_obj) + ) + serializer.flush() + self.assertEqual( + len(byte_buffer.getvalue()), serializer.get_num_bytes_serialized() + ) + self.assertEqual(num_bytes_serialized, serializer.get_num_bytes_serialized()) + + # Test without context manager + for file_path in test_files: + byte_buffer = BytesIO() + serializer = Serializer(byte_buffer) + num_bytes_serialized = serializer.get_num_bytes_serialized() + for json_obj in JsonLinesFileReader(file_path).read_lines(): + num_bytes_serialized += serializer.serialize_log_event_from_msgpack_map( + serialize_dict_to_msgpack(json_obj) + ) + serializer.flush() + self.assertEqual(len(byte_buffer.getvalue()), serializer.get_num_bytes_serialized()) + self.assertEqual(num_bytes_serialized, serializer.get_num_bytes_serialized()) + serializer.close() + self.assertEqual( + num_bytes_serialized + 1, + serializer.get_num_bytes_serialized(), + "End-of-stream byte is missing", + ) + + def test_serialize_with_customized_buffer_size_limit(self) -> None: + """ + Tests serializing with customized buffer size limit. + """ + buffer_size_limit: int = 3000 + for file_path in self.__get_test_files(): + byte_buffer: BytesIO = BytesIO() + with Serializer( + buffer_size_limit=buffer_size_limit, output_stream=byte_buffer + ) as serializer: + serializer.flush() + num_bytes_in_ir_buffer: int = 0 + for json_obj in JsonLinesFileReader(file_path).read_lines(): + num_bytes_serialized: int = serializer.serialize_log_event_from_msgpack_map( + serialize_dict_to_msgpack(json_obj) + ) + self.assertNotEqual(0, num_bytes_serialized) + num_bytes_in_ir_buffer += num_bytes_serialized + if num_bytes_in_ir_buffer > buffer_size_limit: + # The IR buffer should already be written to the stream + self.assertEqual( + serializer.get_num_bytes_serialized(), len(byte_buffer.getvalue()) + ) + num_bytes_in_ir_buffer = 0 + else: + self.assertEqual( + serializer.get_num_bytes_serialized(), + num_bytes_in_ir_buffer + len(byte_buffer.getvalue()), + ) + + def test_closing_empty(self) -> None: + """ + Tests closing an empty serializer. + """ + byte_buffer: BytesIO + serializer: Serializer + + byte_buffer = BytesIO() + preamble_size: int + with Serializer(byte_buffer, 0) as serializer: + self.assertTrue(len(byte_buffer.getvalue()) > 0) + self.assertEqual(len(byte_buffer.getvalue()), serializer.get_num_bytes_serialized()) + preamble_size = serializer.get_num_bytes_serialized() + + byte_buffer = BytesIO() + serializer = Serializer(byte_buffer) + serializer.close() + self.assertEqual( + serializer.get_num_bytes_serialized(), + preamble_size + 1, + "End-of-stream byte is missing", + ) + + def test_not_closed(self) -> None: + serializer: Optional[Serializer] = Serializer(BytesIO()) + with self.assertWarns(ResourceWarning) as _: + serializer = None # noqa + + def __get_test_files(self) -> List[Path]: + test_files: List[Path] = [] + for file_path in TestCaseSerializer.test_data_dir.rglob("*"): + if not file_path.is_file(): + continue + test_files.append(file_path) + self.assertFalse(0 == len(test_files), "No test files found") + return test_files