Skip to content

Commit

Permalink
add debug logging for a particular timeout issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Kuo (Danswer) committed Jan 7, 2025
1 parent d93c34f commit 2eaa7c8
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"""a lightweight task used to kick off indexing tasks.
Occcasionally does some validation of existing state to clear up error conditions"""
debug_tenants = {
"tenant_i-043470d740845ec56",
"tenant_82b497ce-88aa-4fbd-841a-92cae43529c8",
}
time_start = time.monotonic()

tasks_created = 0
Expand Down Expand Up @@ -247,29 +251,27 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)

# gather cc_pair_ids
lock_beat.reacquire()
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
lock_beat.reacquire()
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)

# kick off index attempts
for cc_pair_id in cc_pair_ids:
lock_beat.reacquire()

# debugging logic - remove after we're done
if (
tenant_id == "tenant_i-043470d740845ec56"
or tenant_id == "tenant_82b497ce-88aa-4fbd-841a-92cae43529c8"
):
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing lock: "
f"check_for_indexing cc_pair lock: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"ttl={redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)}"
f"ttl={ttl}"
)

lock_beat.reacquire()

redis_connector = RedisConnector(tenant_id, cc_pair_id)
with get_session_with_tenant(tenant_id) as db_session:
search_settings_list: list[SearchSettings] = get_active_search_settings(
Expand Down Expand Up @@ -344,14 +346,33 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)
tasks_created += 1

# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing unfenced lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()

# Fail any index attempts in the DB that don't have fences
# This shouldn't ever happen!
with get_session_with_tenant(tenant_id) as db_session:
lock_beat.reacquire()
unfenced_attempt_ids = get_unfenced_index_attempt_ids(
db_session, redis_client
)
for attempt_id in unfenced_attempt_ids:
# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing unfenced attempt id lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()

attempt = get_index_attempt(db_session, attempt_id)
Expand All @@ -369,9 +390,18 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
attempt.id, db_session, failure_reason=failure_reason
)

# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing validate fences lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()
# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
lock_beat.reacquire()
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
Expand All @@ -383,7 +413,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
task_logger.exception("Exception while validating indexing fences")

redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)

except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
Expand Down

0 comments on commit 2eaa7c8

Please sign in to comment.