Skip to content

Commit

Permalink
Rename "encoding/decoding" to "serialization/deserialization" to matc…
Browse files Browse the repository at this point in the history
…h latest CLP terminology. (#68)
  • Loading branch information
LinZhihao-723 authored Jul 15, 2024
1 parent 36e08a3 commit a504ce6
Show file tree
Hide file tree
Showing 33 changed files with 732 additions and 692 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This module provides Python packages to interface with [CLP Core Features][1]
through CLP's FFI (foreign function interface). At present, this library
supplies built-in functions for encoding/decoding log messages using [CLP][2].
supplies built-in functions for serializing/deserializing log messages using [CLP][2].

> [!IMPORTANT]
> This project is no longer built for Python3.6.
Expand Down Expand Up @@ -66,12 +66,12 @@ To manually build a package for distribution, follow the steps below.

## CLP IR Readers

CLP IR Readers provide a convenient interface for CLP IR decoding and search
CLP IR Readers provide a convenient interface for CLP IR deserialization and search
methods.

### ClpIrStreamReader

- Read/decode any arbitrary CLP IR stream (as an instance of `IO[bytes]`).
- Read+deserialize any arbitrary CLP IR stream (as an instance of `IO[bytes]`).
- Can be used as an iterator that returns each log event as a `LogEvent` object.
- Can search target log events by giving a search query:
- Searching log events within a certain time range.
Expand Down Expand Up @@ -159,7 +159,7 @@ wildcard_search_query: Query = query_builder.build()
matched_log_messages: List[Tuple[int, str]] = []

# A convenience file reader class is also available to interact with a file that
# represents an encoded CLP IR stream directly.
# represents a CLP IR stream directly.
with ClpIrFileReader(Path("example.clp.zst")) as clp_reader:
for log_event in clp_reader.search(wildcard_search_query):
matched_log_messages.append((log_event.get_timestamp(), log_event.get_log_message()))
Expand All @@ -180,7 +180,7 @@ help(FullStringWildcardQuery)
help(SubstringWildcardQuery)
```

### Streaming Decode/Search Directly from S3 Remote Storage
### Streaming Deserialize/Search Directly from S3 Remote Storage

When working with CLP IR files stored on S3-compatible storage systems,
[smart_open][17] can be used to open and read the IR stream for the following
Expand All @@ -207,7 +207,7 @@ session = boto3.Session(
)

url = 's3://clp-example-s3-bucket/example.clp.zst'
# Using `smart_open.open` to stream the encoded CLP IR:
# Using `smart_open.open` to stream the CLP IR byte sequence:
with smart_open.open(
url, mode="rb", compression="disable", transport_params={"client": session.client("s3")}
) as istream:
Expand All @@ -231,7 +231,7 @@ closed.
### Parallel Processing

The `Query` and `LogEvent` classes can be serialized by [pickle][15]. Therefore,
decoding and search can be parallelized across streams/files using libraries
deserializing and searching can be parallelized across streams/files using libraries
such as [multiprocessing][13] and [tqlm][14].

## Testing
Expand Down
6 changes: 3 additions & 3 deletions clp_ffi_py/ir/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from typing import List

__all__: List[str] = [
"Decoder", # native
"DecoderBuffer", # native
"FourByteEncoder", # native
"Deserializer", # native
"DeserializerBuffer", # native
"FourByteSerializer", # native
"IncompleteStreamError", # native
"LogEvent", # native
"Metadata", # native
Expand Down
26 changes: 14 additions & 12 deletions clp_ffi_py/ir/native.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ from typing import Any, Dict, IO, List, Optional

from clp_ffi_py.wildcard_query import WildcardQuery

class DecoderBuffer:
class DeserializerBuffer:
def __init__(self, input_stream: IO[bytes], initial_buffer_capacity: int = 4096): ...
def get_num_decoded_log_messages(self) -> int: ...
def get_num_deserialized_log_messages(self) -> int: ...
def _test_streaming(self, seed: int) -> bytearray: ...

class Metadata:
Expand Down Expand Up @@ -58,24 +58,26 @@ class Query:
def get_wildcard_queries(self) -> Optional[List[WildcardQuery]]: ...
def match_log_event(self, log_event: LogEvent) -> bool: ...

class FourByteEncoder:
class FourByteSerializer:
@staticmethod
def encode_preamble(ref_timestamp: int, timestamp_format: str, timezone: str) -> bytearray: ...
def serialize_preamble(
ref_timestamp: int, timestamp_format: str, timezone: str
) -> bytearray: ...
@staticmethod
def encode_message_and_timestamp_delta(timestamp_delta: int, msg: bytes) -> bytearray: ...
def serialize_message_and_timestamp_delta(timestamp_delta: int, msg: bytes) -> bytearray: ...
@staticmethod
def encode_message(msg: bytes) -> bytearray: ...
def serialize_message(msg: bytes) -> bytearray: ...
@staticmethod
def encode_timestamp_delta(timestamp_delta: int) -> bytearray: ...
def serialize_timestamp_delta(timestamp_delta: int) -> bytearray: ...
@staticmethod
def encode_end_of_ir() -> bytearray: ...
def serialize_end_of_ir() -> bytearray: ...

class Decoder:
class Deserializer:
@staticmethod
def decode_preamble(decoder_buffer: DecoderBuffer) -> Metadata: ...
def deserialize_preamble(decoder_buffer: DeserializerBuffer) -> Metadata: ...
@staticmethod
def decode_next_log_event(
decoder_buffer: DecoderBuffer,
def deserialize_next_log_event(
decoder_buffer: DeserializerBuffer,
query: Optional[Query] = None,
allow_incomplete_stream: bool = False,
) -> Optional[LogEvent]: ...
Expand Down
50 changes: 25 additions & 25 deletions clp_ffi_py/ir/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,28 @@

from zstandard import ZstdDecompressionReader, ZstdDecompressor

from clp_ffi_py.ir.native import Decoder, DecoderBuffer, LogEvent, Metadata, Query
from clp_ffi_py.ir.native import Deserializer, DeserializerBuffer, LogEvent, Metadata, Query


class ClpIrStreamReader(Iterator[LogEvent]):
"""
This class represents a stream reader used to read/decode encoded log events from a CLP IR
stream. It also provides method(s) to instantiate a log event generator with a customized search
query.
This class represents a stream reader used to read/deserialize log events from a CLP IR stream.
It also provides method(s) to instantiate a log event generator with a customized search query.
:param istream: Input stream that contains encoded CLP IR.
:param decoder_buffer_size: Initial size of the decoder buffer.
:param istream: Input stream that contains CLP IR byte sequence.
:param deserializer_buffer_size: Initial size of the deserializer buffer.
:param enable_compression: A flag indicating whether the istream is compressed using `zstd`.
:param allow_incomplete_stream: If set to `True`, an incomplete CLP IR stream is not treated as
an error. Instead, encountering such a stream is seen as reaching its end without raising
any exceptions.
"""

DEFAULT_DECODER_BUFFER_SIZE: int = 65536
DEFAULT_DESERIALIZER_BUFFER_SIZE: int = 65536

def __init__(
self,
istream: IO[bytes],
decoder_buffer_size: int = DEFAULT_DECODER_BUFFER_SIZE,
deserializer_buffer_size: int = DEFAULT_DESERIALIZER_BUFFER_SIZE,
enable_compression: bool = True,
allow_incomplete_stream: bool = False,
):
Expand All @@ -39,40 +38,42 @@ def __init__(
self.__istream = dctx.stream_reader(istream, read_across_frames=True)
else:
self.__istream = istream
self._decoder_buffer: DecoderBuffer = DecoderBuffer(self.__istream, decoder_buffer_size)
self._deserializer_buffer: DeserializerBuffer = DeserializerBuffer(
self.__istream, deserializer_buffer_size
)
self._metadata: Optional[Metadata] = None
self._allow_incomplete_stream: bool = allow_incomplete_stream

def read_next_log_event(self) -> Optional[LogEvent]:
"""
Reads and decodes the next encoded log event from the IR stream.
Reads and deserializes the next log event from the IR stream.
:return:
- Next unread log event represented as an instance of LogEvent.
- None if the end of IR stream is reached.
:raise Exception:
If :meth:`~clp_ffi_py.ir.native.Decoder.decode_next_log_event` fails.
If :meth:`~clp_ffi_py.ir.native.Deserializer.deserialize_next_log_event` fails.
"""
return Decoder.decode_next_log_event(
self._decoder_buffer, allow_incomplete_stream=self._allow_incomplete_stream
return Deserializer.deserialize_next_log_event(
self._deserializer_buffer, allow_incomplete_stream=self._allow_incomplete_stream
)

def read_preamble(self) -> None:
"""
Try to decode the preamble and set `metadata`. If `metadata` has been set already, it will
instantly return. It is separated from `__init__` so that the input stream does not need to
be readable on a reader's construction, but until the user starts to iterate logs.
Try to deserialize the preamble and set `metadata`. If `metadata` has been set already, it
will instantly return. It is separated from `__init__` so that the input stream does not
need to be readable on a reader's construction, but until the user starts to iterate logs.
:raise Exception:
If :meth:`~clp_ffi_py.ir.native.Decoder.decode_preamble` fails.
If :meth:`~clp_ffi_py.ir.native.Deserializer.deserialize_preamble` fails.
"""
if self.has_metadata():
return
self._metadata = Decoder.decode_preamble(self._decoder_buffer)
self._metadata = Deserializer.deserialize_preamble(self._deserializer_buffer)

def get_metadata(self) -> Metadata:
if None is self._metadata:
raise RuntimeError("The metadata has not been successfully decoded yet.")
raise RuntimeError("The metadata has not been successfully deserialized yet.")
return self._metadata

def has_metadata(self) -> bool:
Expand All @@ -84,14 +85,13 @@ def search(self, query: Query) -> Generator[LogEvent, None, None]:
:param query: The input query object used to match log events. Check the document of
:class:`~clp_ffi_py.ir.Query` for more details.
:yield: The next unread encoded log event that matches the given search query from the IR
stream.
:yield: The next unread log event that matches the given search query from the IR stream.
"""
if False is self.has_metadata():
self.read_preamble()
while True:
log_event: Optional[LogEvent] = Decoder.decode_next_log_event(
self._decoder_buffer,
log_event: Optional[LogEvent] = Deserializer.deserialize_next_log_event(
self._deserializer_buffer,
query=query,
allow_incomplete_stream=self._allow_incomplete_stream,
)
Expand Down Expand Up @@ -135,14 +135,14 @@ class ClpIrFileReader(ClpIrStreamReader):
def __init__(
self,
fpath: Path,
decoder_buffer_size: int = ClpIrStreamReader.DEFAULT_DECODER_BUFFER_SIZE,
deserializer_buffer_size: int = ClpIrStreamReader.DEFAULT_DESERIALIZER_BUFFER_SIZE,
enable_compression: bool = True,
allow_incomplete_stream: bool = False,
):
self._path: Path = fpath
super().__init__(
open(fpath, "rb"),
decoder_buffer_size=decoder_buffer_size,
deserializer_buffer_size=deserializer_buffer_size,
enable_compression=enable_compression,
allow_incomplete_stream=allow_incomplete_stream,
)
Expand Down
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
f"{clp_src_root}/ReaderInterface.cpp",
f"{clp_src_root}/string_utils/string_utils.cpp",

f"{clp_ffi_py_src_root}/ir/native/decoding_methods.cpp",
f"{clp_ffi_py_src_root}/ir/native/encoding_methods.cpp",
f"{clp_ffi_py_src_root}/ir/native/deserialization_methods.cpp",
f"{clp_ffi_py_src_root}/ir/native/Metadata.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyDecoder.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyDecoderBuffer.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyFourByteEncoder.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyDeserializer.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyDeserializerBuffer.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyFourByteSerializer.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyLogEvent.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyMetadata.cpp",
f"{clp_ffi_py_src_root}/ir/native/PyQuery.cpp",
f"{clp_ffi_py_src_root}/ir/native/Query.cpp",
f"{clp_ffi_py_src_root}/ir/native/serialization_methods.cpp",
f"{clp_ffi_py_src_root}/modules/ir_native.cpp",
f"{clp_ffi_py_src_root}/Py_utils.cpp",
f"{clp_ffi_py_src_root}/utils.cpp",
Expand Down
8 changes: 4 additions & 4 deletions src/clp_ffi_py/PyObjectCast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ auto py_reinterpret_cast(Src* src) noexcept -> Dst* {
}

namespace ir::native {
class PyDecoder;
class PyDecoderBuffer;
class PyDeserializer;
class PyDeserializerBuffer;
class PyLogEvent;
class PyMetadata;
class PyQuery;
} // namespace ir::native

CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDecoder);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDecoderBuffer);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDeserializer);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDeserializerBuffer);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyLogEvent);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyMetadata);
CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyQuery);
Expand Down
2 changes: 1 addition & 1 deletion src/clp_ffi_py/ir/native/LogEvent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace clp_ffi_py::ir::native {
/**
* A class that represents a decoded IR log event. Contains ways to access (get or set) the log
* A class that represents a deserialized IR log event. Contains ways to access (get or set) the log
* message, the timestamp, and the log event index.
*/
class LogEvent {
Expand Down
6 changes: 3 additions & 3 deletions src/clp_ffi_py/ir/native/Metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

namespace clp_ffi_py::ir::native {
/**
* A class that represents a decoded IR preamble. Contains ways to access (get) metadata such as the
* timestamp format. After construction, the metadata is readonly.
* A class that represents a deserialized IR preamble. Contains ways to access (get) metadata such
* as the timestamp format. After construction, the metadata is readonly.
*/
class Metadata {
public:
/**
* Constructs a new Metadata object by reading values from a JSON object decoded from the
* Constructs a new Metadata object by reading values from a JSON object deserialized from the
* preamble. This constructor will validate the JSON data and throw exceptions when failing to
* extract required values.
* @param metadata JSON data that contains the metadata.
Expand Down
Loading

0 comments on commit a504ce6

Please sign in to comment.