Skip to content

Commit

Permalink
Merge pull request #183 from ORNL/loops
Browse files Browse the repository at this point in the history
Improving loop instrumentation
  • Loading branch information
renan-souza authored Dec 13, 2024
2 parents 373197c + 6a188b3 commit dc4e908
Show file tree
Hide file tree
Showing 16 changed files with 311 additions and 119 deletions.
16 changes: 11 additions & 5 deletions examples/instrumented_loop_example.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import random
from time import sleep

from flowcept import Flowcept, flowcept_loop
from flowcept import Flowcept, FlowceptLoop

iterations = 3

epochs = range(1, 3)
with Flowcept():
for _ in flowcept_loop(items=epochs, loop_name="epochs", item_name='epoch'):
loop = FlowceptLoop(iterations)
for item in loop:
loss = random.random()
sleep(0.05)
print(item, loss)
# The following is optional, in case you want to capture values generated inside the loop.
loop.end_iter({"item": item, "loss": loss})

docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})
print(len(docs))
assert len(docs) == 3 # 1 (parent_task) + 2 (sub_tasks)
assert len(docs) == iterations + 1 # The whole loop itself is a task
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ line-length = 100

[tool.ruff.lint]
extend-select = ["E501", "D"]
ignore = ["D200", "D212"]
ignore = ["D200", "D212", "D105", "D401"]

[tool.ruff.lint.pydocstyle]
convention = "numpy"
Expand Down
28 changes: 24 additions & 4 deletions src/flowcept/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,33 @@

from flowcept.configs import SETTINGS_PATH
from flowcept.version import __version__
from flowcept.flowcept_api.flowcept_controller import Flowcept
from flowcept.instrumentation.decorators.flowcept_task import flowcept_task, flowcept_loop

from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
)


def __getattr__(name):
if name == "Flowcept":
from flowcept.flowcept_api.flowcept_controller import Flowcept

return Flowcept

elif name == "flowcept_task":
from flowcept.instrumentation.decorators.flowcept_task import flowcept_task

return flowcept_task

elif name == "FlowceptLoop":
from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop

return FlowceptLoop

elif name == "telemetry_flowcept_task":
from flowcept.instrumentation.decorators.flowcept_task import telemetry_flowcept_task

return telemetry_flowcept_task

if name == "MLFlowInterceptor":
from flowcept.flowceptor.adapters.mlflow.mlflow_interceptor import (
MLFlowInterceptor,
Expand Down Expand Up @@ -54,10 +73,11 @@ def __getattr__(name):
"TensorboardInterceptor",
"ZambezeInterceptor",
"TaskQueryAPI",
"WorkflowObject",
"flowcept_task",
"flowcept_loop",
"FlowceptLoop",
"telemetry_flowcept_task",
"Flowcept",
"WorkflowObject",
"__version__",
"SETTINGS_PATH",
]
21 changes: 1 addition & 20 deletions src/flowcept/commons/flowcept_dataclasses/task_object.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Task object module."""

from enum import Enum
from typing import Dict, AnyStr, Any, Union, List
import msgpack

import flowcept
from flowcept.commons.flowcept_dataclasses.telemetry import Telemetry
from flowcept.commons.vocabulary import Status
from flowcept.configs import (
HOSTNAME,
PRIVATE_IP,
Expand All @@ -16,25 +16,6 @@
)


class Status(str, Enum):
    """Lifecycle states a task can be in.

    The ``str`` mixin makes every member a plain string as well, which is what
    allows the members to be JSON-serialized.
    """

    SUBMITTED = "SUBMITTED"
    WAITING = "WAITING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    ERROR = "ERROR"
    UNKNOWN = "UNKNOWN"

    @staticmethod
    def get_finished_statuses():
        """Return the statuses that mean a task is no longer running.

        Returns
        -------
        list of Status
            A new list holding ``Status.FINISHED`` and ``Status.ERROR``, in that order.
        """
        terminal = (Status.FINISHED, Status.ERROR)
        return list(terminal)


class TaskObject:
"""Task class."""

Expand Down
2 changes: 1 addition & 1 deletion src/flowcept/commons/query_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pandas as pd

from flowcept.commons.flowcept_dataclasses.task_object import Status
from flowcept.commons.vocabulary import Status


def get_doc_status(row):
Expand Down
2 changes: 1 addition & 1 deletion src/flowcept/commons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from flowcept import configs
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.configs import PERF_LOG
from flowcept.commons.flowcept_dataclasses.task_object import Status
from flowcept.commons.vocabulary import Status


def get_utc_now() -> float:
Expand Down
21 changes: 21 additions & 0 deletions src/flowcept/commons/vocabulary.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Vocab module."""

from enum import Enum


class Vocabulary:
"""Vocab class."""
Expand All @@ -14,3 +16,22 @@ class Settings:
MLFLOW_KIND = "mlflow"
TENSORBOARD_KIND = "tensorboard"
DASK_KIND = "dask"


class Status(str, Enum):
    """Lifecycle states a task can be in.

    The ``str`` mixin makes every member a plain string as well, which is what
    allows the members to be JSON-serialized.
    """

    SUBMITTED = "SUBMITTED"
    WAITING = "WAITING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    ERROR = "ERROR"
    UNKNOWN = "UNKNOWN"

    @staticmethod
    def get_finished_statuses():
        """Return the statuses that mean a task is no longer running.

        Returns
        -------
        list of Status
            A new list holding ``Status.FINISHED`` and ``Status.ERROR``, in that order.
        """
        terminal = (Status.FINISHED, Status.ERROR)
        return list(terminal)
2 changes: 1 addition & 1 deletion src/flowcept/flowceptor/adapters/dask/dask_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from flowcept import WorkflowObject
from flowcept.commons.flowcept_dataclasses.task_object import (
TaskObject,
Status,
)
from flowcept.commons.vocabulary import Status
from flowcept.flowceptor.adapters.base_interceptor import (
BaseInterceptor,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

from flowcept.commons.flowcept_dataclasses.task_object import (
TaskObject,
Status,
)
from flowcept.commons.vocabulary import Status
from flowcept.commons.utils import get_utc_now
from flowcept.flowceptor.adapters.interceptor_state_manager import (
InterceptorStateManager,
Expand Down
2 changes: 1 addition & 1 deletion src/flowcept/flowceptor/consumers/document_inserter.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _message_handler(self, msg_obj: dict):
self._handle_workflow_message(msg_obj)
return True
elif msg_type is None:
self.logger.warning(f"Message without type???\n {msg_obj}")
self.logger.error(f"Message without type??? --> {msg_obj}")
return True
else:
self.logger.error("Unexpected message type")
Expand Down
153 changes: 153 additions & 0 deletions src/flowcept/instrumentation/decorators/flowcept_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""FlowCept Loop module."""

import typing
import uuid
from time import time

from flowcept import Flowcept
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.vocabulary import Status
from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor


class FlowceptLoop:
    """
    Wrap a sized iterable (or an integer range) and record its iterations as tasks.

    Iterating over a `FlowceptLoop` yields the same items as the wrapped iterable. Along the
    way it sends one task message for the loop as a whole (once when the loop begins and once
    when it ends) and one task message per iteration to the instrumentation interceptor.

    Parameters
    ----------
    items : typing.Union[typing.Sized, int]
        The items to iterate over. Must either be an iterable with a `__len__` method or an
        integer ``n``, in which case the loop iterates over ``range(n)``.
    loop_name : str, optional
        A descriptive name for the loop (default is "loop"). It is used as the `activity_id`
        of the loop task, and `loop_name + "_iteration"` is used for each iteration task.
    item_name : str, optional
        The key under which the current item is stored in the `used` field of each iteration
        task (default is "item").
    parent_task_id : str, optional
        The ID of the parent task associated with the loop, if applicable (default is None).
    workflow_id : str, optional
        The workflow ID to associate with this loop. If not provided, the current Flowcept
        workflow ID is used, and a random UUID is generated if there is none.

    Raises
    ------
    TypeError
        If `items` is neither an object with a `__len__` method nor an integer.

    Notes
    -----
    An iteration is closed when the *next* item is requested, and the last iteration and the
    loop itself are closed when the underlying iterator is exhausted. This way every
    iteration, including the last one, is reported after its body has run and after
    `end_iter` had a chance to attach generated values. If the caller leaves the loop early
    (e.g., with `break`), the iteration in progress and the loop are not reported as finished.
    """

    def __init__(
        self,
        items: typing.Union[typing.Sized, int],
        loop_name="loop",
        item_name="item",
        parent_task_id=None,
        workflow_id=None,
    ):
        self._next_counter = 0
        self.logger = FlowceptLogger()
        if hasattr(items, "__len__"):
            self._iterable = items
        elif isinstance(items, int):
            self._iterable = range(items)
        else:
            # TypeError is still an Exception, so existing `except Exception` callers keep working.
            raise TypeError("You must use an iterable has at least a __len__ method defined.")
        self._max = len(self._iterable)

        self._interceptor = InstrumentationInterceptor.get_instance()
        self._iterator = iter(self._iterable)
        self._whole_loop_task = None
        self._loop_finished = False
        self._last_iteration_task = None
        self._current_iteration_task = {}
        self._loop_name = loop_name
        self._item_name = item_name
        self._parent_task_id = parent_task_id
        self._workflow_id = workflow_id or Flowcept.current_workflow_id or str(uuid.uuid4())

    def __iter__(self):
        return self

    def __next__(self):
        # Basic idea: the beginning of the current iteration is the end of the last.
        try:
            self._current_item = next(self._iterator)
        except StopIteration:
            # Only here do we know that the body of the last iteration has run, so this is
            # where the last iteration and the whole loop are closed. The guard keeps repeated
            # `next` calls on an exhausted loop from reporting the end more than once, and
            # skips loops that never produced an item (nothing was begun for them).
            if self._next_counter > 0 and not self._loop_finished:
                self._end_loop()
            raise

        if self._next_counter == 0:
            self._begin_loop()
        else:
            self._capture_iteration_bounds()

        self._next_counter += 1
        return self._current_item

    def _begin_loop(self):
        """Send the task that represents the whole loop and open the first iteration."""
        self.logger.debug("Capturing loop init.")
        self._whole_loop_task = {
            "started_at": (started_at := time()),
            "task_id": str(started_at),
            "type": "task",
            "activity_id": self._loop_name,
            "workflow_id": self._workflow_id,
        }
        if self._parent_task_id:
            self._whole_loop_task["parent_task_id"] = self._parent_task_id
        self._interceptor.intercept(self._whole_loop_task)
        self._capture_iteration_bounds()

    def _end_loop(self):
        """Close the last open iteration and send the finished whole-loop task."""
        if self._last_iteration_task is not None:
            self.logger.debug(f"Capturing the end of iteration {self._next_counter-1}.")
            self._end_iteration_task(self._last_iteration_task)
            self._last_iteration_task = None
        self.logger.debug("Capturing loop end.")
        self._whole_loop_task["status"] = Status.FINISHED.value
        self._whole_loop_task["ended_at"] = time()
        self._interceptor.intercept(self._whole_loop_task)
        self._loop_finished = True

    def _capture_iteration_bounds(self):
        """Close the previous iteration, if any, and open one for the current item."""
        if self._last_iteration_task is not None:
            self.logger.debug(f"Capturing the end of iteration {self._next_counter-1}.")
            self._end_iteration_task(self._last_iteration_task)

        self.logger.debug(f"Capturing the init of iteration {self._next_counter}.")
        self._current_iteration_task = self._begin_iteration_task(self._current_item)
        self._last_iteration_task = self._current_iteration_task

    def _begin_iteration_task(self, item):
        """Build the task dict for the iteration that is about to process `item`."""
        iteration_task = {
            "workflow_id": self._workflow_id,
            "activity_id": self._loop_name + "_iteration",
            "used": {"i": self._next_counter, self._item_name: item},
            "parent_task_id": self._whole_loop_task["task_id"],
            "started_at": time(),
            "telemetry_at_start": self._interceptor.telemetry_capture.capture().to_dict(),
            "type": "task",
        }
        return iteration_task

    def _end_iteration_task(self, iteration_task):
        """Mark `iteration_task` as finished, timestamp it, and send it to the interceptor."""
        iteration_task["status"] = Status.FINISHED.value
        iteration_task["ended_at"] = time()
        self._interceptor.intercept(iteration_task)

    def end_iter(self, generated_value: typing.Dict):
        """
        Attach values generated inside the loop body to the current iteration.

        Calling this method is optional. The values are stored in the `generated` field of the
        current iteration's task, which is sent to the interceptor when the iteration is closed.

        Parameters
        ----------
        generated_value : dict
            A dictionary containing the values generated during the current iteration.
        """
        self._current_iteration_task["generated"] = generated_value
Loading

0 comments on commit dc4e908

Please sign in to comment.