Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Indexing Frozen #660

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions backend/danswer/background/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime
from datetime import timezone

import dask
import torch
from dask.distributed import Client
from dask.distributed import Future
Expand Down Expand Up @@ -44,6 +45,10 @@

logger = setup_logger()

# If the indexing dies, it's most likely due to resource constraints,
# restarting just delays the eventual failure, not useful to the user
dask.config.set({"distributed.scheduler.allowed-failures": 0})

_UNEXPECTED_STATE_FAILURE_REASON = (
"Stopped mid run, likely due to the background process being killed"
)
Expand Down Expand Up @@ -144,6 +149,9 @@ def cleanup_indexing_jobs(
if not job.done():
continue

if job.status == "error":
logger.error(job.exception())

job.release()
del existing_jobs_copy[attempt_id]
index_attempt = get_index_attempt(
Expand All @@ -156,7 +164,7 @@ def cleanup_indexing_jobs(
)
continue

if index_attempt.status == IndexingStatus.IN_PROGRESS:
if index_attempt.status == IndexingStatus.IN_PROGRESS or job.status == "error":
mark_run_failed(
db_session=db_session,
index_attempt=index_attempt,
Expand Down Expand Up @@ -286,10 +294,10 @@ def _index(
run_dt=run_dt,
)

net_doc_change = 0
document_count = 0
chunk_count = 0
try:
net_doc_change = 0
document_count = 0
chunk_count = 0
for doc_batch in doc_batch_generator:
logger.debug(
f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
Expand Down Expand Up @@ -419,7 +427,14 @@ def kickoff_indexing_jobs(
existing_jobs_copy = existing_jobs.copy()

new_indexing_attempts = get_not_started_index_attempts(db_session)
logger.info(f"Found {len(new_indexing_attempts)} new indexing tasks.")
new_tasks = len(
[
attempt.id
for attempt in new_indexing_attempts
if attempt.id not in existing_jobs
]
)
logger.info(f"Found {new_tasks} new indexing tasks.")

if not new_indexing_attempts:
return existing_jobs
Expand All @@ -440,6 +455,8 @@ def kickoff_indexing_jobs(
)
continue

# For jobs waiting in the queue that haven't started
# Also rarely for jobs that started but haven't updated the indexing tables yet
if attempt.id in existing_jobs:
continue

Expand Down
2 changes: 1 addition & 1 deletion backend/danswer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,5 @@ def startup_event() -> None:


if __name__ == "__main__":
logger.info(f"Running QA Service on http://{APP_HOST}:{str(APP_PORT)}/")
logger.info(f"Starting Danswer Backend on http://{APP_HOST}:{str(APP_PORT)}/")
uvicorn.run(app, host=APP_HOST, port=APP_PORT)
8 changes: 5 additions & 3 deletions backend/supervisord.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
[supervisord]
nodaemon=true
logfile=/dev/stdout
logfile_maxbytes=0
logfile=/var/log/supervisord.log

[program:indexing]
# Indexing is the heaviest job, also requires some CPU intensive steps
# Cannot place this in Celery for now because Celery must run as a single process (see note below)
# Indexing uses multi-processing to speed things up
[program:document_indexing]
command=python danswer/background/update.py
stdout_logfile=/var/log/update.log
stdout_logfile_maxbytes=52428800
Expand Down