Document metadata, document IDs, delete from index and tests #36

Merged
21 commits merged on Jan 24, 2024

Commits
79bcfb1
first pass changes to include document metadata
adharm Jan 9, 2024
da39445
Merge branch 'bclavie:main' into custom-metadata
adharm Jan 9, 2024
4066f9a
Added working tests for all metadata and document id functionality
adharm Jan 12, 2024
8166270
Added working delete_from_index functionality
adharm Jan 12, 2024
b281d77
Fixed tests and functionality for add_to_index, and updated the intro…
adharm Jan 12, 2024
e2bac39
Updated docs and doc strings
adharm Jan 12, 2024
24b44a6
Merge branch 'main' into custom-metadata
adharm Jan 12, 2024
d8bc623
Updated metadata return logic to account for
adharm Jan 12, 2024
52ec19e
Added check for document metadata length vs documents
adharm Jan 12, 2024
4b170eb
- Reverted names
adharm Jan 13, 2024
3bb2559
- Fixed tests
adharm Jan 15, 2024
a5d321f
Merge branch 'main' into custom-metadata
adharm Jan 16, 2024
57aae50
Updated with ruff import sorting
adharm Jan 16, 2024
86f782b
Added ruff ignore for test fixture setup
adharm Jan 16, 2024
bed6e16
renamed test file to match other files better
adharm Jan 16, 2024
aaeb1b4
Merge branch 'main' into custom-metadata
adharm Jan 16, 2024
1c95eb4
fix: formatted files that failed ruff CI check
adharm Jan 16, 2024
34d1532
style: formatted files that failed ruff CI check
adharm Jan 16, 2024
1c3de4e
Merge branch 'custom-metadata' of github.com:anirudhdharmarajan/RAGat…
adharm Jan 16, 2024
9496f45
chore: remove code duplication and ensure process_corpus can run inde…
bclavie Jan 24, 2024
96a51d7
chore: typo
bclavie Jan 24, 2024
4 changes: 3 additions & 1 deletion .gitignore
@@ -124,4 +124,6 @@ archive/

*/.ragatouille

local/
local/

.vscode/
42 changes: 31 additions & 11 deletions README.md
@@ -117,17 +117,29 @@ To create an index, you'll need to load a trained model, this can be one of your
```python
from ragatouille import RAGPretrainedModel
from ragatouille.utils import get_wikipedia_page
from ragatouille.data import CorpusProcessor


RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
my_documents = [get_wikipedia_page("Hayao_Miyazaki"), get_wikipedia_page("Studio_Ghibli")]
processor = CorpusProcessor()
my_documents = processor.process_corpus(my_documents)
index_path = RAG.index(index_name="my_index", collection=my_documents)
```
You can also optionally add document IDs or document metadata when creating the index:

```python
document_ids = ["miyazaki", "ghibli"]
document_metadatas = [
{"entity": "person", "source": "wikipedia"},
{"entity": "organisation", "source": "wikipedia"},
]
index_path = RAG.index(
index_name="my_index_with_ids_and_metadata",
collection=my_documents,
document_ids=document_ids,
document_metadatas=document_metadatas,
)
```

Once this is done running, your index will be saved on-disk and ready to be queried! RAGatouille and ColBERT handle everything here:
- Splitting your documents
- Tokenizing your documents
- Identifying the individual terms
- Embedding the documents and generating the bags-of-embeddings
@@ -163,25 +175,33 @@ RAG.search(["What manga did Hayao Miyazaki write?",
```python
# single-query result
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
]
# multi-query result
[
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
],
[
{"content": "blablabla", "score": 42.424242, "rank": 1},
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x"},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k},
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y"},
],
],
```

If your index includes document metadata, it'll be returned as a dictionary in the `document_metadata` key of the result dictionary:

```python
[
{"content": "blablabla", "score": 42.424242, "rank": 1, "document_id": "x", "document_metadata": {"A": 1, "B": 2}},
...,
{"content": "albalbalba", "score": 24.242424, "rank": k, "document_id": "y", "document_metadata": {"A": 3, "B": 4}},
]
```
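
You can also add to or remove from an existing index by document ID. Below is a minimal usage sketch of the `add_to_index` and `delete_from_index` methods (the `"takahata"` ID and its metadata are hypothetical examples, and the index built above is assumed to be the one currently loaded):

```python
# Add a new document to the existing index, keyed by its own document ID
# ("takahata" is a hypothetical example ID).
RAG.add_to_index(
    new_collection=[get_wikipedia_page("Isao_Takahata")],
    new_document_ids=["takahata"],
    new_document_metadatas=[{"entity": "person", "source": "wikipedia"}],
)

# Delete documents from the index by their document IDs.
RAG.delete_from_index(document_ids=["takahata"])
```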

## I'm sold, can I integrate late-interaction RAG into my project?

373 changes: 25 additions & 348 deletions examples/01-basic_indexing_and_search.ipynb

Large diffs are not rendered by default.

133 changes: 121 additions & 12 deletions ragatouille/RAGPretrainedModel.py
@@ -1,5 +1,6 @@
from pathlib import Path
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, List, Optional, TypeVar, Union
from uuid import uuid4

from langchain.retrievers.document_compressors.base import BaseDocumentCompressor
from langchain_core.retrievers import BaseRetriever
@@ -53,19 +54,23 @@ def from_pretrained(
pretrained_model_name_or_path: Union[str, Path],
n_gpu: int = -1,
verbose: int = 1,
index_root: Optional[str] = None,
):
"""Load a ColBERT model from a pre-trained checkpoint.

Parameters:
pretrained_model_name_or_path (str): Local path or huggingface model name.
n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.
index_root (Optional[str]): The root directory where indexes will be stored. If None, will use the default directory, '.ragatouille/'.

Returns:
cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model initialised.
"""
instance = cls()
instance.model = ColBERT(pretrained_model_name_or_path, n_gpu, verbose=verbose)
instance.model = ColBERT(
pretrained_model_name_or_path, n_gpu, index_root=index_root, verbose=verbose
)
return instance

@classmethod
@@ -90,48 +95,109 @@ def from_index(

return instance

def _process_metadata(
self,
document_ids: Optional[Union[TypeVar("T"), List[TypeVar("T")]]],
document_metadatas: Optional[list[dict[Any, Any]]],
collection_len: int,
) -> tuple[list[str], Optional[dict[Any, Any]]]:
if document_ids is None:
document_ids = [str(uuid4()) for i in range(collection_len)]
else:
if len(document_ids) != collection_len:
raise ValueError("document_ids must be the same length as collection")
if len(document_ids) != len(set(document_ids)):
raise ValueError("document_ids must be unique")
if any(not id.strip() for id in document_ids):
raise ValueError("document_ids must not contain empty strings")
if not all(isinstance(id, type(document_ids[0])) for id in document_ids):
raise ValueError("All document_ids must be of the same type")

if document_metadatas is not None:
if len(document_metadatas) != collection_len:
raise ValueError(
"document_metadatas must be the same length as collection"
)
docid_metadata_map = {
x: y for x, y in zip(document_ids, document_metadatas)
}
else:
docid_metadata_map = None

return document_ids, docid_metadata_map

def index(
self,
collection: list[str],
document_ids: Union[TypeVar("T"), List[TypeVar("T")]] = None,
document_metadatas: Optional[list[dict]] = None,
index_name: str = None,
overwrite_index: bool = True,
max_document_length: int = 256,
split_documents: bool = True,
document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
):
"""Build an index from a collection of documents.
"""Build an index from a list of documents.

Parameters:
collection (list[str]): The collection of documents to index.
document_ids (Optional[list[str]]): An optional list of document ids. Ids will be generated at index time if not supplied.
document_metadatas (Optional[list[dict]]): An optional list of metadata dicts, one per document in the collection.
index_name (str): The name of the index that will be built.
overwrite_index (bool): Whether to overwrite an existing index with the same name.
max_document_length (int): The maximum length of a document. Documents longer than this will be split into chunks.
split_documents (bool): Whether to split documents into chunks.
document_splitter_fn (Optional[Callable]): A function to split documents into chunks. If None and by default, will use the llama_index_sentence_splitter.
preprocessing_fn (Optional[Union[Callable, list[Callable]]]): A function or list of functions to preprocess documents. If None and by default, will not preprocess documents.

Returns:
index (str): The path to the index that was built.
"""

document_ids, docid_metadata_map = self._process_metadata(
document_ids=document_ids,
document_metadatas=document_metadatas,
collection_len=len(collection),
)

if split_documents or preprocessing_fn is not None:
self.corpus_processor = CorpusProcessor(
document_splitter_fn=document_splitter_fn if split_documents else None,
preprocessing_fn=preprocessing_fn,
)
collection = self.corpus_processor.process_corpus(
collection_with_ids = self.corpus_processor.process_corpus(
collection,
document_ids,
chunk_size=max_document_length,
)
else:
collection_with_ids = [
{"document_id": x, "content": y}
for x, y in zip(document_ids, collection)
]

pid_docid_map = {
index: item["document_id"] for index, item in enumerate(collection_with_ids)
}
collection = [x["content"] for x in collection_with_ids]

overwrite = "reuse"
if overwrite_index:
overwrite = True
return self.model.index(
collection,
index_name,
pid_docid_map=pid_docid_map,
docid_metadata_map=docid_metadata_map,
index_name=index_name,
max_document_length=max_document_length,
overwrite=overwrite,
)

def add_to_index(
self,
new_documents: list[str],
new_collection: list[str],
new_document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
new_document_metadatas: Optional[list[dict]] = None,
index_name: Optional[str] = None,
split_documents: bool = True,
document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
@@ -140,21 +206,59 @@ def add_to_index(
"""Add documents to an existing index.

Parameters:
new_documents (list[str]): The documents to add to the index.
new_collection (list[str]): The documents to add to the index.
new_document_ids (Union[TypeVar("T"), List[TypeVar("T")]]): The IDs of the new documents; they must be unique and the same length as new_collection.
new_document_metadatas (Optional[list[dict]]): An optional list of metadata dicts, one per new document.
index_name (Optional[str]): The name of the index to add documents to. If None and by default, will add documents to the already initialised one.
"""
new_document_ids, new_docid_metadata_map = self._process_metadata(
document_ids=new_document_ids,
document_metadatas=new_document_metadatas,
collection_len=len(new_collection),
)

if split_documents or preprocessing_fn is not None:
self.corpus_processor = CorpusProcessor(
document_splitter_fn=document_splitter_fn if split_documents else None,
preprocessing_fn=preprocessing_fn,
)
new_documents = self.corpus_processor.process_corpus(
new_documents,
new_collection_with_ids = self.corpus_processor.process_corpus(
new_collection,
new_document_ids,
chunk_size=self.model.config.doc_maxlen,
)
else:
new_collection_with_ids = [
{"document_id": x, "content": y}
for x, y in zip(new_document_ids, new_collection)
]

new_collection = [x["content"] for x in new_collection_with_ids]

new_pid_docid_map = {
index: item["document_id"]
for index, item in enumerate(new_collection_with_ids)
}

self.model.add_to_index(
new_documents,
new_collection,
new_pid_docid_map,
new_docid_metadata_map=new_docid_metadata_map,
index_name=index_name,
)

def delete_from_index(
self,
document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
index_name: Optional[str] = None,
):
"""Delete documents from an index by their IDs.

Parameters:
document_ids (Union[TypeVar("T"), List[TypeVar("T")]]): The IDs of the documents to delete.
index_name (Optional[str]): The name of the index to delete documents from. If None and by default, will delete documents from the already initialised one.
"""
self.model.delete_from_index(
document_ids,
index_name=index_name,
)

Expand All @@ -177,12 +281,17 @@ def search(
zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result

Returns:
results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score` and `rank`.
results (Union[list[dict], list[list[dict]]]): A list of dicts containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score`, `rank`, and `document_id`. If metadata was indexed for the document, it will be returned under the `document_metadata` key.

Individual results are always in the format:
```python3
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1}
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x"}
```
or
```python3
{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x", "document_metadata": {"metadata_key": "metadata_value", ...}}
```

"""
return self.model.search(
query=query,
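
As a usage sketch of the new `index_root` argument to `from_pretrained` (the `".my_indexes/"` path is a hypothetical example, not a value from this diff):

```python
from ragatouille import RAGPretrainedModel

# Store indexes under a custom root instead of the default ".ragatouille/"
# (".my_indexes/" is an illustrative path, not a project default).
RAG = RAGPretrainedModel.from_pretrained(
    "colbert-ir/colbertv2.0",
    index_root=".my_indexes/",
)
```
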
15 changes: 12 additions & 3 deletions ragatouille/data/corpus_processor.py
@@ -1,4 +1,5 @@
from typing import Callable, Optional, Union
from uuid import uuid4

from ragatouille.data.preprocessors import llama_index_sentence_splitter

@@ -15,15 +16,23 @@ def __init__(
def process_corpus(
self,
documents: list[str],
document_ids: Optional[list[str]] = None,
**splitter_kwargs,
) -> list[str]:
# TODO CHECK KWARGS
document_ids = (
[str(uuid4()) for _ in range(len(documents))]
if document_ids is None
else document_ids
)
if self.document_splitter_fn is not None:
documents = self.document_splitter_fn(documents, **splitter_kwargs)
documents = self.document_splitter_fn(
documents, document_ids, **splitter_kwargs
)
if self.preprocessing_fn is not None:
if isinstance(self.preprocessing_fn, list):
for fn in self.preprocessing_fn:
documents = fn(documents)
documents = fn(documents, document_ids)
return documents
return self.preprocessing_fn(documents)
return self.preprocessing_fn(documents, document_ids)
return documents
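
To illustrate the updated `process_corpus` signature, a minimal sketch of the call and its output shape (the IDs and texts here are illustrative; when `document_ids` is omitted, UUID4 strings are generated instead):

```python
from ragatouille.data import CorpusProcessor

processor = CorpusProcessor()

# Chunk two documents, carrying explicit document IDs through the splitter.
chunks = processor.process_corpus(
    ["Hayao Miyazaki is a Japanese animator and filmmaker...",
     "Studio Ghibli is a Japanese animation studio..."],
    document_ids=["miyazaki", "ghibli"],
    chunk_size=256,
)
# Each chunk is now a dict rather than a bare string, e.g.:
# {"document_id": "miyazaki", "content": "Hayao Miyazaki is a Japanese animator..."}
```
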
10 changes: 7 additions & 3 deletions ragatouille/data/preprocessors.py
@@ -2,11 +2,15 @@
from llama_index.text_splitter import SentenceSplitter


def llama_index_sentence_splitter(documents: list[str], chunk_size=256):
def llama_index_sentence_splitter(
documents: list[str], document_ids: list[str], chunk_size=256
):
chunk_overlap = min(chunk_size / 4, min(chunk_size / 2, 64))
chunks = []
node_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
docs = [[Document(text=doc)] for doc in documents]
for doc in docs:
chunks += [node.text for node in node_parser(doc)]
for doc_id, doc in zip(document_ids, docs):
chunks += [
{"document_id": doc_id, "content": node.text} for node in node_parser(doc)
]
return chunks