Improved indexing #3594

Merged: 10 commits, Jan 6, 2025 (changes shown from 9 commits)
@@ -0,0 +1,24 @@
"""add chunk count to document

Revision ID: 2955778aa44c
Revises: c0aab6edb6dd
Create Date: 2025-01-04 11:39:43.268612

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "2955778aa44c"
down_revision = "c0aab6edb6dd"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.add_column("document", sa.Column("chunk_count", sa.Integer(), nullable=True))


def downgrade() -> None:
    op.drop_column("document", "chunk_count")
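Applying the revision is the standard Alembic flow; a minimal sketch, assuming a stock alembic.ini in the backend directory (the usual route is just the CLI command alembic upgrade head):

from alembic import command
from alembic.config import Config

def run_migration() -> None:
    # Load the project's Alembic config (the path is an assumption)
    alembic_cfg = Config("alembic.ini")
    # Apply all pending revisions, including 2955778aa44c above
    command.upgrade(alembic_cfg, "head")

Because the new column is nullable, the upgrade is a metadata-only ALTER and needs no backfill to succeed.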
3 changes: 3 additions & 0 deletions backend/onyx/connectors/models.py
@@ -101,8 +101,11 @@ class DocumentBase(BaseModel):
    source: DocumentSource | None = None
    semantic_identifier: str  # displayed in the UI as the main identifier for the doc
    metadata: dict[str, str | list[str]]

    # UTC time
    doc_updated_at: datetime | None = None
    chunk_count: int | None = None

    # Owner, creator, etc.
    primary_owners: list[BasicExpertInfo] | None = None
    # Assignee, space owner, etc.
34 changes: 34 additions & 0 deletions backend/onyx/db/document.py
@@ -416,6 +416,18 @@ def update_docs_last_modified__no_commit(
        doc.last_modified = now


def update_docs_chunk_count__no_commit(
    document_ids: list[str],
    doc_id_to_chunk_count: dict[str, int],
    db_session: Session,
) -> None:
    documents_to_update = (
        db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all()
    )
    for doc in documents_to_update:
        doc.chunk_count = doc_id_to_chunk_count[doc.id]

[Review comment, Contributor] are document_id's and document_id_to_current_chunks_indexed the same docs? what's the difference?


def mark_document_as_modified(
    document_id: str,
    db_session: Session,
@@ -612,3 +624,25 @@ def get_document(
    stmt = select(DbDocument).where(DbDocument.id == document_id)
    doc: DbDocument | None = db_session.execute(stmt).scalar_one_or_none()
    return doc


def fetch_chunk_counts_for_documents(
    document_ids: list[str],
    db_session: Session,
) -> list[tuple[str, int | None]]:
    """
    Return a list of (document_id, chunk_count) tuples.
    Note: chunk_count may be None if it was never set in the DB,
    so it is typed as int | None.
    """
    stmt = select(DbDocument.id, DbDocument.chunk_count).where(
        DbDocument.id.in_(document_ids)
    )

    # Each result row carries the two selected columns; cast the id to str
    # so callers always get string document ids back.
    results = db_session.execute(stmt).all()
    return [(str(row[0]), row[1]) for row in results]
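A sketch of how the two helpers above might be used together around a re-indexing pass (the session handling and the example ids/counts are assumptions, not code from this PR; imports of the helpers themselves are omitted):

from sqlalchemy.orm import Session

def record_new_chunk_counts(db_session: Session) -> None:
    # Hypothetical inputs: docs that were just re-chunked, and their new counts
    document_ids = ["doc-1", "doc-2"]
    new_counts = {"doc-1": 12, "doc-2": 7}

    # Old counts; None for documents indexed before this PR's migration
    for doc_id, old_count in fetch_chunk_counts_for_documents(
        document_ids, db_session
    ):
        print(f"{doc_id}: {old_count} -> {new_counts[doc_id]}")

    # Per the __no_commit suffix, the caller owns the transaction boundary
    update_docs_chunk_count__no_commit(document_ids, new_counts, db_session)
    db_session.commit()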
4 changes: 4 additions & 0 deletions backend/onyx/db/models.py
@@ -494,6 +494,10 @@ class Document(Base):
        DateTime(timezone=True), nullable=True
    )

    # Number of chunks in the document (in Vespa)
    # Only null for documents indexed prior to this change
    chunk_count: Mapped[int | None] = mapped_column(Integer, nullable=True)

    # last time any vespa relevant row metadata or the doc changed.
    # does not include last_synced
    last_modified: Mapped[datetime.datetime | None] = mapped_column(
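Since the column is nullable, documents indexed before this change are identifiable by a NULL chunk_count. A minimal audit-query sketch (the helper name is hypothetical; the query follows this module's SQLAlchemy 2.0 style):

from sqlalchemy import select
from sqlalchemy.orm import Session

def find_docs_without_chunk_count(db_session: Session) -> list[str]:
    # Candidates for a future backfill, or for legacy-path handling at deletion time
    stmt = select(Document.id).where(Document.chunk_count.is_(None))
    return [str(doc_id) for doc_id in db_session.scalars(stmt).all()]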
122 changes: 108 additions & 14 deletions backend/onyx/document_index/document_index_utils.py
@@ -1,12 +1,13 @@
import math
import uuid
+from uuid import UUID

from sqlalchemy.orm import Session

from onyx.context.search.models import InferenceChunk
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
-from onyx.indexing.models import IndexChunk
+from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
+from onyx.indexing.models import DocMetadataAwareIndexChunk


DEFAULT_BATCH_SIZE = 30
@@ -36,25 +37,118 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
    return 2 / (1 + math.exp(-1 * boost / 3))


-def get_uuid_from_chunk(
-    chunk: IndexChunk | InferenceChunk, mini_chunk_ind: int = 0
-) -> uuid.UUID:
-    doc_str = (
-        chunk.document_id
-        if isinstance(chunk, InferenceChunk)
-        else chunk.source_document.id
-    )
+def assemble_document_chunk_info(
+    enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
+    tenant_id: str | None,
+    large_chunks_enabled: bool,
+) -> list[UUID]:
+    doc_chunk_ids = []
+
+    for enriched_document_info in enriched_document_info_list:
+        for chunk_index in range(
+            enriched_document_info.chunk_start_index,
+            enriched_document_info.chunk_end_index,
+        ):
+            if not enriched_document_info.old_version:
+                doc_chunk_ids.append(
+                    get_uuid_from_chunk_info(
+                        document_id=enriched_document_info.doc_id,
+                        chunk_id=chunk_index,
+                        tenant_id=tenant_id,
+                    )
+                )
+            else:
+                doc_chunk_ids.append(
+                    get_uuid_from_chunk_info_old(
+                        document_id=enriched_document_info.doc_id,
+                        chunk_id=chunk_index,
+                    )
+                )
+
+            if large_chunks_enabled and chunk_index % 4 == 0:
+                large_chunk_id = int(chunk_index / 4)
+                large_chunk_reference_ids = [
+                    large_chunk_id + i
+                    for i in range(4)
+                    if large_chunk_id + i < enriched_document_info.chunk_end_index
+                ]
+                if enriched_document_info.old_version:
+                    doc_chunk_ids.append(
+                        get_uuid_from_chunk_info_old(
+                            document_id=enriched_document_info.doc_id,
+                            chunk_id=large_chunk_id,
+                            large_chunk_reference_ids=large_chunk_reference_ids,
+                        )
+                    )
+                else:
+                    doc_chunk_ids.append(
+                        get_uuid_from_chunk_info(
+                            document_id=enriched_document_info.doc_id,
+                            chunk_id=large_chunk_id,
+                            tenant_id=tenant_id,
+                            large_chunk_id=large_chunk_id,
+                        )
+                    )
+
+    return doc_chunk_ids
+
+
+def get_uuid_from_chunk_info(
+    *,
+    document_id: str,
+    chunk_id: int,
+    tenant_id: str | None,
+    large_chunk_id: int | None = None,
+) -> UUID:
+    doc_str = document_id

    # Web parsing URL duplicate catching
    if doc_str and doc_str[-1] == "/":
        doc_str = doc_str[:-1]
-    unique_identifier_string = "_".join(
-        [doc_str, str(chunk.chunk_id), str(mini_chunk_ind)]
-    )
-    if chunk.large_chunk_reference_ids:
+
+    chunk_index = (
+        "large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id)
+    )
+    unique_identifier_string = "_".join([doc_str, chunk_index])
+    if tenant_id:
+        unique_identifier_string += "_" + tenant_id
+
+    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
+
+
+def get_uuid_from_chunk_info_old(
+    *, document_id: str, chunk_id: int, large_chunk_reference_ids: list[int] = []
+) -> UUID:
+    doc_str = document_id
+
+    # Web parsing URL duplicate catching
+    if doc_str and doc_str[-1] == "/":
+        doc_str = doc_str[:-1]
+    unique_identifier_string = "_".join([doc_str, str(chunk_id), "0"])
+    if large_chunk_reference_ids:
        unique_identifier_string += "_large" + "_".join(
            [
                str(referenced_chunk_id)
-                for referenced_chunk_id in chunk.large_chunk_reference_ids
+                for referenced_chunk_id in large_chunk_reference_ids
            ]
        )
    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
+
+
+def get_uuid_from_chunk(chunk: DocMetadataAwareIndexChunk) -> uuid.UUID:
+    return get_uuid_from_chunk_info(
+        document_id=chunk.source_document.id,
+        chunk_id=chunk.chunk_id,
+        tenant_id=chunk.tenant_id,
+        large_chunk_id=chunk.large_chunk_id,
+    )
+
+
+def get_uuid_from_chunk_old(
+    chunk: DocMetadataAwareIndexChunk, large_chunk_reference_ids: list[int] = []
+) -> UUID:
+    return get_uuid_from_chunk_info_old(
+        document_id=chunk.source_document.id,
+        chunk_id=chunk.chunk_id,
+        large_chunk_reference_ids=large_chunk_reference_ids,
+    )
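The reason chunk_count matters for these helpers: chunk UUIDs are a pure function of (document_id, chunk index, tenant), so a stored chunk count is enough to regenerate every previously indexed chunk ID without querying the index. A minimal standalone sketch mirroring get_uuid_from_chunk_info (the ids and tenant below are made up):

import uuid

def chunk_uuid(document_id: str, chunk_id: int, tenant_id: str | None) -> uuid.UUID:
    # Same recipe as get_uuid_from_chunk_info: a stable name yields a stable UUIDv5
    doc_str = document_id[:-1] if document_id.endswith("/") else document_id
    name = f"{doc_str}_{chunk_id}"
    if tenant_id:
        name = f"{name}_{tenant_id}"
    return uuid.uuid5(uuid.NAMESPACE_X500, name)

# A doc previously indexed with 8 chunks was re-indexed into 5:
old_ids = {chunk_uuid("doc-123", i, "tenant-a") for i in range(8)}
new_ids = {chunk_uuid("doc-123", i, "tenant-a") for i in range(5)}
stale_ids = old_ids - new_ids  # exactly chunks 5, 6, 7: safe to delete directly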
53 changes: 36 additions & 17 deletions backend/onyx/document_index/interfaces.py
@@ -35,6 +35,38 @@ def range(self) -> int | None:
        return None


@dataclass
class IndexBatchParams:
    """
    Information necessary for efficiently indexing a batch of documents
    """

    doc_id_to_previous_chunks_indexed: dict[str, int | None]
    doc_id_to_current_chunks_indexed: dict[str, int]
    tenant_id: str | None
    large_chunks_enabled: bool


@dataclass
class MinimalDocumentIndexingInfo:
    """
    Minimal information necessary for indexing a document
    """

    doc_id: str
    chunk_start_index: int


@dataclass
class EnrichedDocumentIndexingInfo(MinimalDocumentIndexingInfo):
    """
    Enriched information necessary for indexing a document, including version and chunk range.
    """

    old_version: bool
    chunk_end_index: int


@dataclass
class DocumentMetadata:
    """
Expand Down Expand Up @@ -148,7 +180,7 @@ class Indexable(abc.ABC):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
index_batch_params: IndexBatchParams,
) -> set[DocumentInsertionRecord]:
"""
Takes a list of document chunks and indexes them in the document index
@@ -166,14 +198,11 @@ def index(
        only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
        it is done automatically outside of this code.

-        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
-        indexed for the given index/tenant. This can be used to optimize the indexing process for
-        new or empty indices.
-
        Parameters:
        - chunks: Document chunks with all of the information needed for indexing to the document
        index.
-        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
+        - tenant_id: The tenant id of the user whose chunks are being indexed
+        - large_chunks_enabled: Whether large chunks are enabled

        Returns:
        List of document ids which map to unique documents and are used for deduping chunks
@@ -185,7 +214,7 @@
class Deletable(abc.ABC):
    """
-    Class must implement the ability to delete document by their unique document ids.
+    Class must implement the ability to delete document by a given unique document id.
    """

    @abc.abstractmethod
@@ -198,16 +227,6 @@ def delete_single(self, doc_id: str) -> int:
        """
        raise NotImplementedError

-    @abc.abstractmethod
-    def delete(self, doc_ids: list[str]) -> None:
-        """
-        Given a list of document ids, hard delete them from the document index
-
-        Parameters:
-        - doc_ids: list of document ids as specified by the connector
-        """
-        raise NotImplementedError
-

class Updatable(abc.ABC):
    """
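A hedged sketch of how a caller might assemble IndexBatchParams for the new index() signature. The previous counts would come from fetch_chunk_counts_for_documents and the current counts from the chunking pass; the ids, counts, get_index(), and chunk_batch below are all made up for illustration:

doc_ids = ["doc-1", "doc-2"]

# Previous counts from the DB; None means the doc predates chunk_count tracking
previous = {"doc-1": 8, "doc-2": None}
# Counts produced by the current chunking pass
current = {"doc-1": 5, "doc-2": 11}

params = IndexBatchParams(
    doc_id_to_previous_chunks_indexed=previous,
    doc_id_to_current_chunks_indexed=current,
    tenant_id="tenant-a",  # or None for single-tenant deployments
    large_chunks_enabled=False,
)

# A doc with a known previous count larger than its current one has a
# precisely known set of stale chunk ids (5..7 for doc-1); a None previous
# count forces the index to fall back to a broader cleanup path.
index = get_index()  # hypothetical: some DocumentIndex implementation
insertion_records = index.index(chunks=chunk_batch, index_batch_params=params)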