
Commit

Merge branch 'main' into dev
renan-souza authored Sep 24, 2024
2 parents 492b7cb + c8641fc commit 1b93444
Showing 4 changed files with 14 additions and 14 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/create-release-n-publish.yml
@@ -97,11 +97,10 @@ jobs:
        run: pip install flowcept[dask]
      - name: Test pip install multiple adapters
        run: pip install flowcept[mlflow,tensorboard]
-     - name: Test pip install full
-       run: pip install flowcept[full]
-     - name: Install dev dependencies
-       run: |
-         pip install -r extra_requirements/dev-requirements.txt
+     - name: Install our dependencies
+       run: pip install -e .[fulldev] # This will install all dependencies, for all adapters and dev deps.
+     - name: Pip list
+       run: pip list
      - name: Run Docker Compose
        run: docker compose -f deployment/compose-full.yml up -d
      - name: Test with pytest
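The bracketed names in the `pip install` commands above (`dask`, `mlflow,tensorboard`, `fulldev`) are setuptools "extras". Purely as a hypothetical sketch, not FlowCept's actual packaging code, extras like these are declared roughly as follows; the dependency lists are invented:

```python
# setup.py -- illustrative sketch only; the pinned packages are assumptions.
from setuptools import find_packages, setup

setup(
    name="flowcept",
    packages=find_packages(),
    extras_require={
        "dask": ["dask[distributed]"],  # hypothetical pins
        "mlflow": ["mlflow"],
        "tensorboard": ["tensorboard"],
        # A "fulldev" extra would aggregate all adapter extras plus dev
        # tools, which is why `pip install -e .[fulldev]` can replace the
        # separate dev-requirements install step above.
        "fulldev": ["dask[distributed]", "mlflow", "tensorboard", "pytest"],
    },
)
```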
5 changes: 3 additions & 2 deletions README.md
@@ -16,8 +16,9 @@ different parallel computing systems (e.g., Dask, Spark, Workflow Management Sys
capability to seamlessly and automatically integrate data from various workflows using data observability.
It builds an integrated data view at runtime, enabling end-to-end exploratory data analysis and monitoring.
It follows [W3C PROV](https://www.w3.org/TR/prov-overview/) recommendations for its data schema.
-It does not require changes in user codes or systems (i.e., instrumentation), but we provide instrumentation options for convenience. For example, by adding a `@flowcept_task` decorator on functions, FlowCept will observe their executions when they run. Also, we provide special features for PyTorch modules. Adding `@torch_task` to them will enable extra model inspection to be captured and integrated in the database at runtime.
-All users need to do is to create adapters for their systems or tools, if one is not available yet.
+It does not require changes in user code or systems (i.e., instrumentation). All users need to do is create adapters for their systems or tools, if one is not available yet.
+In addition to observability, we provide instrumentation options for convenience. For example, by adding a `@flowcept_task` decorator to functions, FlowCept will observe their executions when they run. We also provide special features for PyTorch modules: adding `@torch_task` to them enables extra model inspection to be captured and integrated into the database at runtime.


Currently, FlowCept provides adapters for: [Dask](https://www.dask.org/), [MLFlow](https://mlflow.org/), [TensorBoard](https://www.tensorflow.org/tensorboard), and [Zambeze](https://github.com/ORNL/zambeze).
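As a minimal sketch of the instrumentation described above (assuming `flowcept_task` is importable directly from the `flowcept` package; the function and values are made up):

```python
from flowcept import flowcept_task  # assumed import path

@flowcept_task
def preprocess(x: int) -> int:
    # When this function executes, FlowCept observes the call and records
    # its inputs and outputs as provenance data at runtime.
    return x * 2

print(preprocess(21))  # 42
```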

12 changes: 6 additions & 6 deletions flowcept/commons/daos/document_db_dao.py
@@ -58,11 +58,11 @@ def _create_indices(self):
            list(x["key"].keys())[0]
            for x in self._tasks_collection.list_indexes()
        ]
-       if not TaskObject.task_id_field() in existing_indices:
+       if TaskObject.task_id_field() not in existing_indices:
            self._tasks_collection.create_index(
                TaskObject.task_id_field(), unique=True
            )
-       if not TaskObject.workflow_id_field() in existing_indices:
+       if TaskObject.workflow_id_field() not in existing_indices:
            self._tasks_collection.create_index(
                TaskObject.workflow_id_field()
            )
@@ -72,7 +72,7 @@ def _create_indices(self):
            list(x["key"].keys())[0]
            for x in self._wfs_collection.list_indexes()
        ]
-       if not WorkflowObject.workflow_id_field() in existing_indices:
+       if WorkflowObject.workflow_id_field() not in existing_indices:
            self._wfs_collection.create_index(
                WorkflowObject.workflow_id_field(), unique=True
            )
@@ -83,14 +83,14 @@ def _create_indices(self):
            list(x["key"].keys())[0]
            for x in self._obj_collection.list_indexes()
        ]

-       if not "object_id" in existing_indices:
+       if "object_id" not in existing_indices:
            self._obj_collection.create_index("object_id", unique=True)

-       if not WorkflowObject.workflow_id_field() in existing_indices:
+       if WorkflowObject.workflow_id_field() not in existing_indices:
            self._obj_collection.create_index(
                WorkflowObject.workflow_id_field(), unique=False
            )
-       if not TaskObject.task_id_field() in existing_indices:
+       if TaskObject.task_id_field() not in existing_indices:
            self._obj_collection.create_index(
                TaskObject.task_id_field(), unique=False
            )
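The six changes in this file are stylistic: `x not in y` is semantically identical to `not x in y`, but it is the form PEP 8 recommends (linters flag the old form as E713). A quick self-contained check:

```python
existing_indices = ["created_at", "workflow_id"]

# Both expressions perform the same membership test; `not in` just reads better.
assert (not "object_id" in existing_indices) == ("object_id" not in existing_indices)

if "object_id" not in existing_indices:
    print("would create an index on object_id")
```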
2 changes: 1 addition & 1 deletion resources/sample_settings.yaml
@@ -32,7 +32,7 @@ experiment:
  campaign_id: super_campaign

mq:
-  type: redis
+  type: redis # or kafka; if kafka, please adjust the port
  host: localhost
  # instances: ["localhost:6379"] # We can have multiple redis instances being accessed by the consumers but each interceptor will currently access one single redis.
  port: 6379
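As a hypothetical illustration of how this section is consumed, the snippet below reads the `mq` block with PyYAML; the path and keys mirror the sample above (if `type` were switched to `kafka`, `port` would typically become Kafka's default, 9092):

```python
import yaml  # pip install pyyaml

# Load the sample settings file shown above.
with open("resources/sample_settings.yaml") as f:
    settings = yaml.safe_load(f)

mq = settings["mq"]
print(mq["type"], mq["host"], mq["port"])  # e.g.: redis localhost 6379
```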
