From 6a188b3d922ca5427ef5a8ab14f6a4082a6d2829 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 13 Dec 2024 12:15:03 -0500 Subject: [PATCH] Improvements in loop and code format --- examples/instrumented_loop_example.py | 3 +- pyproject.toml | 2 +- src/flowcept/__init__.py | 4 + src/flowcept/commons/vocabulary.py | 1 + .../decorators/flowcept_loop.py | 166 ++++++++++++------ .../decorators/flowcept_task.py | 1 + .../flowcept_task_decorator_test.py | 37 ++++ 7 files changed, 160 insertions(+), 54 deletions(-) diff --git a/examples/instrumented_loop_example.py b/examples/instrumented_loop_example.py index 73b91414..b3e343bb 100644 --- a/examples/instrumented_loop_example.py +++ b/examples/instrumented_loop_example.py @@ -11,9 +11,8 @@ loss = random.random() sleep(0.05) print(item, loss) + # The following is optional, in case you want to capture values generated inside the loop. loop.end_iter({"item": item, "loss": loss}) docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) -print(len(docs)) assert len(docs) == iterations + 1 # The whole loop itself is a task - diff --git a/pyproject.toml b/pyproject.toml index dbb6338c..5224a6ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ line-length = 100 [tool.ruff.lint] extend-select = ["E501", "D"] -ignore = ["D200", "D212"] +ignore = ["D200", "D212", "D105", "D401"] [tool.ruff.lint.pydocstyle] convention = "numpy" diff --git a/src/flowcept/__init__.py b/src/flowcept/__init__.py index 5dc58490..2d6b273b 100644 --- a/src/flowcept/__init__.py +++ b/src/flowcept/__init__.py @@ -11,18 +11,22 @@ def __getattr__(name): if name == "Flowcept": from flowcept.flowcept_api.flowcept_controller import Flowcept + return Flowcept elif name == "flowcept_task": from flowcept.instrumentation.decorators.flowcept_task import flowcept_task + return flowcept_task elif name == "FlowceptLoop": from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop + return FlowceptLoop elif name == "telemetry_flowcept_task": from flowcept.instrumentation.decorators.flowcept_task import telemetry_flowcept_task + return telemetry_flowcept_task if name == "MLFlowInterceptor": diff --git a/src/flowcept/commons/vocabulary.py b/src/flowcept/commons/vocabulary.py index fd34fd3b..8c7010b7 100644 --- a/src/flowcept/commons/vocabulary.py +++ b/src/flowcept/commons/vocabulary.py @@ -1,4 +1,5 @@ """Vocab module.""" + from enum import Enum diff --git a/src/flowcept/instrumentation/decorators/flowcept_loop.py b/src/flowcept/instrumentation/decorators/flowcept_loop.py index fcbd93dc..7bd05f11 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_loop.py +++ b/src/flowcept/instrumentation/decorators/flowcept_loop.py @@ -1,6 +1,7 @@ +"""FlowCept Loop module.""" + import typing import uuid -from collections.abc import Iterable from time import time from flowcept import Flowcept @@ -10,80 +11,143 @@ class FlowceptLoop: - def __init__(self, items: typing.Union[typing.Iterable, int], loop_name="loop", item_name="item", parent_task_id=None, workflow_id=None): + """ + A utility class to wrap and instrument iterable loops for telemetry and tracking. + + The `FlowceptLoop` class supports iterating over a collection of items or a numeric range + while capturing metadata for each iteration and for the loop as a whole. This is particularly + useful in scenarios where tracking and instrumentation of loop executions is required. + + Parameters + ---------- + items : typing.Union[typing.Sized, int] + The items to iterate over. Must either be an iterable with a `__len__` method or an integer + representing the range of iteration. + loop_name : str, optional + A descriptive name for the loop (default is "loop"). + item_name : str, optional + The name used for each item in the telemetry (default is "item"). + parent_task_id : str, optional + The ID of the parent task associated with the loop, if applicable (default is None). + workflow_id : str, optional + The workflow ID to associate with this loop. If not provided, it will be generated or + inferred from the current workflow context. + + Raises + ------ + Exception + If `items` is not an iterable with a `__len__` method or an integer. + + Notes + ----- + This class integrates with the `Flowcept` system for telemetry and tracking, ensuring + detailed monitoring of loops and their iterations. It is designed for cases where + capturing granular runtime behavior of loops is critical. + """ + + def __init__( + self, + items: typing.Union[typing.Sized, int], + loop_name="loop", + item_name="item", + parent_task_id=None, + workflow_id=None, + ): self._next_counter = 0 self.logger = FlowceptLogger() - self._interceptor = InstrumentationInterceptor.get_instance() - if isinstance(items, range): + if hasattr(items, "__len__"): self._iterable = items - self._max = items.stop + self._max = len(self._iterable) elif isinstance(items, int): self._iterable = range(items) - self._max = self._iterable.stop - elif isinstance(items, Iterable): - self._iterable = items - self._max = 10**100 # TODO: more complex iterables won't work; needs to implement the end of the loop + self._max = len(self._iterable) else: - raise NotImplementedError + raise Exception("You must use an iterable has at least a __len__ method defined.") + + self._interceptor = InstrumentationInterceptor.get_instance() self._iterator = iter(self._iterable) + self._last_iteration_task = None self._current_iteration_task = {} self._loop_name = loop_name self._item_name = item_name self._parent_task_id = parent_task_id self._workflow_id = workflow_id or Flowcept.current_workflow_id or str(uuid.uuid4()) - def __iter__(self): return self - def _capture_begin_loop(self): - self.logger.debug("Registering loop init.") - self.whole_loop_task = { + def _begin_loop(self): + self.logger.debug("Capturing loop init.") + self._whole_loop_task = { "started_at": (started_at := time()), "task_id": str(started_at), "type": "task", "activity_id": self._loop_name, - "workflow_id": self._workflow_id + "workflow_id": self._workflow_id, } if self._parent_task_id: - self.whole_loop_task["parent_task_id"] = self._parent_task_id + self._whole_loop_task["parent_task_id"] = self._parent_task_id + self._interceptor.intercept(self._whole_loop_task) + self._capture_iteration_bounds() - def _capture_end_loop(self): - self.logger.debug("Registering the end of the loop.") - self.whole_loop_task["status"] = Status.FINISHED.value - self.whole_loop_task["ended_at"] = time()#self._current_iteration_task["ended_at"] - self._interceptor.intercept(self.whole_loop_task) + def _end_loop(self): + self._capture_iteration_bounds() + self.logger.debug("Capturing loop end.") + self._end_iteration_task(self._last_iteration_task) + self._whole_loop_task["status"] = Status.FINISHED.value + self._whole_loop_task["ended_at"] = time() + self._interceptor.intercept(self._whole_loop_task) def __next__(self): - self.logger.debug(f"Calling next for the {self._next_counter}th time.") + # Basic idea: the beginning of the current iteration is the end of the last + self._current_item = next(self._iterator) + + if self._next_counter == 0: + self._begin_loop() + elif self._next_counter == self._max - 1: + self._end_loop() + elif self._next_counter < self._max - 1: + self._capture_iteration_bounds() + self._next_counter += 1 - if self._next_counter == 1: - self._capture_begin_loop() - elif self._next_counter > self._max: - self._capture_end_loop() - - item = next(self._iterator) - if self._next_counter <= self._max: - self.logger.debug(f"Registering the init of the {self._next_counter - 1}th iteration.") - self._current_iteration_task = { - "workflow_id": self._workflow_id, - "activity_id": self._loop_name + "_iteration", - "used": { - "i": self._next_counter-1, - self._item_name: item - }, - "parent_task_id": self.whole_loop_task["task_id"], - "started_at": time(), - "telemetry_at_start": self._interceptor.telemetry_capture.capture().to_dict(), - "type": "task" - } - return item - - def end_iter(self, value: typing.Dict): - self.logger.debug(f"Registering the end of the {self._next_counter - 1}th iteration.") - self._current_iteration_task["generated"] = value - #self._current_iteration_task["ended_at"] = time() - #self._current_iteration_task["telemetry_at_end"] = self._interceptor.telemetry_capture.capture().to_dict(), - self._current_iteration_task["status"] = Status.FINISHED.value - self._interceptor.intercept(self._current_iteration_task) + return self._current_item + + def _capture_iteration_bounds(self): + if self._last_iteration_task is not None: + self.logger.debug(f"Capturing the end of iteration {self._next_counter-1}.") + self._end_iteration_task(self._last_iteration_task) + + self.logger.debug(f"Capturing the init of iteration {self._next_counter}.") + self._current_iteration_task = self._begin_iteration_task(self._current_item) + self._last_iteration_task = self._current_iteration_task + + def _begin_iteration_task(self, item): + iteration_task = { + "workflow_id": self._workflow_id, + "activity_id": self._loop_name + "_iteration", + "used": {"i": self._next_counter, self._item_name: item}, + "parent_task_id": self._whole_loop_task["task_id"], + "started_at": time(), + "telemetry_at_start": self._interceptor.telemetry_capture.capture().to_dict(), + "type": "task", + } + return iteration_task + + def _end_iteration_task(self, iteration_task): + iteration_task["status"] = "FINISHED" + self._interceptor.intercept(self._last_iteration_task) + + def end_iter(self, generated_value: typing.Dict): + """ + Finalizes the current iteration by associating generated values with the iteration metadata. + + This method updates the metadata of the current iteration to include the values generated + during the iteration, ensuring they are properly logged and tracked. + Parameters + ---------- + generated_value : dict + A dictionary containing the generated values for the current iteration. These values + will be stored in the `generated` field of the iteration's metadata. + """ + self._current_iteration_task["generated"] = generated_value diff --git a/src/flowcept/instrumentation/decorators/flowcept_task.py b/src/flowcept/instrumentation/decorators/flowcept_task.py index d24dd6ab..13677378 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_task.py +++ b/src/flowcept/instrumentation/decorators/flowcept_task.py @@ -1,4 +1,5 @@ """Task module.""" + from time import time from functools import wraps from flowcept.commons.flowcept_dataclasses.task_object import ( diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py index 7d027710..c72fab1e 100644 --- a/tests/decorator_tests/flowcept_task_decorator_test.py +++ b/tests/decorator_tests/flowcept_task_decorator_test.py @@ -260,6 +260,41 @@ def test_decorated_function_timed(self): print("Overheads: " + str(overheads)) assert all(map(lambda v: v < threshold, overheads)) + def test_flowcept_loop_types(self): + + with Flowcept(): + items = range(3) + loop = FlowceptLoop(items=items) + for _ in loop: + pass + docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) + assert len(docs) == len(items) + 1 + + with Flowcept(): + items = [10, 20, 30] + loop = FlowceptLoop(items=items) + for _ in loop: + pass + docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) + assert len(docs) == len(items) + 1 + + with Flowcept(): + items = "abcd" + loop = FlowceptLoop(items=items) + for _ in loop: + pass + docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) + assert len(docs) == len(items) + 1 + + with Flowcept(): + items = np.array([0.5, 1.0, 1.5]) + loop = FlowceptLoop(items=items, loop_name="our_loop") + for _ in loop: + loop.end_iter({"a": 1}) + docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id, "activity_id": "our_loop_iteration"}) + assert len(docs) == len(items) + assert all(d["generated"]["a"] == 1 for d in docs) + def test_flowcept_loop_generator(self): number_of_epochs = 3 epochs = range(0, number_of_epochs) @@ -294,3 +329,5 @@ def test_flowcept_loop_generator(self): assert t["used"]["epoch"] == i assert t["status"] == Status.FINISHED.value assert t["parent_task_id"] == whole_loop_task["task_id"] + +