Skip to content

Commit

Permalink
log attempt id, log elapsed since task execution start, remove log spam (#3539)
Browse files Browse the repository at this point in the history

* log attempt id, log elapsed since task execution start, remove log spam

* diagnostic lock logs

---------

Co-authored-by: Richard Kuo (Danswer) <[email protected]>
  • Loading branch information
rkuo-danswer and Richard Kuo (Danswer) authored Dec 21, 2024
1 parent b9567ea commit eb369ca
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
30 changes: 27 additions & 3 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import timezone
from http import HTTPStatus
from time import sleep
from typing import cast

import redis
import sentry_sdk
Expand Down Expand Up @@ -100,13 +101,37 @@ def progress(self, tag: str, amount: int) -> None:
self.last_lock_reacquire = datetime.now(timezone.utc)
except LockError:
logger.exception(
f"IndexingCallback - lock.reacquire exceptioned. "
f"IndexingCallback - lock.reacquire exceptioned: "
f"lock_timeout={self.redis_lock.timeout} "
f"start={self.started} "
f"last_tag={self.last_tag} "
f"last_reacquired={self.last_lock_reacquire} "
f"now={datetime.now(timezone.utc)}"
)

# diagnostic logging for lock errors
name = self.redis_lock.name
ttl = self.redis_client.ttl(name)
locked = self.redis_lock.locked()
owned = self.redis_lock.owned()
local_token: str | None = self.redis_lock.local.token # type: ignore

remote_token_raw = self.redis_client.get(self.redis_lock.name)
if remote_token_raw:
remote_token_bytes = cast(bytes, remote_token_raw)
remote_token = remote_token_bytes.decode("utf-8")
else:
remote_token = None

logger.warning(
f"IndexingCallback - lock diagnostics: "
f"name={name} "
f"locked={locked} "
f"owned={owned} "
f"local_token={local_token} "
f"remote_token={remote_token} "
f"ttl={ttl}"
)
raise

self.redis_client.incrby(self.generator_progress_key, amount)
Expand Down Expand Up @@ -325,7 +350,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
task_logger.info("Validating indexing fences...")
validate_indexing_fences(
tenant_id, self.app, redis_client, redis_client_celery, lock_beat
)
Expand Down Expand Up @@ -363,7 +387,7 @@ def validate_indexing_fences(
lock_beat: RedisLock,
) -> None:
reserved_indexing_tasks = celery_get_unacked_task_ids(
"connector_indexing", r_celery
OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
)

# validate all existing indexing jobs
Expand Down
21 changes: 16 additions & 5 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
lock_beat.release()

time_elapsed = time.monotonic() - time_start
task_logger.info(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
return


Expand Down Expand Up @@ -637,15 +637,23 @@ def monitor_ccpair_indexing_taskset(
if not payload:
return

elapsed_started_str = None
if payload.started:
elapsed_started = datetime.now(timezone.utc) - payload.started
elapsed_started_str = f"{elapsed_started.total_seconds():.2f}"

elapsed_submitted = datetime.now(timezone.utc) - payload.submitted

progress = redis_connector_index.get_progress()
if progress is not None:
task_logger.info(
f"Connector indexing progress: cc_pair={cc_pair_id} "
f"Connector indexing progress: "
f"attempt={payload.index_attempt_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"progress={progress} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
f"elapsed_started={elapsed_started_str}"
)

if payload.index_attempt_id is None or payload.celery_task_id is None:
Expand Down Expand Up @@ -716,11 +724,14 @@ def monitor_ccpair_indexing_taskset(
status_enum = HTTPStatus(status_int)

task_logger.info(
f"Connector indexing finished: cc_pair={cc_pair_id} "
f"Connector indexing finished: "
f"attempt={payload.index_attempt_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"progress={progress} "
f"status={status_enum.name} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
f"elapsed_started={elapsed_started_str}"
)

redis_connector_index.reset()
Expand Down

0 comments on commit eb369ca

Please sign in to comment.