Skip to content

Commit

Permalink
Fully remove visit API (#3621)
Browse files Browse the repository at this point in the history
* v1

* update indexing logic

* update updates

* nit

* clean up args

* update for clarity + best practices

* nit + logs

* fix

* minor clean up

* remove logs

* quick nit
  • Loading branch information
pablonyx authored Jan 8, 2025
1 parent eac73a1 commit d7bc32c
Show file tree
Hide file tree
Showing 15 changed files with 383 additions and 240 deletions.
30 changes: 26 additions & 4 deletions backend/onyx/background/celery/tasks/shared/RetryDocumentIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,35 @@ def __init__(self, index: DocumentIndex):
wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
stop=stop_after_delay(STOP_AFTER),
)
def delete_single(self, doc_id: str) -> int:
return self.index.delete_single(doc_id)
def delete_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
) -> int:
return self.index.delete_single(
doc_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
)

@retry(
retry=retry_if_exception_type(httpx.ReadTimeout),
wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
stop=stop_after_delay(STOP_AFTER),
)
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
return self.index.update_single(doc_id, fields)
def update_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
fields: VespaDocumentFields,
) -> int:
return self.index.update_single(
doc_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
fields=fields,
)
16 changes: 14 additions & 2 deletions backend/onyx/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.document import delete_document_by_connector_credential_pair__no_commit
from onyx.db.document import delete_documents_complete__no_commit
from onyx.db.document import fetch_chunk_count_for_document
from onyx.db.document import get_document
from onyx.db.document import get_document_connector_count
from onyx.db.document import mark_document_as_modified
Expand Down Expand Up @@ -80,7 +81,13 @@ def document_by_cc_pair_cleanup_task(
# delete it from vespa and the db
action = "delete"

chunks_affected = retry_index.delete_single(document_id)
chunk_count = fetch_chunk_count_for_document(document_id, db_session)

chunks_affected = retry_index.delete_single(
document_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
)
delete_documents_complete__no_commit(
db_session=db_session,
document_ids=[document_id],
Expand Down Expand Up @@ -110,7 +117,12 @@ def document_by_cc_pair_cleanup_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = retry_index.update_single(document_id, fields=fields)
chunks_affected = retry_index.update_single(
document_id,
tenant_id=tenant_id,
chunk_count=doc.chunk_count,
fields=fields,
)

# there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit(
Expand Down
7 changes: 6 additions & 1 deletion backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,12 @@ def vespa_metadata_sync_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = retry_index.update_single(document_id, fields)
chunks_affected = retry_index.update_single(
document_id,
tenant_id=tenant_id,
chunk_count=doc.chunk_count,
fields=fields,
)

# update db last. Worst case = we crash right before this and
# the sync might repeat again later
Expand Down
23 changes: 15 additions & 8 deletions backend/onyx/db/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,20 +685,27 @@ def get_document_sources(
def fetch_chunk_counts_for_documents(
document_ids: list[str],
db_session: Session,
) -> list[tuple[str, int | None]]:
) -> list[tuple[str, int]]:
"""
Return a list of (document_id, chunk_count) tuples.
Note: chunk_count might be None if not set in DB,
so we declare it as Optional[int].
If a document_id is not found in the database, it will be returned with a chunk_count of 0.
"""
stmt = select(DbDocument.id, DbDocument.chunk_count).where(
DbDocument.id.in_(document_ids)
)

# results is a list of 'Row' objects, each containing two columns
results = db_session.execute(stmt).all()

# If DbDocument.id is guaranteed to be a string, you can just do row.id;
# otherwise cast to str if you need to be sure it's a string:
return [(str(row[0]), row[1]) for row in results]
# or row.id, row.chunk_count if they are named attributes in your ORM model
# Create a dictionary of document_id to chunk_count
chunk_counts = {str(row.id): row.chunk_count or 0 for row in results}

# Return a list of tuples, using 0 for documents not found in the database
return [(doc_id, chunk_counts.get(doc_id, 0)) for doc_id in document_ids]


def fetch_chunk_count_for_document(
document_id: str,
db_session: Session,
) -> int | None:
stmt = select(DbDocument.chunk_count).where(DbDocument.id == document_id)
return db_session.execute(stmt).scalar_one_or_none()
12 changes: 8 additions & 4 deletions backend/onyx/document_index/document_index_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.indexing.models import DocMetadataAwareIndexChunk

from shared_configs.configs import MULTI_TENANT

DEFAULT_BATCH_SIZE = 30
DEFAULT_INDEX_NAME = "danswer_chunk"
Expand Down Expand Up @@ -37,7 +37,10 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
return 2 / (1 + math.exp(-1 * boost / 3))


def assemble_document_chunk_info(
# Assembles a list of Vespa chunk IDs for a document
# given the required context. This can be used to directly query
# Vespa's Document API.
def get_document_chunk_ids(
enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
tenant_id: str | None,
large_chunks_enabled: bool,
Expand Down Expand Up @@ -110,10 +113,11 @@ def get_uuid_from_chunk_info(
"large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id)
)
unique_identifier_string = "_".join([doc_str, chunk_index])
if tenant_id:
if tenant_id and MULTI_TENANT:
unique_identifier_string += "_" + tenant_id

return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
uuid_value = uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
return uuid_value


def get_uuid_from_chunk_info_old(
Expand Down
25 changes: 20 additions & 5 deletions backend/onyx/document_index/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class UpdateRequest:
Does not update any of the None fields
"""

document_ids: list[str]
minimal_document_indexing_info: list[MinimalDocumentIndexingInfo]
# all other fields except these 4 will always be left alone by the update request
access: DocumentAccess | None = None
document_sets: set[str] | None = None
Expand All @@ -136,7 +136,7 @@ def __init__(
index_name: str,
secondary_index_name: str | None,
*args: Any,
**kwargs: Any
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.index_name = index_name
Expand Down Expand Up @@ -218,7 +218,13 @@ class Deletable(abc.ABC):
"""

@abc.abstractmethod
def delete_single(self, doc_id: str) -> int:
def delete_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
) -> int:
"""
Given a single document id, hard delete it from the document index
Expand All @@ -239,7 +245,14 @@ class Updatable(abc.ABC):
"""

@abc.abstractmethod
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
def update_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
fields: VespaDocumentFields,
) -> int:
"""
Updates all chunks for a document with the specified fields.
None values mean that the field does not need an update.
Expand All @@ -257,7 +270,9 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
raise NotImplementedError

@abc.abstractmethod
def update(self, update_requests: list[UpdateRequest]) -> None:
def update(
self, update_requests: list[UpdateRequest], *, tenant_id: str | None
) -> None:
"""
Updates some set of chunks. The document and fields to update are specified in the update
requests. Each update request in the list applies its changes to a list of document ids.
Expand Down
Loading

0 comments on commit d7bc32c

Please sign in to comment.