diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py
index a9c5eddc139..c97c87fd07e 100644
--- a/backend/onyx/document_index/interfaces.py
+++ b/backend/onyx/document_index/interfaces.py
@@ -41,8 +41,8 @@ class IndexBatchParams:
     Information necessary for efficiently indexing a batch of documents
     """
 
-    doc_id_to_previous_chunks_indexed: dict[str, int | None]
-    doc_id_to_current_chunks_indexed: dict[str, int]
+    doc_id_to_previous_chunk_cnt: dict[str, int | None]
+    doc_id_to_new_chunk_cnt: dict[str, int]
     tenant_id: str | None
     large_chunks_enabled: bool
diff --git a/backend/onyx/document_index/vespa/deletion.py b/backend/onyx/document_index/vespa/deletion.py
index 3a42af27175..8c6f0cf6c75 100644
--- a/backend/onyx/document_index/vespa/deletion.py
+++ b/backend/onyx/document_index/vespa/deletion.py
@@ -1,11 +1,9 @@
 import concurrent.futures
-from typing import cast
 from uuid import UUID
 
 import httpx
 from retry import retry
 
-from onyx.document_index.vespa.indexing_utils import _does_doc_chunk_exist
 from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
 from onyx.document_index.vespa_constants import NUM_THREADS
 from onyx.utils.logger import setup_logger
@@ -22,14 +20,13 @@ def _retryable_http_delete(http_client: httpx.Client, url: str) -> None:
     res.raise_for_status()
 
 
-@retry(tries=3, delay=1, backoff=2)
 def _delete_vespa_chunk(
     doc_chunk_id: UUID, index_name: str, http_client: httpx.Client
 ) -> None:
     try:
         _retryable_http_delete(
             http_client,
-            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{cast(str, doc_chunk_id)}",
+            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
         )
     except httpx.HTTPStatusError as e:
         logger.error(f"Failed to delete chunk, details: {e.response.text}")
@@ -42,9 +39,6 @@ def delete_vespa_chunks(
     http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
-    if not _does_doc_chunk_exist(doc_chunk_ids[0], index_name, http_client):
-        raise ValueError(f"Chunk {doc_chunk_ids[0]} does not exist in Vespa!!!")
-
     external_executor = True
 
     if not executor:
diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py
index a86eac2a807..eccd79252a3 100644
--- a/backend/onyx/document_index/vespa/index.py
+++ b/backend/onyx/document_index/vespa/index.py
@@ -316,12 +316,8 @@ def index(
         multiple chunk batches calling this function multiple times,
         otherwise only the last set of chunks will be kept"""
-        doc_id_to_previous_chunks_indexed = (
-            index_batch_params.doc_id_to_previous_chunks_indexed
-        )
-        doc_id_to_current_chunks_indexed = (
-            index_batch_params.doc_id_to_current_chunks_indexed
-        )
+        doc_id_to_previous_chunk_cnt = index_batch_params.doc_id_to_previous_chunk_cnt
+        doc_id_to_new_chunk_cnt = index_batch_params.doc_id_to_new_chunk_cnt
         tenant_id = index_batch_params.tenant_id
         large_chunks_enabled = index_batch_params.large_chunks_enabled
@@ -336,32 +332,33 @@ def index(
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
+            # We require the start and end index for each document in order to
+            # know precisely which chunks to delete. This information exists for
+            # documents that have `chunk_count` in the database, but not for
+            # `old_version` documents.
+
             enriched_doc_infos: list[EnrichedDocumentIndexingInfo] = []
-            for document_id, _ in doc_id_to_previous_chunks_indexed.items():
-                last_indexed_chunk = doc_id_to_previous_chunks_indexed.get(
-                    document_id, None
-                )
+            for document_id, _ in doc_id_to_previous_chunk_cnt.items():
+                last_indexed_chunk = doc_id_to_previous_chunk_cnt.get(document_id, None)
+                # If the document has no `chunk_count` in the database, we know that it
+                # has the old chunk ID system and we must check for the final chunk index
                 is_old_version = False
                 if last_indexed_chunk is None:
                     is_old_version = True
                     minimal_doc_info = MinimalDocumentIndexingInfo(
                         doc_id=document_id,
-                        chunk_start_index=doc_id_to_current_chunks_indexed.get(
-                            document_id, 0
-                        ),
+                        chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0),
                     )
                     last_indexed_chunk = check_for_final_chunk_existence(
                         minimal_doc_info=minimal_doc_info,
-                        start_index=doc_id_to_current_chunks_indexed[document_id],
+                        start_index=doc_id_to_new_chunk_cnt[document_id],
                         index_name=self.index_name,
                         http_client=http_client,
                     )
 
                 enriched_doc_info = EnrichedDocumentIndexingInfo(
                     doc_id=document_id,
-                    chunk_start_index=doc_id_to_current_chunks_indexed.get(
-                        document_id, 0
-                    ),
+                    chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0),
                     chunk_end_index=last_indexed_chunk,
                     old_version=is_old_version,
                 )
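
The deletion logic above reduces to range arithmetic over chunk indices: chunks numbered [new_chunk_cnt, previous_chunk_cnt) existed before this batch but are not overwritten by it, so they are the ones that must be removed. A minimal sketch of that invariant, assuming zero-based contiguous chunk IDs; `stale_chunk_indices` is a hypothetical helper for illustration, not code from this change:

    # Hypothetical helper illustrating the invariant behind chunk_start_index /
    # chunk_end_index in EnrichedDocumentIndexingInfo. Not part of this PR.
    def stale_chunk_indices(previous_chunk_cnt: int, new_chunk_cnt: int) -> range:
        # Chunks [0, new_chunk_cnt) are overwritten in place by the new batch;
        # anything from new_chunk_cnt up to the old count is stale.
        return range(new_chunk_cnt, max(previous_chunk_cnt, new_chunk_cnt))

    assert list(stale_chunk_indices(previous_chunk_cnt=10, new_chunk_cnt=7)) == [7, 8, 9]
    assert list(stale_chunk_indices(previous_chunk_cnt=5, new_chunk_cnt=8)) == []
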
diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py
index e6a1ea7ae73..1a2e73b2ab1 100644
--- a/backend/onyx/indexing/indexing_pipeline.py
+++ b/backend/onyx/indexing/indexing_pipeline.py
@@ -383,7 +383,7 @@ def index_doc_batch(
         )
     }
 
-    doc_id_to_previous_chunks_indexed: dict[str, int | None] = {
+    doc_id_to_previous_chunk_cnt: dict[str, int | None] = {
         document_id: chunk_count
         for document_id, chunk_count in fetch_chunk_counts_for_documents(
             document_ids=updatable_ids,
@@ -391,7 +391,7 @@ def index_doc_batch(
         )
     }
 
-    doc_id_to_current_chunks_indexed: dict[str, int] = {
+    doc_id_to_new_chunk_cnt: dict[str, int] = {
         document_id: len(
             [
                 chunk
@@ -433,8 +433,8 @@ def index_doc_batch(
     insertion_records = document_index.index(
         chunks=access_aware_chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed=doc_id_to_previous_chunks_indexed,
-            doc_id_to_current_chunks_indexed=doc_id_to_current_chunks_indexed,
+            doc_id_to_previous_chunk_cnt=doc_id_to_previous_chunk_cnt,
+            doc_id_to_new_chunk_cnt=doc_id_to_new_chunk_cnt,
             tenant_id=tenant_id,
             large_chunks_enabled=chunker.enable_large_chunks,
         ),
@@ -465,7 +465,7 @@ def index_doc_batch(
 
     update_docs_chunk_count__no_commit(
         document_ids=updatable_ids,
-        doc_id_to_chunk_count=doc_id_to_current_chunks_indexed,
+        doc_id_to_chunk_count=doc_id_to_new_chunk_cnt,
         db_session=db_session,
     )
diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py
index 8f805bb6acf..44d0750e02f 100644
--- a/backend/onyx/seeding/load_docs.py
+++ b/backend/onyx/seeding/load_docs.py
@@ -222,8 +222,8 @@ def seed_initial_documents(
     index_with_retries(
         chunks=chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed={},
-            doc_id_to_current_chunks_indexed={},
+            doc_id_to_previous_chunk_cnt={},
+            doc_id_to_new_chunk_cnt={},
             large_chunks_enabled=False,
             tenant_id=tenant_id,
         ),
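
For reference, the two renamed maps are built in `index_doc_batch` from different sources: previous counts come from Postgres via `fetch_chunk_counts_for_documents` (and may be None for documents that predate chunk-count tracking), while new counts are simply the per-document chunk totals of the current batch. A rough sketch of that shape, with made-up pairs standing in for the real chunk models:

    from collections import Counter

    # Stand-ins for DocAwareChunk objects: (source_document_id, chunk_id) pairs.
    batch_chunks = [("doc-a", 0), ("doc-a", 1), ("doc-a", 2), ("doc-b", 0)]

    # What the DB lookup would yield; None marks an old-version document whose
    # chunk count was never recorded.
    doc_id_to_previous_chunk_cnt: dict[str, int | None] = {"doc-a": 5, "doc-b": None}

    # New counts: how many chunks this batch produced for each document.
    doc_id_to_new_chunk_cnt: dict[str, int] = dict(
        Counter(doc_id for doc_id, _ in batch_chunks)
    )

    assert doc_id_to_new_chunk_cnt == {"doc-a": 3, "doc-b": 1}
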
diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py b/backend/scripts/query_time_check/seed_dummy_docs.py
index 861c701cdd8..ce71b1d28b6 100644
--- a/backend/scripts/query_time_check/seed_dummy_docs.py
+++ b/backend/scripts/query_time_check/seed_dummy_docs.py
@@ -108,8 +108,8 @@ def do_insertion(
     insertion_records = vespa_index.index(
         chunks=all_chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed={},
-            doc_id_to_current_chunks_indexed={},
+            doc_id_to_previous_chunk_cnt={},
+            doc_id_to_new_chunk_cnt={},
             tenant_id=POSTGRES_DEFAULT_SCHEMA,
             large_chunks_enabled=False,
         ),
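
Both seeding call sites pass empty maps for the same reason: the documents are brand new, so there are no previous chunk counts to reconcile and the deletion pass in `index()` has nothing to do. A usage sketch of the renamed parameters under that assumption, with illustrative values:

    # IndexBatchParams as renamed in backend/onyx/document_index/interfaces.py.
    from onyx.document_index.interfaces import IndexBatchParams

    index_batch_params = IndexBatchParams(
        doc_id_to_previous_chunk_cnt={},  # fresh docs: no prior counts in the DB
        doc_id_to_new_chunk_cnt={},       # nothing to diff against or delete
        tenant_id=None,                   # illustrative single-tenant value
        large_chunks_enabled=False,
    )
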