feat(opensearch): capture logs from Dask cluster pods (reanahub#616)
This commit collects logs from the Dask scheduler and workers and propagates
them to all REANA jobs that use the same Dask cluster. This is not ideal,
since Dask logs can thus be duplicated across different steps of the workflow,
which could be confusing for the user.

However, when a user uses Dask to parallelise the workflow jobs, the workflow
steps are usually defined only within Dask, so this situation does not occur.
Hence we can afford to do this under usual real-life conditions.

Separating Dask scheduler and worker logs from regular Kubernetes job logs
would require a larger architectural change and is therefore deferred to a
future commit.

Closes reanahub#610
Alputer committed Jan 22, 2025
1 parent afd1400 commit fc03fb9
Showing 2 changed files with 116 additions and 14 deletions.
77 changes: 68 additions & 9 deletions reana_workflow_controller/opensearch.py
@@ -63,11 +63,13 @@ def __init__(
os_client: OpenSearch | None = None,
job_index: str = "fluentbit-job_log",
workflow_index: str = "fluentbit-workflow_log",
dask_index: str = "fluentbit-dask_log",
max_rows: int = 5000,
log_key: str = "log",
order: str = "asc",
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
dask_log_matcher: str = "kubernetes.labels.dask.org/cluster-name.keyword",
timeout: int = 5,
) -> None:
"""
@@ -91,34 +91,57 @@ def __init__(
self.os_client = os_client
self.job_index = job_index
self.workflow_index = workflow_index
self.dask_index = dask_index
self.max_rows = max_rows
self.log_key = log_key
self.order = order
self.job_log_matcher = job_log_matcher
self.workflow_log_matcher = workflow_log_matcher
self.dask_log_matcher = dask_log_matcher
self.timeout = timeout

def fetch_logs(self, id: str, index: str, match: str) -> str | None:
def fetch_logs(
self, id: str, index: str, match: str = None, matches: dict | None = None
) -> str | None:
"""
Fetch logs of a specific job or workflow.
Fetch logs of a specific job, workflow or Dask cluster.
:param id: Job or workflow ID.
:param index: Index name for logs.
:param match: Matcher for logs.
:param match: Single matcher for logs (mutually exclusive with `matches`).
:param matches: Dictionary of field-to-value pairs for multiple match conditions.
:return: Job or workflow logs.
:return: Job, workflow or Dask cluster logs matching the conditions.
"""
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
if matches:
# Build a bool query with multiple conditions
query = {
"query": {
"bool": {
"must": [
{"match": {field: value}}
for field, value in matches.items()
]
}
},
"sort": [{"@timestamp": {"order": self.order}}],
}
elif match:
# Build a simple single-match query
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
else:
logging.error("Either `match` or `matches` must be provided.")
return None

try:
response = self.os_client.search(
index=index, body=query, size=self.max_rows, timeout=self.timeout
)
except Exception as e:
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
logging.error(f"Failed to fetch logs for {id}: {e}")
return None

return self._concat_rows(response["hits"]["hits"])
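
For illustration only, not part of this diff: a minimal sketch of the query body that the matches branch above produces when fetching Dask scheduler logs. The field names follow the default matchers defined in __init__ above; the workflow UUID inside the cluster name is a hypothetical placeholder.

# Hypothetical query built by fetch_logs(matches=...) for Dask scheduler logs.
matches = {
    "kubernetes.labels.dask.org/cluster-name.keyword": "reana-run-dask-<workflow-uuid>",
    "kubernetes.labels.dask.org/component": "scheduler",
}
query = {
    "query": {
        "bool": {
            "must": [
                {"match": {field: value}} for field, value in matches.items()
            ]
        }
    },
    "sort": [{"@timestamp": {"order": "asc"}}],
}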
@@ -151,6 +176,40 @@ def fetch_workflow_logs(self, workflow_id: str) -> str | None:
self.workflow_log_matcher,
)

def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the scheduler of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster scheduler logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "scheduler",
},
)

def fetch_dask_worker_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the workers of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster worker logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "worker",
},
)

def _concat_rows(self, rows: list) -> str | None:
"""
Concatenate log messages from rows.
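
A usage sketch, not part of the commit: both new helpers query the same Dask index and differ only in the dask.org/component label they match. The factory build_opensearch_log_fetcher is the one referenced in rest/utils.py below; its import path and the workflow UUID are assumptions.

# Illustrative usage of the new Dask log fetchers (assumed import path).
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher

fetcher = build_opensearch_log_fetcher()  # may be None when OpenSearch is disabled
workflow_id = "6a1b2c3d-4e5f-6789-abcd-0123456789ef"  # hypothetical workflow UUID

if fetcher:
    scheduler_logs = fetcher.fetch_dask_scheduler_logs(workflow_id)
    worker_logs = fetcher.fetch_dask_worker_logs(workflow_id)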
53 changes: 48 additions & 5 deletions reana_workflow_controller/rest/utils.py
Expand Up @@ -60,10 +60,12 @@
from werkzeug.exceptions import BadRequest, NotFound

from reana_workflow_controller.config import (
DASK_ENABLED,
PROGRESS_STATUSES,
REANA_GITLAB_HOST,
PREVIEWABLE_MIME_TYPE_PREFIXES,
)
from reana_workflow_controller.dask import requires_dask
from reana_workflow_controller.consumer import _update_workflow_status
from reana_workflow_controller.errors import (
REANAExternalCallError,
@@ -183,11 +185,52 @@ def build_workflow_logs(workflow, steps=None, paginate=None):

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)
logs = None

if DASK_ENABLED and requires_dask(workflow):
logs = (
(open_search_log_fetcher.fetch_job_logs(job.backend_job_id) or "")
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK SCHEDULER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ (
open_search_log_fetcher.fetch_dask_scheduler_logs(job.workflow_uuid)
or ""
)
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK WORKER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ (
open_search_log_fetcher.fetch_dask_worker_logs(job.workflow_uuid)
or ""
)
if open_search_log_fetcher
else None
)

else:
logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
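
As a rough condensation, not the committed code: the branching above fetches the regular job logs and, only when Dask is enabled and the workflow requires it, appends the scheduler and worker logs of the cluster. The helper name below is made up and the separator banners are abbreviated.

def assemble_job_logs(fetcher, job, workflow):
    # Illustrative condensation of the log assembly above; names are hypothetical.
    if fetcher is None:
        return None
    if not (DASK_ENABLED and requires_dask(workflow)):
        return fetcher.fetch_job_logs(job.backend_job_id)
    return (
        (fetcher.fetch_job_logs(job.backend_job_id) or "")
        + "\n---------------- DASK SCHEDULER LOGS ----------------\n"
        + (fetcher.fetch_dask_scheduler_logs(job.workflow_uuid) or "")
        + "\n---------------- DASK WORKER LOGS ----------------\n"
        + (fetcher.fetch_dask_worker_logs(job.workflow_uuid) or "")
    )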
