Skip to content

Commit

Permalink
Add support for using a caller-defined lambda to determine when to te…
Browse files Browse the repository at this point in the history
…rminate decoding an IR-stream. (#42)
  • Loading branch information
LinZhihao-723 authored Dec 8, 2023
1 parent 22f707d commit d64fec2
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-22.04, macos-11]
os: [ubuntu-22.04, macos-12]

steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion src/clp_ffi_py/ir/native/PyLogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ auto PyLogEvent::module_level_init(PyObject* py_module) -> bool {
}

auto PyLogEvent::create_new_log_event(
std::string const& log_message,
std::string_view log_message,
ffi::epoch_time_ms_t timestamp,
size_t index,
PyMetadata* metadata
Expand Down
2 changes: 1 addition & 1 deletion src/clp_ffi_py/ir/native/PyLogEvent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class PyLogEvent {
* set.
*/
[[nodiscard]] static auto create_new_log_event(
std::string const& log_message,
std::string_view log_message,
ffi::epoch_time_ms_t timestamp,
size_t index,
PyMetadata* metadata
Expand Down
174 changes: 117 additions & 57 deletions src/clp_ffi_py/ir/native/decoding_methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,51 @@
namespace clp_ffi_py::ir::native {
namespace {
/**
* Decodes the next log event from the CLP IR buffer `decoder_buffer`. If
* `py_query` is non-null decode until finding a log event that matches the
* query.
* This template defines the function signature of a termination handler
* required by `generic_decode_log_event`. Signature: (
* ffi::epoch_timestamp_ms timestamp,
* std::string_view decoded_log_message,
* size_t decoded_log_event_idx,
* PyObject*& return_value
* ) -> bool;
* @tparam TerminateHandler
*/
template <typename TerminateHandler>
concept TerminateHandlerSignature = requires(TerminateHandler handler) {
{
handler(std::declval<ffi::epoch_time_ms_t>(),
std::declval<std::string_view>(),
std::declval<size_t>(),
std::declval<PyObject*&>())
} -> std::same_as<bool>;
};

/**
* Decodes the next log event from the CLP IR buffer `decoder_buffer` until
* terminate handler returns true.
* @tparam TerminateHandler Method to determine if the decoding should
* terminate, and set the return value for termination.
* @param decoder_buffer IR decoder buffer of the input IR stream.
* @param py_metadata The metadata associated with the input IR stream.
* @param py_query Search query to filter log events.
* @param allow_incomplete_stream A flag to indicate whether the incomplete
* stream error should be ignored. If it is set to true, incomplete stream error
* should be treated as the termination.
* @return Log event represented as PyLogEvent on success.
* @return PyNone on termination.
* should be treated as the IR stream is terminated.
* @param terminate_handler
* @return The return value set by `terminate_handler`.
* @return PyNone if the IR stream is terminated.
* @return nullptr on failure with the relevant Python exception and error set.
*/
auto decode(
template <TerminateHandlerSignature TerminateHandler>
auto generic_decode_log_events(
PyDecoderBuffer* decoder_buffer,
PyMetadata* py_metadata,
PyQuery* py_query,
bool allow_incomplete_stream
bool allow_incomplete_stream,
TerminateHandler terminate_handler
) -> PyObject* {
std::string decoded_message;
ffi::epoch_time_ms_t timestamp_delta{0};
auto timestamp{decoder_buffer->get_ref_timestamp()};
size_t current_log_event_idx{0};
bool reached_eof{false};
PyObject* return_value{nullptr};

while (true) {
auto const unconsumed_bytes{decoder_buffer->get_unconsumed_bytes()};
BufferReader ir_buffer{
Expand All @@ -59,22 +80,21 @@ auto decode(
timestamp_delta
)};
if (ffi::ir_stream::IRErrorCode_Incomplete_IR == err) {
if (false == decoder_buffer->try_read()) {
if (allow_incomplete_stream
&& static_cast<bool>(PyErr_ExceptionMatches(
PyDecoderBuffer::get_py_incomplete_stream_error()
)))
{
PyErr_Clear();
Py_RETURN_NONE;
}
return nullptr;
if (decoder_buffer->try_read()) {
continue;
}
if (allow_incomplete_stream
&& static_cast<bool>(
PyErr_ExceptionMatches(PyDecoderBuffer::get_py_incomplete_stream_error())
))
{
PyErr_Clear();
Py_RETURN_NONE;
}
continue;
return nullptr;
}
if (ffi::ir_stream::IRErrorCode_Eof == err) {
reached_eof = true;
break;
Py_RETURN_NONE;
}
if (ffi::ir_stream::IRErrorCode_Success != err) {
PyErr_Format(PyExc_RuntimeError, cDecoderErrorCodeFormatStr, err);
Expand All @@ -83,35 +103,24 @@ auto decode(

timestamp += timestamp_delta;
current_log_event_idx = decoder_buffer->get_and_increment_decoded_message_count();
decoder_buffer->commit_read_buffer_consumption(static_cast<Py_ssize_t>(ir_buffer.get_pos())
);
auto const num_bytes_consumed{static_cast<Py_ssize_t>(ir_buffer.get_pos())};
decoder_buffer->commit_read_buffer_consumption(num_bytes_consumed);

if (nullptr == py_query) {
break;
}

auto* query{py_query->get_query()};
if (query->ts_safely_outside_time_range(timestamp)) {
Py_RETURN_NONE;
}
if (query->matches_time_range(timestamp)
&& query->matches_wildcard_queries(decoded_message))
{
if (terminate_handler(timestamp, decoded_message, current_log_event_idx, return_value)) {
decoder_buffer->set_ref_timestamp(timestamp);
break;
}
}

if (reached_eof) {
Py_RETURN_NONE;
}
return return_value;
}

decoder_buffer->set_ref_timestamp(timestamp);
return py_reinterpret_cast<PyObject>(PyLogEvent::create_new_log_event(
decoded_message,
timestamp,
current_log_event_idx,
py_metadata
));
/**
* @return A new reference to `Py_None`.
*/
[[nodiscard]] auto get_new_ref_to_py_none() -> PyObject* {
Py_INCREF(Py_None);
return Py_None;
}
} // namespace

Expand Down Expand Up @@ -242,7 +251,7 @@ auto decode_next_log_event(PyObject* Py_UNUSED(self), PyObject* args, PyObject*
};

PyDecoderBuffer* decoder_buffer{nullptr};
PyObject* query{Py_None};
PyObject* query_obj{Py_None};
int allow_incomplete_stream{0};

if (false
Expand All @@ -253,16 +262,16 @@ auto decode_next_log_event(PyObject* Py_UNUSED(self), PyObject* args, PyObject*
static_cast<char**>(keyword_table),
PyDecoderBuffer::get_py_type(),
&decoder_buffer,
&query,
&query_obj,
&allow_incomplete_stream
)))
{
return nullptr;
}

bool const is_query_given{Py_None != query};
bool const is_query_given{Py_None != query_obj};
if (is_query_given
&& false == static_cast<bool>(PyObject_TypeCheck(query, PyQuery::get_py_type())))
&& false == static_cast<bool>(PyObject_TypeCheck(query_obj, PyQuery::get_py_type())))
{
PyErr_SetString(PyExc_TypeError, cPyTypeError);
return nullptr;
Expand All @@ -275,12 +284,63 @@ auto decode_next_log_event(PyObject* Py_UNUSED(self), PyObject* args, PyObject*
);
return nullptr;
}
auto* metadata{decoder_buffer->get_metadata()};

return decode(
if (false == is_query_given) {
auto terminate_handler{
[metadata](
ffi::epoch_time_ms_t timestamp,
std::string_view log_message,
size_t log_event_idx,
PyObject*& return_value
) -> bool {
return_value = py_reinterpret_cast<PyObject>(PyLogEvent::create_new_log_event(
log_message,
timestamp,
log_event_idx,
metadata
));
return true;
}
};
return generic_decode_log_events(
decoder_buffer,
static_cast<bool>(allow_incomplete_stream),
terminate_handler
);
}

auto* py_query{py_reinterpret_cast<PyQuery>(query_obj)};
auto const* query{py_query->get_query()};
auto query_terminate_handler{
[query, metadata](
ffi::epoch_time_ms_t timestamp,
std::string_view log_message,
size_t log_event_idx,
PyObject*& return_value
) -> bool {
if (query->ts_safely_outside_time_range(timestamp)) {
return_value = get_new_ref_to_py_none();
return true;
}
if (false == query->matches_time_range(timestamp)
|| false == query->matches_wildcard_queries(log_message))
{
return false;
}
return_value = py_reinterpret_cast<PyObject>(PyLogEvent::create_new_log_event(
log_message,
timestamp,
log_event_idx,
metadata
));
return true;
}
};
return generic_decode_log_events(
decoder_buffer,
decoder_buffer->get_metadata(),
is_query_given ? py_reinterpret_cast<PyQuery>(query) : nullptr,
static_cast<bool>(allow_incomplete_stream)
static_cast<bool>(allow_incomplete_stream),
query_terminate_handler
);
}
}
Expand Down

0 comments on commit d64fec2

Please sign in to comment.