
Allow for "reduce" steps in LanceDB adapter #1699

Open
zilto opened this issue Aug 15, 2024 · 4 comments

zilto commented Aug 15, 2024

Feature description

Allow the LanceDB and other vector DB adapters to specify a "contextualize" or rolling-window operation that joins partitioned text chunks before applying the embedding function.

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

context

The constructs @dlt.resource and @dlt.transformer are very convenient for document ingestion in NLP/LLM use cases. The @dlt.resource returns the full text and a @dlt.transformer can chunk it (into paragraphs, for example). The LanceDB and other vector DB adapters make it easy to embed the full-text and chunked-text columns. We get something like this:

@dlt.resource
def full_text():
  yield {"document_id": "foo", "text": ...}
  
@dlt.transformer(data_from=full_text)
def chunked_text(document):
  for idx, chunk in enumerate(_split_text(document["text"])):
    yield {
      "document_id": document["document_id"],
      "chunk_id": idx,
      "text": chunk,
    }

Full-text

| document id | text                                          |
|-------------|-----------------------------------------------|
| "foo"       | "The quick brown fox jumps over the lazy dog" |

Chunks (3 words)

| document id | chunk id | text              |
|-------------|----------|-------------------|
| "foo"       | 1        | "The quick brown" |
| "foo"       | 2        | "fox jumps over"  |
| "foo"       | 3        | "the lazy dog"    |

limitations

However, embedding these "partitioned" chunks is often low value for RAG. A common operation is "contextualizing" chunks: a rolling-window operation with window size and stride / overlap parameters. For instance, LanceDB has contextualize(), but it requires converting the data to a pandas dataframe. Let's illustrate a 2-chunk window based on the previous table:

Contexts

| document id | chunk id | context id | text                             |
|-------------|----------|------------|----------------------------------|
| "foo"       | 1, 2     | 1          | "The quick brown fox jumps over" |
| "foo"       | 2, 3     | 2          | "fox jumps over the lazy dog"    |

AFAIK, dlt doesn't provide a clear API for normalizing the chunk_id and context_id columns. The "contextualize" operation could be implemented directly in a single @dlt.transformer, but it would only capture the document_id -> context_id relationship and miss the fact that "contextualized chunks" aren't independent; they share underlying chunks.

Proposed solution

adding a "reducer" step

I was able to hack around this by receiving a batch of "chunks" and using dlt.mark.with_table_name to dispatch both a "context table" and a "relation table" from the same @dlt.transformer. Mock code:

def _split_text(text: str):
    # naive 3-word chunking
    words = text.split()
    for i in range(0, len(words), 3):
        yield " ".join(words[i : i + 3])


def _contextualize(chunks: list[str], window=5, stride=3, min_window_size=2):
    # rolling window over ordered chunks
    n_chunks = len(chunks)
    for start_i in range(0, n_chunks, stride):
        if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):
            yield " ".join(chunks[start_i : min(start_i + window, n_chunks)])

import dlt


@dlt.source
def document_source():
    @dlt.resource(primary_key="document_id")
    def document():
        yield {"document_id": "foo", "text": "The quick brown fox jumps over the lazy dog"}

    # this needs to accumulate a "batch" for the whole document before
    # starting the "reduce" / rolling operation step
    @dlt.transformer(data_from=document, primary_key="chunk_id")
    def chunks(item: dict):
        return [
            dict(
                document_id=item["document_id"],
                chunk_id=idx,
                text=text,
            )
            for idx, text in enumerate(_split_text(item["text"]))
        ]

    # order is important for the reduce / rolling step;
    # default to the order of the batch or specify a sorting key
    @dlt.transformer(data_from=chunks, primary_key="context_id")
    def contexts(items: list[dict]):
        # first handle the m-to-n relationship:
        # set of foreign keys (i.e., "chunk_id")
        chunk_id_set = set(item["chunk_id"] for item in items)
        context_id = hash_set(chunk_id_set)  # placeholder: hash of the chunk_id set

        # create a table only containing the keys
        for chunk_id in chunk_id_set:
            yield dlt.mark.with_table_name(
                {"chunk_id": chunk_id, "context_id": context_id},
                "chunks_to_contexts_keys",
            )

        # main transformation logic
        for contextualized in _contextualize([chunk["text"] for chunk in items]):
            yield dlt.mark.with_table_name(
                {"context_id": context_id, "text": contextualized},
                "contexts",
            )

    return (document, chunks, contexts)

Contexts

| context id | text                             |
|------------|----------------------------------|
| hash(1, 2) | "The quick brown fox jumps over" |
| hash(2, 3) | "fox jumps over the lazy dog"    |

Chunks-to-contexts keys

| chunk id | context id |
|----------|------------|
| 1        | hash(1, 2) |
| 2        | hash(1, 2) |
| 2        | hash(2, 3) |
| 3        | hash(2, 3) |
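
For completeness, a minimal sketch of loading this source; the pipeline name is illustrative, and the lancedb_adapter would still need to be applied to embed the text columns.

import dlt

pipeline = dlt.pipeline(pipeline_name="documents", destination="lancedb")
load_info = pipeline.run(document_source())
print(load_info)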

There's probably room for a generic @dlt.reducer that automatically manages the primary / foreign keys based on the other resources' metadata, handles the key-set hashing, and dispatches results to tables. Given that this could be a can of worms, it could be tested and refined while being hidden behind the lancedb_adapter. The API could be expanded to

lancedb_adapter(
  chunks,
  embed="text",
  window=10,
  stride=3,  # could also be renamed to overlap by changing the algo
  min_window_size=3,
)

This would reproduce the above logic by creating the chunks table as defined by the user (the chunks resource) and creating the second table automatically.
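
As a usage sketch (assuming window / stride / min_window_size are added; they are not part of the current lancedb_adapter signature, and the import path may differ by dlt version):

import dlt
from dlt.destinations.adapters import lancedb_adapter

pipeline = dlt.pipeline(pipeline_name="documents", destination="lancedb")
source = document_source()

# the adapter would embed "text" and, per this proposal, also build the
# contexts and chunks_to_contexts_keys tables from the rolling window
pipeline.run(
    lancedb_adapter(source.chunks, embed="text", window=10, stride=3, min_window_size=3)
)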

Related issues

No response

@VioletM added the community (This issue came from slack community workspace) and support (This issue is monitored by Solution Engineer) labels on Aug 15, 2024
@rudolfix
Collaborator

@zilto thanks for this idea. a few random comments (I'm probably missing some background):

  1. why not chunk with a rolling window from the very start, so the chunks are already "overlapping"? then you do not need to merge them in a transformer
  2. what is the role of the "chunks", "contexts", and "chunks-to-contexts-keys" tables/collections when doing RAG? are you using both chunks and contexts to identify documents via vector search?
  3. what you could try is to use dlt's ability to create child tables and yield in contexts:
for contextualized in _contextualize([chunk["text"] for chunk in items]):
    yield {
        "context_id": context_id,
        "text": contextualized,
        "chunks": [...],  # list of the chunks that make up this context
    }

that would create contexts and contexts__chunks tables. they won't be as nice as yours though (dlt would add its own table linking)

btw, with @Pipboyguy we are trying to support chunking in a more or less unified way: #1587


zilto commented Aug 15, 2024

I agree with the motivation of the cited issue! But to add more context:

  • This discussion around chunking focuses on unstructured text where you don't have message ids, sections, or other keys to split things in a principled way.
  • In typical text processing scenarios, document -> chunks -> contexts is more performant than document -> overlapping contexts -> chunks because less total text is parsed into the smaller chunks and the joining operation is cheap.
  • overlapping contexts -> chunks would produce duplicated chunks, and the chunks might not be exact partitions that can recreate the original document, unlike documents -> chunks (e.g., when using a tokenizer instead of the naive string manipulation in my example)
  • Given that chunks are the smallest meaningful unit, storing them once can enable multiple use cases / pipelines that need bigger text units downstream
  • Storing the granular chunks should make it easier to manage state for incremental loading, as it enables resuming a rolling-window operation

1. why not chunk with a rolling window from the very start, so the chunks are already "overlapping"? then you do not need to merge them in a transformer

This suggests doing document -> contexts instead of document -> chunks -> contexts. Unlike the approach I suggested, one can't know what two "contexts" have in common (e.g., which chunks they share, how many chunks they share, how far apart the shared chunks are, given ordered chunk ids).

2. what is the role of the "chunks", "contexts", and "chunks-to-contexts-keys" tables/collections when doing RAG? are you using both chunks and contexts to identify documents via vector search?

For RAG, I intend to use "contexts" for first-pass vector search, then use the "context-chunk" lineage to filter out "contexts" that have too much in common and increase the information content of the text passed to the LLM. Over time, it's valuable to log which "contexts" and underlying "chunks" are high signal for downstream uses.

More concretely, a user asks a question about dlt. You want documentation to be embedded in large "contexts" to have good recall, then the LLM should be able to extract the right info from the "context" and generate an answer. However, it's still fuzzy "what" was useful to the LLM or user. The above lineage would show that retrieving "contexts" containing the chunk "dlt is an open source Python library" is high signal for answering questions around the topic of pricing.
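
A hedged sketch of that retrieval flow; the table names, the LanceDB search chain, and the overlap threshold are illustrative assumptions, not a verified implementation:

import lancedb

db = lancedb.connect(".lancedb")  # illustrative URI
contexts_tbl = db.open_table("contexts")
keys_tbl = db.open_table("chunks_to_contexts_keys")

# first-pass vector search over contexts (assumes an embedding function is configured)
hits = contexts_tbl.search("how is dlt priced?").limit(10).to_pandas()

# use the context -> chunk lineage to drop contexts that overlap too much
keys = keys_tbl.to_pandas()
chunks_by_context = keys.groupby("context_id")["chunk_id"].apply(set).to_dict()

selected, seen_chunks = [], set()
for context_id in hits["context_id"]:
    chunk_ids = chunks_by_context.get(context_id, set())
    overlap = len(chunk_ids & seen_chunks) / max(len(chunk_ids), 1)
    if overlap < 0.5:  # illustrative threshold
        selected.append(context_id)
        seen_chunks |= chunk_ids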

3. what you could try is to use dlt's ability to create child tables and yield in contexts:

Didn't think of that! While it handles relationships, I would have duplicated "chunks" stored, no?

@Pipboyguy self-assigned this on Aug 18, 2024
@rudolfix
Collaborator

@zilto it seems we will be picking your brain a lot :) our goal is to support chunked documents with the "merge" write disposition (where only a subset of documents will be updated). I'll get back to this topic tomorrow. we need to move forward...

@Pipboyguy
Collaborator

@zilto Thanks for the detailed use case and explanation!

@rudolfix I think this table can be created as part of a job as well, run after the main table chain just like the current orphan removal process, and have its orphans removed in a similar fashion. WDYT?

@rudolfix moved this from Planned to Todo in dlt core library on Dec 9, 2024