Improved indexing (#3594)
* nit

* k

* add steps

* main util functions

* functioning fully

* quick nit

* k

* typing fix

* k

* address comments
pablonyx authored Jan 5, 2025
1 parent e83542f commit ddec239
Showing 18 changed files with 417 additions and 148 deletions.
24 changes: 24 additions & 0 deletions backend/alembic/versions/2955778aa44c_add_chunk_count_to_document.py
@@ -0,0 +1,24 @@
"""add chunk count to document
Revision ID: 2955778aa44c
Revises: c0aab6edb6dd
Create Date: 2025-01-04 11:39:43.268612
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "2955778aa44c"
down_revision = "c0aab6edb6dd"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column("document", sa.Column("chunk_count", sa.Integer(), nullable=True))


def downgrade() -> None:
op.drop_column("document", "chunk_count")
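
Because the new column is nullable, any document indexed before this migration keeps chunk_count = NULL. A minimal sketch (assuming the Document ORM model extended later in this commit) for spotting those legacy rows:

from sqlalchemy import select
from sqlalchemy.orm import Session

from onyx.db.models import Document


def doc_ids_missing_chunk_count(db_session: Session) -> list[str]:
    # Rows indexed before this migration still have chunk_count = NULL
    stmt = select(Document.id).where(Document.chunk_count.is_(None))
    return list(db_session.execute(stmt).scalars().all())
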
3 changes: 3 additions & 0 deletions backend/onyx/connectors/models.py
@@ -101,8 +101,11 @@ class DocumentBase(BaseModel):
    source: DocumentSource | None = None
    semantic_identifier: str  # displayed in the UI as the main identifier for the doc
    metadata: dict[str, str | list[str]]

    # UTC time
    doc_updated_at: datetime | None = None
    chunk_count: int | None = None

    # Owner, creator, etc.
    primary_owners: list[BasicExpertInfo] | None = None
    # Assignee, space owner, etc.
34 changes: 34 additions & 0 deletions backend/onyx/db/document.py
@@ -416,6 +416,18 @@ def update_docs_last_modified__no_commit(
        doc.last_modified = now


def update_docs_chunk_count__no_commit(
    document_ids: list[str],
    doc_id_to_chunk_count: dict[str, int],
    db_session: Session,
) -> None:
    documents_to_update = (
        db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all()
    )
    for doc in documents_to_update:
        doc.chunk_count = doc_id_to_chunk_count[doc.id]


def mark_document_as_modified(
    document_id: str,
    db_session: Session,
@@ -612,3 +624,25 @@ def get_document(
    stmt = select(DbDocument).where(DbDocument.id == document_id)
    doc: DbDocument | None = db_session.execute(stmt).scalar_one_or_none()
    return doc


def fetch_chunk_counts_for_documents(
    document_ids: list[str],
    db_session: Session,
) -> list[tuple[str, int | None]]:
    """
    Return a list of (document_id, chunk_count) tuples.
    Note: chunk_count may be None for documents indexed before this
    column existed, hence the int | None in the return type.
    """
    stmt = select(DbDocument.id, DbDocument.chunk_count).where(
        DbDocument.id.in_(document_ids)
    )

    # Each result row holds two columns: (document_id, chunk_count)
    results = db_session.execute(stmt).all()

    # Cast the id to str so the declared return type holds regardless
    # of the underlying column type
    return [(str(row[0]), row[1]) for row in results]
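
A usage sketch tying the two new helpers together; per this module's __no_commit convention the caller owns the transaction, and get_session below is a hypothetical stand-in for whatever session factory the caller already has:

with get_session() as db_session:  # hypothetical session factory
    previous = dict(
        fetch_chunk_counts_for_documents(
            document_ids=["doc-1", "doc-2"],
            db_session=db_session,
        )
    )
    # e.g. {"doc-1": 12, "doc-2": None}; None means indexed before this column

    update_docs_chunk_count__no_commit(
        document_ids=["doc-1", "doc-2"],
        doc_id_to_chunk_count={"doc-1": 10, "doc-2": 4},
        db_session=db_session,
    )
    db_session.commit()  # a single commit covers the whole batch
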
4 changes: 4 additions & 0 deletions backend/onyx/db/models.py
@@ -494,6 +494,10 @@ class Document(Base):
        DateTime(timezone=True), nullable=True
    )

    # Number of chunks in the document (in Vespa)
    # Only null for documents indexed prior to this change
    chunk_count: Mapped[int | None] = mapped_column(Integer, nullable=True)

    # last time any vespa relevant row metadata or the doc changed.
    # does not include last_synced
    last_modified: Mapped[datetime.datetime | None] = mapped_column(
122 changes: 108 additions & 14 deletions backend/onyx/document_index/document_index_utils.py
@@ -1,12 +1,13 @@
import math
import uuid
from uuid import UUID

from sqlalchemy.orm import Session

from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.indexing.models import DocMetadataAwareIndexChunk


DEFAULT_BATCH_SIZE = 30
@@ -36,25 +37,118 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
    return 2 / (1 + math.exp(-1 * boost / 3))
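
As a quick sanity check of the logistic curve above: a boost of 0 yields 2 / (1 + e^0) = 1.0, i.e. a neutral multiplier.

assert abs(translate_boost_count_to_multiplier(0) - 1.0) < 1e-9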


def assemble_document_chunk_info(
    enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
    tenant_id: str | None,
    large_chunks_enabled: bool,
) -> list[UUID]:
    doc_chunk_ids = []

    for enriched_document_info in enriched_document_info_list:
        for chunk_index in range(
            enriched_document_info.chunk_start_index,
            enriched_document_info.chunk_end_index,
        ):
            if not enriched_document_info.old_version:
                doc_chunk_ids.append(
                    get_uuid_from_chunk_info(
                        document_id=enriched_document_info.doc_id,
                        chunk_id=chunk_index,
                        tenant_id=tenant_id,
                    )
                )
            else:
                doc_chunk_ids.append(
                    get_uuid_from_chunk_info_old(
                        document_id=enriched_document_info.doc_id,
                        chunk_id=chunk_index,
                    )
                )

            if large_chunks_enabled and chunk_index % 4 == 0:
                large_chunk_id = int(chunk_index / 4)
                large_chunk_reference_ids = [
                    large_chunk_id + i
                    for i in range(4)
                    if large_chunk_id + i < enriched_document_info.chunk_end_index
                ]
                if enriched_document_info.old_version:
                    doc_chunk_ids.append(
                        get_uuid_from_chunk_info_old(
                            document_id=enriched_document_info.doc_id,
                            chunk_id=large_chunk_id,
                            large_chunk_reference_ids=large_chunk_reference_ids,
                        )
                    )
                else:
                    doc_chunk_ids.append(
                        get_uuid_from_chunk_info(
                            document_id=enriched_document_info.doc_id,
                            chunk_id=large_chunk_id,
                            tenant_id=tenant_id,
                            large_chunk_id=large_chunk_id,
                        )
                    )

    return doc_chunk_ids
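
For illustration, a sketch that enumerates every chunk UUID of one 8-chunk document on the current schema version (EnrichedDocumentIndexingInfo is defined in interfaces.py below; the values are made up):

info = EnrichedDocumentIndexingInfo(
    doc_id="doc-1",
    chunk_start_index=0,
    chunk_end_index=8,
    old_version=False,
)
chunk_uuids = assemble_document_chunk_info(
    enriched_document_info_list=[info],
    tenant_id=None,
    large_chunks_enabled=False,
)
assert len(chunk_uuids) == 8  # one UUID per regular chunk, no large chunks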


def get_uuid_from_chunk_info(
    *,
    document_id: str,
    chunk_id: int,
    tenant_id: str | None,
    large_chunk_id: int | None = None,
) -> UUID:
    doc_str = document_id

    # Web parsing URL duplicate catching
    if doc_str and doc_str[-1] == "/":
        doc_str = doc_str[:-1]

    chunk_index = (
        "large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id)
    )
    unique_identifier_string = "_".join([doc_str, chunk_index])
    if tenant_id:
        unique_identifier_string += "_" + tenant_id

    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)


def get_uuid_from_chunk_info_old(
    *, document_id: str, chunk_id: int, large_chunk_reference_ids: list[int] = []
) -> UUID:
    doc_str = document_id

    # Web parsing URL duplicate catching
    if doc_str and doc_str[-1] == "/":
        doc_str = doc_str[:-1]

    unique_identifier_string = "_".join([doc_str, str(chunk_id), "0"])
    if large_chunk_reference_ids:
        unique_identifier_string += "_large" + "_".join(
            [
                str(referenced_chunk_id)
                for referenced_chunk_id in large_chunk_reference_ids
            ]
        )
    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
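
Both helpers lean on the determinism of uuid5: the same document id, chunk id, and tenant always map to the same UUID, which is what lets chunk ids be reconstructed from counts alone. For example:

a = get_uuid_from_chunk_info(
    document_id="https://example.com/page/", chunk_id=0, tenant_id=None
)
b = get_uuid_from_chunk_info(
    document_id="https://example.com/page", chunk_id=0, tenant_id=None
)
assert a == b  # trailing-slash URL duplicates collapse to one chunk UUID

c = get_uuid_from_chunk_info(
    document_id="https://example.com/page", chunk_id=0, tenant_id="tenant-a"
)
assert c != a  # the tenant id namespaces chunk UUIDs per tenant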


def get_uuid_from_chunk(chunk: DocMetadataAwareIndexChunk) -> uuid.UUID:
    return get_uuid_from_chunk_info(
        document_id=chunk.source_document.id,
        chunk_id=chunk.chunk_id,
        tenant_id=chunk.tenant_id,
        large_chunk_id=chunk.large_chunk_id,
    )


def get_uuid_from_chunk_old(
    chunk: DocMetadataAwareIndexChunk, large_chunk_reference_ids: list[int] = []
) -> UUID:
    return get_uuid_from_chunk_info_old(
        document_id=chunk.source_document.id,
        chunk_id=chunk.chunk_id,
        large_chunk_reference_ids=large_chunk_reference_ids,
    )
53 changes: 36 additions & 17 deletions backend/onyx/document_index/interfaces.py
@@ -35,6 +38,38 @@ def range(self) -> int | None:
        return None


@dataclass
class IndexBatchParams:
    """
    Information necessary for efficiently indexing a batch of documents
    """

    doc_id_to_previous_chunk_cnt: dict[str, int | None]
    doc_id_to_new_chunk_cnt: dict[str, int]
    tenant_id: str | None
    large_chunks_enabled: bool


@dataclass
class MinimalDocumentIndexingInfo:
    """
    Minimal information necessary for indexing a document
    """

    doc_id: str
    chunk_start_index: int


@dataclass
class EnrichedDocumentIndexingInfo(MinimalDocumentIndexingInfo):
    """
    Enriched information necessary for indexing a document, including version and chunk range.
    """

    old_version: bool
    chunk_end_index: int

@dataclass
class DocumentMetadata:
"""
@@ -148,7 +180,7 @@ class Indexable(abc.ABC):
    def index(
        self,
        chunks: list[DocMetadataAwareIndexChunk],
        index_batch_params: IndexBatchParams,
    ) -> set[DocumentInsertionRecord]:
        """
        Takes a list of document chunks and indexes them in the document index
@@ -166,14 +198,11 @@ def index(
        only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
        it is done automatically outside of this code.

        Parameters:
        - chunks: Document chunks with all of the information needed for indexing to the document
                index.
        - tenant_id: The tenant id of the user whose chunks are being indexed
        - large_chunks_enabled: Whether large chunks are enabled

        Returns:
            List of document ids which map to unique documents and are used for deduping chunks
@@ -185,7 +214,7 @@ def index(

class Deletable(abc.ABC):
    """
    Class must implement the ability to delete a document by its unique document id.
    """

    @abc.abstractmethod
@@ -198,16 +227,6 @@ def delete_single(self, doc_id: str) -> int:
        """
        raise NotImplementedError
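
This commit also drops the bulk delete(doc_ids) abstract method from Deletable, leaving delete_single as the per-document hook. A hypothetical cleanup loop over an implementation might look like:

chunks_deleted = 0
for doc_id in ["doc-1", "doc-2"]:
    # document_index is assumed to be any Deletable implementation
    chunks_deleted += document_index.delete_single(doc_id)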



class Updatable(abc.ABC):
"""
Expand Down