From 4ae3b489385f72d259b8acdccfcff118782674b4 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 12 Dec 2024 10:47:45 -0800 Subject: [PATCH] use redis completion signal to double check exit code (#3435) Co-authored-by: Richard Kuo (Danswer) --- .../background/celery/tasks/indexing/tasks.py | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 99acb591df7..22572c0fd8e 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -640,18 +640,41 @@ def connector_indexing_proxy_task( continue if job.status == "error": + ignore_exitcode = False + exit_code: int | None = None if job.process: exit_code = job.process.exitcode - task_logger.error( - "Indexing watchdog - spawned task exceptioned: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"exit_code={exit_code} " - f"error={job.exception()}" - ) + + # seeing non-deterministic behavior where spawned tasks occasionally return exit code 1 + # even though logging clearly indicates that they completed successfully + # to work around this, we ignore the job error state if the completion signal is OK + status_int = redis_connector_index.get_completion() + if status_int: + status_enum = HTTPStatus(status_int) + if status_enum == HTTPStatus.OK: + ignore_exitcode = True + + if ignore_exitcode: + task_logger.warning( + "Indexing watchdog - spawned task has non-zero exit code " + "but completion signal is OK. Continuing...: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"exit_code={exit_code}" + ) + else: + task_logger.error( + "Indexing watchdog - spawned task exceptioned: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"exit_code={exit_code} " + f"error={job.exception()}" + ) job.release() break