Commit
address comments
pablonyx committed Jan 5, 2025
1 parent 2251093 commit c5c14ec
Showing 6 changed files with 26 additions and 35 deletions.
backend/onyx/document_index/interfaces.py (4 changes: 2 additions & 2 deletions)
@@ -41,8 +41,8 @@ class IndexBatchParams:
     Information necessary for efficiently indexing a batch of documents
     """

-    doc_id_to_previous_chunks_indexed: dict[str, int | None]
-    doc_id_to_current_chunks_indexed: dict[str, int]
+    doc_id_to_previous_chunk_cnt: dict[str, int | None]
+    doc_id_to_new_chunk_cnt: dict[str, int]
     tenant_id: str | None
     large_chunks_enabled: bool
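Note: the renamed fields spell out what each mapping holds: the number of chunks each document had in its previously indexed version (None when no count was recorded) versus the number produced by the current run. A minimal sketch of constructing the params, assuming IndexBatchParams is a plain dataclass-style container; the document IDs and counts here are hypothetical:

from onyx.document_index.interfaces import IndexBatchParams

params = IndexBatchParams(
    # "doc-a" previously had 10 chunks; "doc-b" predates stored chunk counts.
    doc_id_to_previous_chunk_cnt={"doc-a": 10, "doc-b": None},
    # Chunk counts produced by the current indexing pass.
    doc_id_to_new_chunk_cnt={"doc-a": 6, "doc-b": 4},
    tenant_id=None,
    large_chunks_enabled=False,
)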
backend/onyx/document_index/vespa/deletion.py (8 changes: 1 addition & 7 deletions)
@@ -1,11 +1,9 @@
 import concurrent.futures
-from typing import cast
 from uuid import UUID

 import httpx
 from retry import retry

-from onyx.document_index.vespa.indexing_utils import _does_doc_chunk_exist
 from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
 from onyx.document_index.vespa_constants import NUM_THREADS
 from onyx.utils.logger import setup_logger
@@ -22,14 +20,13 @@ def _retryable_http_delete(http_client: httpx.Client, url: str) -> None:
     res.raise_for_status()


 @retry(tries=3, delay=1, backoff=2)
 def _delete_vespa_chunk(
     doc_chunk_id: UUID, index_name: str, http_client: httpx.Client
 ) -> None:
     try:
         _retryable_http_delete(
             http_client,
-            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{cast(str, doc_chunk_id)}",
+            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
         )
     except httpx.HTTPStatusError as e:
         logger.error(f"Failed to delete chunk, details: {e.response.text}")
@@ -42,9 +39,6 @@ def delete_vespa_chunks(
     http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
-    if not _does_doc_chunk_exist(doc_chunk_ids[0], index_name, http_client):
-        raise ValueError(f"Chunk {doc_chunk_ids[0]} does not exist in Vespa!!!")
-
     external_executor = True

     if not executor:
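Note: with the up-front _does_doc_chunk_exist guard removed, delete_vespa_chunks no longer raises when the first chunk is already gone; deletion becomes idempotent. A rough sketch of the resulting pattern, assuming Vespa's /document/v1 DELETE returns a success status for a document that does not exist (the endpoint constant below is illustrative; the real one lives in onyx.document_index.vespa_constants):

import httpx

# Illustrative stand-in for DOCUMENT_ID_ENDPOINT from vespa_constants.
DOCUMENT_ID_ENDPOINT = "http://localhost:8080/document/v1/default/{index_name}/docid"

def delete_chunk(http_client: httpx.Client, index_name: str, doc_chunk_id: str) -> None:
    # No existence check first: deleting a document that is not there
    # still succeeds, so the operation can simply be retried on failure.
    res = http_client.delete(
        f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
    )
    res.raise_for_status()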
backend/onyx/document_index/vespa/index.py (31 changes: 14 additions & 17 deletions)
@@ -316,12 +316,8 @@ def index(
         multiple chunk batches calling this function multiple times, otherwise only the last set of
         chunks will be kept"""

-        doc_id_to_previous_chunks_indexed = (
-            index_batch_params.doc_id_to_previous_chunks_indexed
-        )
-        doc_id_to_current_chunks_indexed = (
-            index_batch_params.doc_id_to_current_chunks_indexed
-        )
+        doc_id_to_previous_chunk_cnt = index_batch_params.doc_id_to_previous_chunk_cnt
+        doc_id_to_new_chunk_cnt = index_batch_params.doc_id_to_new_chunk_cnt
         tenant_id = index_batch_params.tenant_id
         large_chunks_enabled = index_batch_params.large_chunks_enabled
@@ -336,32 +332,33 @@
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
             # We require the start and end index for each document in order to
             # know precisely which chunks to delete. This information exists for
             # documents that have `chunk_count` in the database, but not for
             # `old_version` documents.

             enriched_doc_infos: list[EnrichedDocumentIndexingInfo] = []
-            for document_id, _ in doc_id_to_previous_chunks_indexed.items():
-                last_indexed_chunk = doc_id_to_previous_chunks_indexed.get(
-                    document_id, None
-                )
+            for document_id, _ in doc_id_to_previous_chunk_cnt.items():
+                last_indexed_chunk = doc_id_to_previous_chunk_cnt.get(document_id, None)
                 # If the document has no `chunk_count` in the database, we know that it
                 # has the old chunk ID system and we must check for the final chunk index
                 is_old_version = False
                 if last_indexed_chunk is None:
                     is_old_version = True
                     minimal_doc_info = MinimalDocumentIndexingInfo(
                         doc_id=document_id,
-                        chunk_start_index=doc_id_to_current_chunks_indexed.get(
-                            document_id, 0
-                        ),
+                        chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0),
                     )
                     last_indexed_chunk = check_for_final_chunk_existence(
                         minimal_doc_info=minimal_doc_info,
-                        start_index=doc_id_to_current_chunks_indexed[document_id],
+                        start_index=doc_id_to_new_chunk_cnt[document_id],
                         index_name=self.index_name,
                         http_client=http_client,
                     )

                 enriched_doc_info = EnrichedDocumentIndexingInfo(
                     doc_id=document_id,
-                    chunk_start_index=doc_id_to_current_chunks_indexed.get(
-                        document_id, 0
-                    ),
+                    chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0),
                     chunk_end_index=last_indexed_chunk,
                     old_version=is_old_version,
                 )
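Note: the two counts are what let index() delete precisely the stale chunks. If a document previously produced more chunks than the current run, the chunk indices from the new count up to (but not including) the previous count must be removed; the start/end indices assembled into EnrichedDocumentIndexingInfo encode exactly that range. A toy illustration of the arithmetic (function name hypothetical):

def stale_chunk_indices(previous_chunk_cnt: int, new_chunk_cnt: int) -> range:
    # A document that shrank from 10 chunks to 6 leaves chunks 6..9 stale.
    return range(new_chunk_cnt, previous_chunk_cnt)

assert list(stale_chunk_indices(10, 6)) == [6, 7, 8, 9]
assert list(stale_chunk_indices(6, 6)) == []   # unchanged size: nothing to delete
assert list(stale_chunk_indices(4, 6)) == []   # document grew: nothing to delete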
backend/onyx/indexing/indexing_pipeline.py (10 changes: 5 additions & 5 deletions)
@@ -383,15 +383,15 @@ def index_doc_batch(
         )
     }

-    doc_id_to_previous_chunks_indexed: dict[str, int | None] = {
+    doc_id_to_previous_chunk_cnt: dict[str, int | None] = {
         document_id: chunk_count
         for document_id, chunk_count in fetch_chunk_counts_for_documents(
             document_ids=updatable_ids,
             db_session=db_session,
         )
     }

-    doc_id_to_current_chunks_indexed: dict[str, int] = {
+    doc_id_to_new_chunk_cnt: dict[str, int] = {
         document_id: len(
             [
                 chunk
@@ -433,8 +433,8 @@ def index_doc_batch(
     insertion_records = document_index.index(
         chunks=access_aware_chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed=doc_id_to_previous_chunks_indexed,
-            doc_id_to_current_chunks_indexed=doc_id_to_current_chunks_indexed,
+            doc_id_to_previous_chunk_cnt=doc_id_to_previous_chunk_cnt,
+            doc_id_to_new_chunk_cnt=doc_id_to_new_chunk_cnt,
             tenant_id=tenant_id,
             large_chunks_enabled=chunker.enable_large_chunks,
         ),
@@ -465,7 +465,7 @@ def index_doc_batch(

     update_docs_chunk_count__no_commit(
         document_ids=updatable_ids,
-        doc_id_to_chunk_count=doc_id_to_current_chunks_indexed,
+        doc_id_to_chunk_count=doc_id_to_new_chunk_cnt,
         db_session=db_session,
     )

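Note: doc_id_to_previous_chunk_cnt is typed dict[str, int | None] because documents indexed before chunk counts were stored in the database come back without one; that None is the signal that sends index() down the old-version probe above. Persisting the new counts via update_docs_chunk_count__no_commit means the next run sees accurate previous counts. A small sketch of the shape, with a hypothetical stub standing in for fetch_chunk_counts_for_documents:

def fetch_chunk_counts_stub(document_ids: list[str]) -> list[tuple[str, int | None]]:
    # Hypothetical rows: "doc-b" predates stored chunk counts.
    rows = {"doc-a": 10, "doc-b": None}
    return [(doc_id, rows.get(doc_id)) for doc_id in document_ids]

doc_id_to_previous_chunk_cnt: dict[str, int | None] = {
    document_id: chunk_count
    for document_id, chunk_count in fetch_chunk_counts_stub(["doc-a", "doc-b"])
}
assert doc_id_to_previous_chunk_cnt == {"doc-a": 10, "doc-b": None}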
backend/onyx/seeding/load_docs.py (4 changes: 2 additions & 2 deletions)
@@ -222,8 +222,8 @@ def seed_initial_documents(
     index_with_retries(
         chunks=chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed={},
-            doc_id_to_current_chunks_indexed={},
+            doc_id_to_previous_chunk_cnt={},
+            doc_id_to_new_chunk_cnt={},
             large_chunks_enabled=False,
             tenant_id=tenant_id,
         ),
backend/scripts/query_time_check/seed_dummy_docs.py (4 changes: 2 additions & 2 deletions)
@@ -108,8 +108,8 @@ def do_insertion(
     insertion_records = vespa_index.index(
         chunks=all_chunks,
         index_batch_params=IndexBatchParams(
-            doc_id_to_previous_chunks_indexed={},
-            doc_id_to_current_chunks_indexed={},
+            doc_id_to_previous_chunk_cnt={},
+            doc_id_to_new_chunk_cnt={},
             tenant_id=POSTGRES_DEFAULT_SCHEMA,
             large_chunks_enabled=False,
         ),
