diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index ec5378dd313..0ea834e79da 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -3,6 +3,7 @@
 from datetime import datetime
 from datetime import timezone
 
+import dask
 import torch
 from dask.distributed import Client
 from dask.distributed import Future
@@ -44,6 +45,10 @@
 
 logger = setup_logger()
 
+# If the indexing dies, it's most likely due to resource constraints,
+# restarting just delays the eventual failure, not useful to the user
+dask.config.set({"distributed.scheduler.allowed-failures": 0})
+
 _UNEXPECTED_STATE_FAILURE_REASON = (
     "Stopped mid run, likely due to the background process being killed"
 )
@@ -144,6 +149,9 @@ def cleanup_indexing_jobs(
         if not job.done():
             continue
 
+        if job.status == "error":
+            logger.error(job.exception())
+
         job.release()
         del existing_jobs_copy[attempt_id]
         index_attempt = get_index_attempt(
@@ -156,7 +164,7 @@ def cleanup_indexing_jobs(
             )
             continue
 
-        if index_attempt.status == IndexingStatus.IN_PROGRESS:
+        if index_attempt.status == IndexingStatus.IN_PROGRESS or job.status == "error":
             mark_run_failed(
                 db_session=db_session,
                 index_attempt=index_attempt,
@@ -286,10 +294,10 @@ def _index(
         run_dt=run_dt,
     )
 
+    net_doc_change = 0
+    document_count = 0
+    chunk_count = 0
     try:
-        net_doc_change = 0
-        document_count = 0
-        chunk_count = 0
         for doc_batch in doc_batch_generator:
             logger.debug(
                 f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
@@ -418,7 +426,14 @@ def kickoff_indexing_jobs(
 ) -> dict[int, Future]:
     existing_jobs_copy = existing_jobs.copy()
 
-    new_indexing_attempts = get_not_started_index_attempts(db_session)
+    # Don't include jobs waiting in the Dask queue that just haven't started running
+    # Also (rarely) don't include for jobs that started but haven't updated the indexing tables yet
+    new_indexing_attempts = [
+        attempt
+        for attempt in get_not_started_index_attempts(db_session)
+        if attempt.id not in existing_jobs
+    ]
+
     logger.info(f"Found {len(new_indexing_attempts)} new indexing tasks.")
 
     if not new_indexing_attempts:
@@ -440,9 +455,6 @@ def kickoff_indexing_jobs(
             )
             continue
 
-        if attempt.id in existing_jobs:
-            continue
-
         logger.info(
             f"Kicking off indexing attempt for connector: '{attempt.connector.name}', "
            f"with config: '{attempt.connector.connector_specific_config}', and "
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index b0b452ec1eb..cdb621709a0 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -211,5 +211,5 @@ def startup_event() -> None:
 
 
 if __name__ == "__main__":
-    logger.info(f"Running QA Service on http://{APP_HOST}:{str(APP_PORT)}/")
+    logger.info(f"Starting Danswer Backend on http://{APP_HOST}:{str(APP_PORT)}/")
     uvicorn.run(app, host=APP_HOST, port=APP_PORT)
diff --git a/backend/supervisord.conf b/backend/supervisord.conf
index 9075f0548e7..63b9a2024d8 100644
--- a/backend/supervisord.conf
+++ b/backend/supervisord.conf
@@ -1,9 +1,11 @@
 [supervisord]
 nodaemon=true
-logfile=/dev/stdout
-logfile_maxbytes=0
+logfile=/var/log/supervisord.log
 
-[program:indexing]
+# Indexing is the heaviest job, also requires some CPU intensive steps
+# Cannot place this in Celery for now because Celery must run as a single process (see note below)
+# Indexing uses multi-processing to speed things up
+[program:document_indexing]
 command=python danswer/background/update.py
 stdout_logfile=/var/log/update.log
 stdout_logfile_maxbytes=52428800
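
Note on the update.py changes: distributed.scheduler.allowed-failures controls how many times the Dask scheduler reschedules a task after the worker running it dies, so 0 makes a killed indexing job fail immediately, and cleanup_indexing_jobs then sees the Future in the "error" state. Below is a minimal, self-contained sketch of that Future inspection path, assuming a local Client; run_indexing is an illustrative placeholder, not the real indexing entrypoint.

import dask
from dask.distributed import Client


def run_indexing(attempt_id: int) -> None:
    # Illustrative stand-in: a task that fails the way a real indexing run might.
    raise RuntimeError(f"indexing attempt {attempt_id} ran out of memory")


if __name__ == "__main__":
    # 0 retries: if the worker running the task dies, the task is not rescheduled
    # (same setting as the patch above).
    dask.config.set({"distributed.scheduler.allowed-failures": 0})
    client = Client(n_workers=1, threads_per_worker=1)

    future = client.submit(run_indexing, 42)
    future.exception()  # block until the task settles

    # Same checks cleanup_indexing_jobs performs on each tracked Future.
    if future.done() and future.status == "error":
        print("indexing job failed:", future.exception())
    future.release()
    client.close()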
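
The counter hoist in _index follows the pattern sketched below: bind progress counters before the try so that anything reading them after a mid-stream failure (an except handler, or code after the block) always sees a defined value reflecting partial progress. The generator and names here are illustrative, not Danswer's.

from typing import Iterator


def load_items() -> Iterator[int]:
    # Hypothetical source that fails partway through, like a flaky connector batch generator.
    yield 1
    yield 2
    raise RuntimeError("connector died mid-stream")


def index_items() -> int:
    processed = 0  # bound before the try, so the failure path can always read it
    try:
        for _item in load_items():
            processed += 1
    except RuntimeError as e:
        print(f"failed after {processed} items: {e}")
    return processed


if __name__ == "__main__":
    assert index_items() == 2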
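
The kickoff_indexing_jobs change moves the duplicate check out of the submission loop: attempts that already have a Dask future (still queued, or running but not yet marked IN_PROGRESS in the DB) are filtered out before anything is submitted. A standalone sketch of that filter, using a hypothetical NotStartedAttempt in place of the real IndexAttempt ORM row:

from dataclasses import dataclass

from dask.distributed import Future


@dataclass
class NotStartedAttempt:
    # Hypothetical stand-in for the rows returned by get_not_started_index_attempts.
    id: int
    connector_name: str


def filter_new_attempts(
    not_started: list[NotStartedAttempt],
    existing_jobs: dict[int, Future],
) -> list[NotStartedAttempt]:
    # Keep only attempts that do not already have a tracked Dask future,
    # so a queued-but-not-yet-running job is never submitted twice.
    return [attempt for attempt in not_started if attempt.id not in existing_jobs]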