LanceDB - Remove Orphaned Chunks (#1620)
* Add tests for LanceDB chunking and merging functionality

Signed-off-by: Marcel Coetzee <[email protected]>

* Add TSplitter type alias for LanceDB document splitting function

Signed-off-by: Marcel Coetzee <[email protected]>

* Refine typing for chunks

Signed-off-by: Marcel Coetzee <[email protected]>

* Add type definitions for chunk splitter function and related types

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove unused ChunkInputT, ChunkOutputT, and TSplitter type definitions

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement efficient update strategy for chunked documents in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement efficient update strategy for chunked documents in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client and tests for improved readability and type safety

Signed-off-by: Marcel Coetzee <[email protected]>

* Linting

Signed-off-by: Marcel Coetzee <[email protected]>

* Add document_id parameter to lancedb_adapter and update merge logic

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove resolved comments

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement efficient orphan removal for chunked documents in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement efficient update strategy for chunked documents in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Add test for removing orphaned records in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Update LanceDB orphaned records removal test for chunked documents

Signed-off-by: Marcel Coetzee <[email protected]>

* Set test pipeline as dev mode

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix write disposition check in LanceDBRemoveOrphansJob execute method

Signed-off-by: Marcel Coetzee <[email protected]>

* Add FollowupJob trait to LoadLanceDBJob

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix file type

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix file typing

Signed-off-by: Marcel Coetzee <[email protected]>

* Add test for removing orphaned records in LanceDB root table

Signed-off-by: Marcel Coetzee <[email protected]>

* Enhance LanceDB test to cover nested child removal and update scenarios

Signed-off-by: Marcel Coetzee <[email protected]>

* Use doc id hint for top level tables

Signed-off-by: Marcel Coetzee <[email protected]>

* Only join on join columns for orphan removal job

Signed-off-by: Marcel Coetzee <[email protected]>

* Add ollama to supported embedding providers and test orphaned record removal with embeddings

Signed-off-by: Marcel Coetzee <[email protected]>

* Add merge_key to document resource for efficient updates in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Formatting

Signed-off-by: Marcel Coetzee <[email protected]>

* Set default file size to 128MB

Signed-off-by: Marcel Coetzee <[email protected]>

* Only use parquet loader file formats

Signed-off-by: Marcel Coetzee <[email protected]>

* Import pyarrow.parquet

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove recommended file size from LanceDB destination capabilities

Signed-off-by: Marcel Coetzee <[email protected]>

* Update LanceDB client to use more efficient batch processing methods on loading for Load Jobs

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor unique identifier handling for LanceDB tables

Signed-off-by: Marcel Coetzee <[email protected]>

* Optimize UUID column generation for LanceDB tables

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDBClient to use string type hints for Table

Signed-off-by: Marcel Coetzee <[email protected]>

* Minor refactor

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement efficient schema update with Nullability support

Signed-off-by: Marcel Coetzee <[email protected]>

* Optimize orphaned chunks removal for large datasets

Signed-off-by: Marcel Coetzee <[email protected]>

* Projection pushdown

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Prevent primary key and document ID hint conflict in merge disposition

Signed-off-by: Marcel Coetzee <[email protected]>

* Add recommended file size for LanceDB destination

Signed-off-by: Marcel Coetzee <[email protected]>

* Improve comment clarity for projection push-down in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Update to new load interface

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove unnecessary LanceDBLoadJob attributes

Signed-off-by: Marcel Coetzee <[email protected]>

* Change instance attributes to `run` method as variables

Signed-off-by: Marcel Coetzee <[email protected]>

* Schedule follow-up reference job

Signed-off-by: Marcel Coetzee <[email protected]>

* Add follow-up LanceDB remove orphan job skeleton

Signed-off-by: Marcel Coetzee <[email protected]>

* Write empty follow up file

Signed-off-by: Marcel Coetzee <[email protected]>

* Write parquet

Signed-off-by: Marcel Coetzee <[email protected]>

* Add support for reference file format in LanceDB destination

Signed-off-by: Marcel Coetzee <[email protected]>

* Handle parent table name resolution if it doesn't exist in LanceDB remove orphan job

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor specialised orphan follow up job back to reference job

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor orphan removal for chunked documents

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix dlt system table check for name instead of object

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement staging methods

Signed-off-by: Marcel Coetzee <[email protected]>

* Override staging client methods

Signed-off-by: Marcel Coetzee <[email protected]>

* Docs

Signed-off-by: Marcel Coetzee <[email protected]>

* Override staging client methods

Signed-off-by: Marcel Coetzee <[email protected]>

* Delete with inserts

Signed-off-by: Marcel Coetzee <[email protected]>

* Keep with batch reader

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove Lancedb client's staging implementation

Signed-off-by: Marcel Coetzee <[email protected]>

* Insert in memory arrow table. This will be optimized

Signed-off-by: Marcel Coetzee <[email protected]>

* Rename classes to the new job implementation classes

Signed-off-by: Marcel Coetzee <[email protected]>

* Use namedtuple for table chain to improve readability

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove orphans by loading all ancestor IDs simultaneously

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix doc_id adapter

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix doc_id adapter

Signed-off-by: Marcel Coetzee <[email protected]>

* Revert to previous

Signed-off-by: Marcel Coetzee <[email protected]>

* Revert "Remove orphans by loading all ancestor IDs simultaneously"

This reverts commit 06e04d9.

* Remove doc_id hint

Signed-off-by: Marcel Coetzee <[email protected]>

* Infer merge key if not supplied from provided primary key

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove unused utility functions

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove LanceDB doc ID hints and use schema normalizer

Signed-off-by: Marcel Coetzee <[email protected]>

* LanceDB writes strange code

Signed-off-by: Marcel Coetzee <[email protected]>

* Minor Formatting

Signed-off-by: Marcel Coetzee <[email protected]>

* Support compound primary and merge keys

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove old comment

Signed-off-by: Marcel Coetzee <[email protected]>

* - Change default vector column name to "vector" to conform with lancedb standard
- Add search tests with tantivy as search engine

Signed-off-by: Marcel Coetzee <[email protected]>

* Format and fix linting

Signed-off-by: Marcel Coetzee <[email protected]>

* Add custom embedding function registration test

Signed-off-by: Marcel Coetzee <[email protected]>

* Spawn process in test to make sure registry can be deserialized from arrow files

Signed-off-by: Marcel Coetzee <[email protected]>

* Simplify null string handling

Signed-off-by: Marcel Coetzee <[email protected]>

* Change NULL string replacement with random string, doc clarification

Signed-off-by: Marcel Coetzee <[email protected]>

* Update default vector column name in docs

Signed-off-by: Marcel Coetzee <[email protected]>

* Set `remove_orphans` flag to False on tests that don't require it

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement starter arrow string placeholder function

Signed-off-by: Marcel Coetzee <[email protected]>

* Add test for empty arrow string element vectorised replacement utility function

Signed-off-by: Marcel Coetzee <[email protected]>

* Handle NULL values in addition to empty strings in arrow substitution method

Signed-off-by: Marcel Coetzee <[email protected]>

* More efficient empty value replacement with canonical arrow usage

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Bump pyarrow version

Signed-off-by: Marcel Coetzee <[email protected]>

* Use pa.nulls instead of [None]*len

Signed-off-by: Marcel Coetzee <[email protected]>

* Update tests

Signed-off-by: Marcel Coetzee <[email protected]>

* Invert remove orphans flag

Signed-off-by: Marcel Coetzee <[email protected]>

* Implement root table orphan deletion, only integer doc_ids

Signed-off-by: Marcel Coetzee <[email protected]>

* Cater for string ids as well in doc_id removal process

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix test with wrong primary key

Signed-off-by: Marcel Coetzee <[email protected]>

* Just send list of ids as is. don't pc.compute on client end

Signed-off-by: Marcel Coetzee <[email protected]>

* Extract schema matching into utils

Signed-off-by: Marcel Coetzee <[email protected]>

* Add utils

Signed-off-by: Marcel Coetzee <[email protected]>

* Pass all tests

Signed-off-by: Marcel Coetzee <[email protected]>

* Minor format and cleanup

Signed-off-by: Marcel Coetzee <[email protected]>

* Docs

Signed-off-by: Marcel Coetzee <[email protected]>

* Amend replace test to test with large number of records to catch race conditions with replace disposition

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix replace race conditions by delegating truncation to dlt

Signed-off-by: Marcel Coetzee <[email protected]>

* Update lock file

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor type mapping and schema handling in LanceDB client

Signed-off-by: Marcel Coetzee <[email protected]>

* Change 'complex' column type to 'json' in LanceDB client

Signed-off-by: Marcel Coetzee <[email protected]>

* update lock file

Signed-off-by: Marcel Coetzee <[email protected]>

* fixes generating lancedb literals

* verifies merge key early, fixes column override in adapters

* fixes linting errors

---------

Signed-off-by: Marcel Coetzee <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
Pipboyguy and rudolfix authored Nov 6, 2024
1 parent e6bd8ea commit c0e4795
Showing 20 changed files with 1,152 additions and 295 deletions.
17 changes: 17 additions & 0 deletions dlt/common/data_writers/escape.py
@@ -79,6 +79,23 @@ def escape_duckdb_literal(v: Any) -> Any:
     return str(v)


+def escape_lancedb_literal(v: Any) -> Any:
+    if isinstance(v, str):
+        # we escape extended string which behave like the redshift string
+        return _escape_extended(v, prefix="'")
+    if isinstance(v, (datetime, date, time)):
+        return f"'{v.isoformat()}'"
+    if isinstance(v, (list, dict)):
+        return _escape_extended(json.dumps(v), prefix="'")
+    # TODO: check how binaries are represented in fusion
+    if isinstance(v, bytes):
+        return f"from_base64('{base64.b64encode(v).decode('ascii')}')"
+    if v is None:
+        return "NULL"
+
+    return str(v)
+
+
 MS_SQL_ESCAPE_DICT = {
     "'": "''",
     "\n": "' + CHAR(10) + N'",
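For illustration, here is a minimal standalone sketch of how a literal escaper shaped like `escape_lancedb_literal` might be used to assemble a filter expression. It does not import dlt: `_quote` is a hypothetical stand-in for `_escape_extended` (whose body is not part of this diff) and only doubles single quotes, and `escape_literal_sketch` and `build_in_filter` are illustrative names, not functions from the commit.

```python
import base64
import json
from datetime import date, datetime, time
from typing import Any, Iterable


def _quote(s: str) -> str:
    # Hypothetical stand-in for dlt's _escape_extended: doubles single quotes only.
    return "'" + s.replace("'", "''") + "'"


def escape_literal_sketch(v: Any) -> str:
    # Mirrors the branch order of escape_lancedb_literal in the diff.
    if isinstance(v, str):
        return _quote(v)
    if isinstance(v, (datetime, date, time)):
        return f"'{v.isoformat()}'"
    if isinstance(v, (list, dict)):
        return _quote(json.dumps(v))
    if isinstance(v, bytes):
        return f"from_base64('{base64.b64encode(v).decode('ascii')}')"
    if v is None:
        return "NULL"
    return str(v)


def build_in_filter(column: str, values: Iterable[Any]) -> str:
    # Illustrative only: joins escaped literals into an SQL-style IN predicate.
    literals = ", ".join(escape_literal_sketch(v) for v in values)
    return f"{column} IN ({literals})"


if __name__ == "__main__":
    print(build_in_filter("doc_id", [1, "O'Brien", None]))
```

The sketch keeps strings, dates, JSON-serialisable containers, bytes, and `None` on separate branches so that each type gets its own quoting rule, as in the committed function.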
2 changes: 1 addition & 1 deletion dlt/destinations/impl/databricks/databricks.py
@@ -224,7 +224,7 @@ def __init__(
         )
         super().__init__(schema, config, sql_client)
         self.config: DatabricksClientConfiguration = config
-        self.sql_client: DatabricksSqlClient = sql_client  # type: ignore[assignment]
+        self.sql_client: DatabricksSqlClient = sql_client
         self.type_mapper = self.capabilities.get_type_mapper()

     def create_load_job(
4 changes: 2 additions & 2 deletions dlt/destinations/impl/databricks/sql_client.py
@@ -41,7 +41,7 @@
 class DatabricksCursorImpl(DBApiCursorImpl):
     """Use native data frame support if available"""

-    native_cursor: DatabricksSqlCursor  # type: ignore[assignment]
+    native_cursor: DatabricksSqlCursor
     vector_size: ClassVar[int] = 2048  # vector size is 2048

     def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
@@ -144,7 +144,7 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
         # db_args = kwargs or None

         db_args = args or kwargs or None
-        with self._conn.cursor() as curr:  # type: ignore[assignment]
+        with self._conn.cursor() as curr:
             curr.execute(query, db_args)
             yield DatabricksCursorImpl(curr)  # type: ignore[abstract]

3 changes: 1 addition & 2 deletions dlt/destinations/impl/lancedb/configuration.py
@@ -59,6 +59,7 @@ class LanceDBClientOptions(BaseConfiguration):
     "sentence-transformers",
     "huggingface",
     "colbert",
+    "ollama",
 ]


@@ -92,8 +93,6 @@ class LanceDBClientConfiguration(DestinationClientDwhConfiguration):
     Make sure it corresponds with the associated embedding model's dimensionality."""
     vector_field_name: str = "vector"
     """Name of the special field to store the vector embeddings."""
-    id_field_name: str = "id__"
-    """Name of the special field to manage deduplication."""
     sentinel_table_name: str = "dltSentinelTable"
     """Name of the sentinel table that encapsulates datasets. Since LanceDB has no
     concept of schemas, this table serves as a proxy to group related dlt tables together."""
8 changes: 6 additions & 2 deletions dlt/destinations/impl/lancedb/factory.py
@@ -26,8 +26,8 @@ class lancedb(Destination[LanceDBClientConfiguration, "LanceDBClient"]):

     def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps = DestinationCapabilitiesContext()
-        caps.preferred_loader_file_format = "jsonl"
-        caps.supported_loader_file_formats = ["jsonl"]
+        caps.preferred_loader_file_format = "parquet"
+        caps.supported_loader_file_formats = ["parquet", "reference"]
         caps.type_mapper = LanceDBTypeMapper

         caps.max_identifier_length = 200
@@ -42,6 +42,10 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.timestamp_precision = 6
         caps.supported_replace_strategies = ["truncate-and-insert"]

+        caps.recommended_file_size = 128_000_000
+
+        caps.supported_merge_strategies = ["upsert"]
+
         return caps

     @property
27 changes: 22 additions & 5 deletions dlt/destinations/impl/lancedb/lancedb_adapter.py
@@ -1,16 +1,20 @@
-from typing import Any
+from typing import Any, Dict

 from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
 from dlt.destinations.utils import get_resource_for_adapter
 from dlt.extract import DltResource
+from dlt.extract.items import TTableHintTemplate


 VECTORIZE_HINT = "x-lancedb-embed"
+NO_REMOVE_ORPHANS_HINT = "x-lancedb-remove-orphans"


 def lancedb_adapter(
     data: Any,
     embed: TColumnNames = None,
+    merge_key: TColumnNames = None,
+    no_remove_orphans: bool = False,
 ) -> DltResource:
     """Prepares data for the LanceDB destination by specifying which columns should be embedded.
@@ -20,6 +24,10 @@ def lancedb_adapter(
             object.
         embed (TColumnNames, optional): Specify columns to generate embeddings for.
             It can be a single column name as a string, or a list of column names.
+        merge_key (TColumnNames, optional): Specify columns to merge on.
+            It can be a single column name as a string, or a list of column names.
+        no_remove_orphans (bool): Specify whether to remove orphaned records in child
+            tables with no parent records after merges to maintain referential integrity.
     Returns:
         DltResource: A resource with applied LanceDB-specific hints.
@@ -34,7 +42,8 @@ def lancedb_adapter(
     """
     resource = get_resource_for_adapter(data)

-    column_hints: TTableSchemaColumns = {}
+    additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {}
+    column_hints: TTableSchemaColumns = None

     if embed:
         if isinstance(embed, str):
@@ -43,16 +52,24 @@ def lancedb_adapter(
             raise ValueError(
                 "'embed' must be a list of column names or a single column name as a string."
             )
+        column_hints = {}

         for column_name in embed:
             column_hints[column_name] = {
                 "name": column_name,
                 VECTORIZE_HINT: True,  # type: ignore[misc]
             }

-    if not column_hints:
-        raise ValueError("A value for 'embed' must be specified.")
+    additional_table_hints[NO_REMOVE_ORPHANS_HINT] = no_remove_orphans
+
+    if column_hints or additional_table_hints or merge_key:
+        resource.apply_hints(
+            merge_key=merge_key, columns=column_hints, additional_table_hints=additional_table_hints
+        )
     else:
-        resource.apply_hints(columns=column_hints)
+        raise ValueError(
+            "You must must provide at least either the 'embed' or 'merge_key' or 'remove_orphans'"
+            " argument if using the adapter."
+        )

     return resource
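A rough standalone sketch of the hint-building step in `lancedb_adapter`, written without importing dlt so it can be run in isolation. `build_lancedb_hints` is a hypothetical helper, not part of the commit, and it assumes the `NO_REMOVE_ORPHANS_HINT` entry is set regardless of whether `embed` is given; the two hint names are copied from the diff.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

# Hint names as they appear in the diff.
VECTORIZE_HINT = "x-lancedb-embed"
NO_REMOVE_ORPHANS_HINT = "x-lancedb-remove-orphans"

ColumnHints = Dict[str, Dict[str, Any]]


def build_lancedb_hints(
    embed: Optional[Union[str, List[str]]] = None,
    no_remove_orphans: bool = False,
) -> Tuple[Optional[ColumnHints], Dict[str, Any]]:
    """Hypothetical helper: returns (column_hints, additional_table_hints)."""
    column_hints: Optional[ColumnHints] = None

    if embed:
        # A single column name is normalised to a one-element list.
        if isinstance(embed, str):
            embed = [embed]
        if not isinstance(embed, list):
            raise ValueError(
                "'embed' must be a list of column names or a single column name as a string."
            )
        column_hints = {
            name: {"name": name, VECTORIZE_HINT: True} for name in embed
        }

    # Assumption: the table-level hint is recorded for every call.
    table_hints = {NO_REMOVE_ORPHANS_HINT: no_remove_orphans}
    return column_hints, table_hints


if __name__ == "__main__":
    print(build_lancedb_hints(embed="text", no_remove_orphans=True))
```

In the committed adapter the resulting dictionaries are passed to `resource.apply_hints` together with `merge_key`; the sketch stops before that call because it depends on a dlt resource.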