Skip to content

Commit

Permalink
Merge pull request #3607 from onyx-dot-app/bugfix/locking_redux
Browse files: browse the repository at this point in the history
add detailed timings to monitor vespa sync
  • Loading branch information
rkuo-danswer authored Jan 6, 2025
2 parents c8090ab + ca54bd0 commit 9800551
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
4 changes: 2 additions & 2 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

logger.info("Running as the primary celery worker.")

# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return

logger.info("Running as the primary celery worker.")

# This is singleton work that should be done on startup exactly once
# by the primary worker. It is unnecessary in the multi-tenant scenario.
r = get_redis_client(tenant_id=None)
Expand Down
39 changes: 38 additions & 1 deletion backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,13 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
Returns True if the task actually did work, False if it exited early to prevent overlap
"""
task_logger.info(f"monitor_vespa_sync starting: tenant={tenant_id}")

time_start = time.monotonic()

timings: dict[str, float] = {}
timings["start"] = time_start

r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -771,6 +777,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
try:
# prevent overlapping tasks
if not lock_beat.acquire(blocking=False):
task_logger.info("monitor_vespa_sync exiting due to overlap")
return False

# print current queue lengths
Expand Down Expand Up @@ -812,20 +819,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
f"permissions_upsert={n_permissions_upsert} "
)

timings["queues"] = time.monotonic() - timings["start"]

# scan and monitor activity to completion
lock_beat.reacquire()
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
monitor_connector_taskset(r)

timings["connector"] = time.monotonic() - timings["queues"]

for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
lock_beat.reacquire()
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)

timings["connector_deletion"] = time.monotonic() - timings["connector"]

for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)

timings["document_set"] = time.monotonic() - timings["connector_deletion"]

for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
lock_beat.reacquire()
monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
Expand All @@ -836,31 +851,53 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
with get_session_with_tenant(tenant_id) as db_session:
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)

timings["usergroup"] = time.monotonic() - timings["document_set"]

for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)

timings["pruning"] = time.monotonic() - timings["usergroup"]

for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)

timings["indexing"] = time.monotonic() - timings["pruning"]

for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)

timings["permissions"] = time.monotonic() - timings["indexing"]
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
finally:
if lock_beat.owned():
lock_beat.release()
else:
t = timings
task_logger.error(
"monitor_vespa_sync - Lock not owned on completion: "
f"tenant={tenant_id} "
f"queues={t.get('queues')} "
f"connector={t.get('connector')} "
f"connector_deletion={t.get('connector_deletion')} "
f"document_set={t.get('document_set')} "
f"usergroup={t.get('usergroup')} "
f"pruning={t.get('pruning')} "
f"indexing={t.get('indexing')} "
f"permissions={t.get('permissions')}"
)
redis_lock_dump(lock_beat, r)

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
task_logger.info(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
return True


Expand Down
2 changes: 1 addition & 1 deletion backend/onyx/redis/redis_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
remote_token = None

logger.warning(
f"RedisLock diagnostic logging: "
f"RedisLock diagnostic: "
f"name={name} "
f"locked={locked} "
f"owned={owned} "
Expand Down

0 comments on commit 9800551

Please sign in to comment.