Skip to content

Commit

Permalink
feat(dask): capture logs from dask pods (#616)
Browse files Browse the repository at this point in the history
Closes #610
  • Loading branch information
Alputer committed Jan 20, 2025
1 parent afd1400 commit 7d9e4c5
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 14 deletions.
77 changes: 68 additions & 9 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ def __init__(
os_client: OpenSearch | None = None,
job_index: str = "fluentbit-job_log",
workflow_index: str = "fluentbit-workflow_log",
dask_index: str = "fluentbit-dask_log",
max_rows: int = 5000,
log_key: str = "log",
order: str = "asc",
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
dask_log_matcher: str = "kubernetes.labels.dask.org/cluster-name.keyword",
timeout: int = 5,
) -> None:
"""
Expand All @@ -91,34 +93,57 @@ def __init__(
self.os_client = os_client
self.job_index = job_index
self.workflow_index = workflow_index
self.dask_index = dask_index
self.max_rows = max_rows
self.log_key = log_key
self.order = order
self.job_log_matcher = job_log_matcher
self.workflow_log_matcher = workflow_log_matcher
self.dask_log_matcher = dask_log_matcher
self.timeout = timeout

def fetch_logs(self, id: str, index: str, match: str) -> str | None:
def fetch_logs(
self, id: str, index: str, match: str = None, matches: dict | None = None
) -> str | None:
"""
Fetch logs of a specific job or workflow.
Fetch logs of a specific job, workflow or Dask cluster.
:param id: Job or workflow ID.
:param index: Index name for logs.
:param match: Matcher for logs.
:param match: Single matcher for logs (mutually exclusive with `matches`).
:param matches: Dictionary of field-to-value pairs for multiple match conditions.
:return: Job or workflow logs.
:return: Job, workflow or Dask cluster logs matching the conditions.
"""
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
if matches:
# Build a bool query with multiple conditions
query = {
"query": {
"bool": {
"must": [
{"match": {field: value}}
for field, value in matches.items()
]
}
},
"sort": [{"@timestamp": {"order": self.order}}],
}
elif match:
# Build a simple single-match query
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
else:
logging.error("Either `match` or `matches` must be provided.")
return None

try:
response = self.os_client.search(
index=index, body=query, size=self.max_rows, timeout=self.timeout
)
except Exception as e:
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
logging.error(f"Failed to fetch logs for {id}: {e}")
return None

return self._concat_rows(response["hits"]["hits"])
Expand Down Expand Up @@ -151,6 +176,40 @@ def fetch_workflow_logs(self, workflow_id: str) -> str | None:
self.workflow_log_matcher,
)

def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the scheduler of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster scheduler logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "scheduler",
},
)

def fetch_dask_worker_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the workers of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster worker logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "worker",
},
)

def _concat_rows(self, rows: list) -> str | None:
"""
Concatenate log messages from rows.
Expand Down
47 changes: 42 additions & 5 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@
from werkzeug.exceptions import BadRequest, NotFound

from reana_workflow_controller.config import (
DASK_ENABLED,
PROGRESS_STATUSES,
REANA_GITLAB_HOST,
PREVIEWABLE_MIME_TYPE_PREFIXES,
)
from reana_workflow_controller.dask import requires_dask
from reana_workflow_controller.consumer import _update_workflow_status
from reana_workflow_controller.errors import (
REANAExternalCallError,
Expand Down Expand Up @@ -183,11 +185,46 @@ def build_workflow_logs(workflow, steps=None, paginate=None):

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)
logs = None

if DASK_ENABLED and requires_dask(workflow):
logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK SCHEDULER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ open_search_log_fetcher.fetch_dask_scheduler_logs(job.workflow_uuid)
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK WORKER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ open_search_log_fetcher.fetch_dask_worker_logs(job.workflow_uuid)
if open_search_log_fetcher
else None
)

else:
logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
Expand Down

0 comments on commit 7d9e4c5

Please sign in to comment.