Skip to content

Commit

Permalink
Merge pull request #152 from ORNL/dev
Browse files Browse the repository at this point in the history
Main < Dev: fixes for UTC timestamps and improved instrumentation.
  • Loading branch information
renan-souza authored Oct 16, 2024
2 parents 253542b + 5341c83 commit be686a1
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
python flowcept/flowcept_webserver/app.py &
sleep 3
export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb --ignore=notebooks/mlflow.ipynb
- name: Shut down compose
run: docker compose -f deployment/compose-full.yml down
- name: Start Docker Compose with Kafka
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ test.py
**/*dump*
time.txt
tmp/
deployment/data
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,19 @@ If you are doing extensive performance evaluation experiments using this softwar
## Install AMD GPU Lib
On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data.
Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions
in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/
Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/
Here is a summary:
1. Install the AMD drivers on the machine (check if they are available already under `/opt/rocm-*`).
2. Suppose it is /opt/rocm-6.2.0. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it.
3. Copy the amd_smi to your home directory: `cp -r /opt/rocm-6.2.0/share/amd_smi ~`
4. `cd ~/amd_smi`
5. In your Python environment, run `pip install .`

The current code is compatible with amdsmi==24.6.2+2b02a07, which was installed using Frontier's /opt/rocm-6.2.0/share/amd_smi.

## See also

Expand Down
12 changes: 12 additions & 0 deletions extra_requirements/amd-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/

# Here is a summary:

# 1. Install the AMD drivers on the machine (check if they are available already under `/opt/rocm-*`).
# 2. Suppose it is /opt/rocm-6.2.0. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it.
# 3. Copy the amd_smi to your home directory: `cp -r /opt/rocm-6.2.0/share/amd_smi ~`
# 4. cd ~/amd_smi
# 5. In your python environment, do a pip install .

# Current code is compatible with this version: amdsmi==24.6.2+2b02a07
# Which was installed using Frontier's /opt/rocm-6.2.0/share/amd_smi
1 change: 0 additions & 1 deletion flowcept/commons/daos/document_db_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pickle
import zipfile

import pymongo
from bson import ObjectId
from bson.json_util import dumps
from pymongo import MongoClient, UpdateOne
Expand Down
16 changes: 16 additions & 0 deletions flowcept/commons/flowcept_dataclasses/task_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ def to_dict(self):
def serialize(self):
return msgpack.dumps(self.to_dict())

@staticmethod
def enrich_task_dict(task_dict: dict):
    """Fill in missing host/context fields on ``task_dict`` in place.

    For each known attribute, if the dict lacks the key (or holds
    ``None``) and the corresponding module-level fallback constant is
    not ``None``, the fallback value is written into the dict.
    """
    fallbacks = (
        ("campaign_id", CAMPAIGN_ID),
        ("node_name", NODE_NAME),
        ("login_name", LOGIN_NAME),
        ("public_ip", PUBLIC_IP),
        ("private_ip", PRIVATE_IP),
        ("hostname", HOSTNAME),
    )
    for field, default in fallbacks:
        if default is None:
            # Nothing useful to backfill for this field.
            continue
        if task_dict.get(field) is None:
            task_dict[field] = default

# @staticmethod
# def deserialize(serialized_data) -> 'TaskObject':
# dict_obj = msgpack.loads(serialized_data)
Expand Down
25 changes: 16 additions & 9 deletions flowcept/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
##########################

FLOWCEPT_USER = settings["experiment"].get("user", "blank_user")
CAMPAIGN_ID = settings["experiment"].get("campaign_id", "super_campaign")
CAMPAIGN_ID = settings["experiment"].get(
"campaign_id", os.environ.get("CAMPAIGN_ID", "super_campaign")
)

######################
# MQ Settings #
Expand Down Expand Up @@ -81,9 +83,13 @@
######################
# MongoDB Settings #
######################
MONGO_URI = settings["mongodb"].get("uri", None)
MONGO_HOST = settings["mongodb"].get("host", "localhost")
MONGO_PORT = int(settings["mongodb"].get("port", "27017"))
MONGO_URI = settings["mongodb"].get("uri", os.environ.get("MONGO_URI", None))
MONGO_HOST = settings["mongodb"].get(
"host", os.environ.get("MONGO_HOST", "localhost")
)
MONGO_PORT = int(
settings["mongodb"].get("port", os.environ.get("MONGO_PORT", "27017"))
)
MONGO_DB = settings["mongodb"].get("db", PROJECT_NAME)
MONGO_CREATE_INDEX = settings["mongodb"].get("create_collection_index", True)

Expand Down Expand Up @@ -126,9 +132,6 @@
TELEMETRY_CAPTURE = settings["project"].get("telemetry_capture", None)

REGISTER_WORKFLOW = settings["project"].get("register_workflow", True)
REGISTER_INSTRUMENTED_TASKS = settings["project"].get(
"register_instrumented_tasks", True
)

##################################
# GPU TELEMETRY CAPTURE SETTINGS #
Expand Down Expand Up @@ -247,8 +250,9 @@
# Web Server #
######################

WEBSERVER_HOST = settings["web_server"].get("host", "0.0.0.0")
WEBSERVER_PORT = int(settings["web_server"].get("port", "5000"))
_webserver_settings = settings.get("web_server", {})
WEBSERVER_HOST = _webserver_settings.get("host", "0.0.0.0")
WEBSERVER_PORT = int(_webserver_settings.get("port", 5000))

######################
# ANALYTICS #
Expand All @@ -260,6 +264,9 @@
####

INSTRUMENTATION = settings.get("instrumentation", None)
INSTRUMENTATION_ENABLED = False
if INSTRUMENTATION:
INSTRUMENTATION_ENABLED = INSTRUMENTATION.get("enabled", False)

################# Enabled ADAPTERS

Expand Down
47 changes: 33 additions & 14 deletions flowcept/flowcept_api/flowcept_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from flowcept.commons import logger
from flowcept.commons.daos.document_db_dao import DocumentDBDao
from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
from flowcept.configs import MQ_INSTANCES
from flowcept.configs import (
MQ_INSTANCES,
INSTRUMENTATION,
INSTRUMENTATION_ENABLED,
)
from flowcept.flowcept_api.db_api import DBAPI
from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
from flowcept.commons.flowcept_logger import FlowceptLogger
Expand Down Expand Up @@ -53,31 +57,31 @@ def __init__(
self._bundle_exec_id = id(self)
else:
self._bundle_exec_id = bundle_exec_id
self.enabled = True
self.is_started = False
if isinstance(interceptors, str):
self._interceptors = None
else:
if interceptors is None:
if not INSTRUMENTATION_ENABLED:
self.enabled = False
return
interceptors = [
flowcept.instrumentation.decorators.instrumentation_interceptor
]
elif not isinstance(interceptors, list):
interceptors = [interceptors]
self._interceptors: List[BaseInterceptor] = interceptors

if workflow_id or workflow_args or workflow_name:
wf_obj = WorkflowObject(
workflow_id, workflow_name, used=workflow_args
)
Flowcept.db.insert_or_update_workflow(wf_obj)
Flowcept.current_workflow_id = wf_obj.workflow_id
else:
Flowcept.current_workflow_id = None

self.is_started = False
self.current_workflow_id = workflow_id
self.workflow_name = workflow_name
self.workflow_args = workflow_args

def start(self):
if self.is_started:
self.logger.warning("Consumer is already started!")
if self.is_started or not self.enabled:
self.logger.warning(
"Consumer may be already started or instrumentation is not set"
)
return self

if self._interceptors and len(self._interceptors):
Expand All @@ -91,6 +95,21 @@ def start(self):
interceptor.start(bundle_exec_id=self._bundle_exec_id)
self.logger.debug(f"...Flowceptor {key} started ok!")

if (
self.current_workflow_id
or self.workflow_args
or self.workflow_name
) and interceptor.kind == "instrumentation":
wf_obj = WorkflowObject(
self.current_workflow_id,
self.workflow_name,
used=self.workflow_args,
)
interceptor.send_workflow_message(wf_obj)
Flowcept.current_workflow_id = wf_obj.workflow_id
else:
Flowcept.current_workflow_id = None

if self._start_doc_inserter:
self.logger.debug("Flowcept Consumer starting...")

Expand Down Expand Up @@ -119,7 +138,7 @@ def start(self):
return self

def stop(self):
if not self.is_started:
if not self.is_started or not self.enabled:
self.logger.warning("Consumer is already stopped!")
return
sleep_time = 1
Expand Down
13 changes: 6 additions & 7 deletions flowcept/flowceptor/adapters/base_interceptor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABCMeta, abstractmethod
from uuid import uuid4

from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
Expand All @@ -24,7 +25,7 @@
# in the code. https://github.com/ORNL/flowcept/issues/109
# class BaseInterceptor(object, metaclass=ABCMeta):
class BaseInterceptor(object):
def __init__(self, plugin_key=None):
def __init__(self, plugin_key=None, kind=None):
self.logger = FlowceptLogger()
if (
plugin_key is not None
Expand All @@ -38,6 +39,7 @@ def __init__(self, plugin_key=None):
self.telemetry_capture = TelemetryCapture()
self._saved_workflows = set()
self._generated_workflow_id = False
self.kind = kind

def prepare_task_msg(self, *args, **kwargs) -> TaskObject:
raise NotImplementedError()
Expand Down Expand Up @@ -79,12 +81,8 @@ def callback(self, *args, **kwargs):
raise NotImplementedError()

def send_workflow_message(self, workflow_obj: WorkflowObject):
wf_id = workflow_obj.workflow_id
if wf_id is None:
self.logger.warning(
f"Workflow_id is empty, we can't save this workflow_obj: {workflow_obj}"
)
return
wf_id = workflow_obj.workflow_id or str(uuid4())
workflow_obj.workflow_id = wf_id
if wf_id in self._saved_workflows:
return
self._saved_workflows.add(wf_id)
Expand All @@ -105,6 +103,7 @@ def send_workflow_message(self, workflow_obj: WorkflowObject):
if ENRICH_MESSAGES:
workflow_obj.enrich(self.settings.key if self.settings else None)
self.intercept(workflow_obj.to_dict())
return wf_id

def intercept(self, obj_msg):
self._mq_dao.buffer.append(obj_msg)
Expand Down
28 changes: 19 additions & 9 deletions flowcept/flowceptor/consumers/document_inserter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from datetime import datetime
from time import time, sleep
from threading import Thread, Event, Lock
from typing import Dict
from uuid import uuid4

import pytz

import flowcept.commons
from flowcept.commons.daos.autoflush_buffer import AutoflushBuffer
from flowcept.commons.flowcept_dataclasses.workflow_object import (
Expand All @@ -17,6 +20,7 @@
MONGO_ADAPTIVE_BUFFER_SIZE,
JSON_SERIALIZER,
MONGO_REMOVE_EMPTY_FIELDS,
ENRICH_MESSAGES,
)
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
Expand Down Expand Up @@ -107,10 +111,6 @@ def flush_function(buffer, doc_dao, logger=flowcept.commons.logger):
logger.info(f"Flushed {len(buffer)} msgs to DocDB!")

def _handle_task_message(self, message: Dict):
# if "utc_timestamp" in message:
# dt = datetime.fromtimestamp(message["utc_timestamp"])
# message["timestamp"] = dt.utcnow()

# if DEBUG_MODE:
# message["debug"] = True
if "task_id" not in message:
Expand All @@ -121,11 +121,21 @@ def _handle_task_message(self, message: Dict):
if wf_id:
message["workflow_id"] = wf_id

if not any(
time_field in message
for time_field in TaskObject.get_time_field_names()
):
message["registered_at"] = time()
has_time_fields = False
for time_field in TaskObject.get_time_field_names():
if time_field in message:
has_time_fields = True
message[time_field] = datetime.fromtimestamp(
message[time_field], pytz.utc
)

if not has_time_fields:
message["registered_at"] = datetime.fromtimestamp(
time(), pytz.utc
)

if ENRICH_MESSAGES:
TaskObject.enrich_task_dict(message)

message.pop("type")

Expand Down
2 changes: 1 addition & 1 deletion flowcept/instrumentation/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Perhaps we should have a BaseAdaptor that would work for both and
# observability and instrumentation adapters. This would be a major refactor
# in the code. https://github.com/ORNL/flowcept/issues/109
instrumentation_interceptor = BaseInterceptor()
instrumentation_interceptor = BaseInterceptor(kind="instrumentation")
# TODO This above is bad because I am reusing the same BaseInterceptor both
# for adapter-based observability + traditional instrumentation via @decorator
# I'm just setting _registered_workflow to avoid the auto wf register that
Expand Down
4 changes: 2 additions & 2 deletions flowcept/instrumentation/decorators/flowcept_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flowcept.commons.utils import replace_non_serializable
from flowcept.configs import (
REPLACE_NON_JSON_SERIALIZABLE,
REGISTER_INSTRUMENTED_TASKS,
INSTRUMENTATION_ENABLED,
)


Expand Down Expand Up @@ -120,7 +120,7 @@ def flowcept_task(func=None, **decorator_kwargs):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not REGISTER_INSTRUMENTED_TASKS:
if not INSTRUMENTATION_ENABLED:
return func(*args, **kwargs)

args_handler = decorator_kwargs.get(
Expand Down
Loading

0 comments on commit be686a1

Please sign in to comment.