
Commit

chages
adolkhan committed Sep 5, 2023
1 parent a0ddafa commit f3d7012
Showing 3 changed files with 90 additions and 35 deletions.
14 changes: 9 additions & 5 deletions deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -14,6 +14,8 @@
 import deeplake
 from deeplake.constants import (
     DEFAULT_VECTORSTORE_TENSORS,
+    MAX_BYTES_PER_MINUTE,
+    TARGET_BYTE_SIZE,
 )
 from deeplake.core.vectorstore import utils
 from deeplake.core.vectorstore.vector_search import vector_search
@@ -157,7 +159,11 @@ def add(
         embedding_data: Optional[Union[List, List[List]]] = None,
         embedding_tensor: Optional[Union[str, List[str]]] = None,
         return_ids: bool = False,
-        ingestion_batch_size: Optional[int] = None,
+        rate_limiter: Dict = {
+            "turn_on": False,
+            "bytes_per_minute": MAX_BYTES_PER_MINUTE,
+        },
+        batch_byte_size: int = TARGET_BYTE_SIZE,
         **tensors,
     ) -> Optional[List[str]]:
         """Adding elements to deeplake vector store.
@@ -280,10 +286,8 @@ def add(
             embedding_function=embedding_function,
             embedding_data=embedding_data,
             embedding_tensor=embedding_tensor,
-            ingestion_batch_size=ingestion_batch_size or self.ingestion_batch_size,
-            num_workers=0,
-            total_samples_processed=0,
-            logger=logger,
+            batch_byte_size=batch_byte_size,
+            rate_limiter=rate_limiter,
         )

         if self.verbose:
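For orientation, the net effect on the public add() API is that ingestion_batch_size is gone: callers now size embedding batches in bytes via batch_byte_size and opt into throttling via the rate_limiter dict. A minimal usage sketch, assuming the VectorStore class defined in this module; the texts, the embedding function, and the numeric values are illustrative, not the library defaults:

from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore

def my_embedding_fn(texts):
    # Stand-in for a real embedding call (e.g. an OpenAI request); one vector per input text.
    return [[float(len(t))] * 8 for t in texts]

vector_store = VectorStore(path="mem://rate-limit-demo")

docs = [f"document {i}" for i in range(100)]
ids = vector_store.add(
    text=docs,
    metadata=[{"source": "demo"}] * len(docs),
    embedding_function=my_embedding_fn,
    embedding_data=docs,
    embedding_tensor="embedding",
    return_ids=True,
    batch_byte_size=10_000,              # illustrative; the default is TARGET_BYTE_SIZE
    rate_limiter={
        "turn_on": True,                 # defaults to False, i.e. no throttling
        "bytes_per_minute": 1_200_000,   # illustrative; the default is MAX_BYTES_PER_MINUTE
    },
)

With turn_on left at its default of False, add() behaves as before apart from the byte-based batching.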
110 changes: 80 additions & 30 deletions deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -395,11 +395,8 @@ def extend(
     embedding_tensor: Union[str, List[str]],
     processed_tensors: Dict[str, List[Any]],
     dataset: deeplake.core.dataset.Dataset,
-    rate_limiter={
-        "turn_on": False,
-        "bytes_per_minute": MAX_BYTES_PER_MINUTE,
-        "target_byte_size": TARGET_BYTE_SIZE,
-    },
+    batch_byte_size: int,
+    rate_limiter: Dict,
 ):
     """
     Function to extend the dataset with new data.
@@ -412,34 +409,20 @@
         dataset (deeplake.core.dataset.Dataset): Dataset to be extended.
     """
-    target_byte_size = rate_limiter["target_byte_size"]
-    bytes_per_minute = rate_limiter["bytes_per_minute"]

     if embedding_function:
         for func, data, tensor in zip(
             embedding_function, embedding_data, embedding_tensor
         ):
-            data_batched = chunk_by_bytes(data, target_byte_size=target_byte_size)
-
-            # Calculate the number of batches you can send each minute
-            batches_per_minute = bytes_per_minute / target_byte_size
-
-            # Calculate sleep time in seconds between batches
-            sleep_time = 60 / batches_per_minute
-
+            data_iterator = data_iteratot_factory(
+                data, func, batch_byte_size, rate_limiter
+            )
             embedded_data = []

-            for data_i in tqdm(
-                data_batched, total=len(data_batched), desc="Creating embedding data"
+            for data in tqdm(
+                data_iterator, total=len(data_iterator), desc="creating embeddings"
             ):
-                start = time.time()
-                embedded_data.append(func(data_i))
-                end = time.time()
-                if rate_limiter["turn_on"]:
-                    # we need to take into account the time spent on openai call
-                    diff = sleep_time - (end - start)
-                    if diff > 0:
-                        time.sleep(diff)
+                embedded_data.append(data)

             try:
                 embedded_data = np.vstack(embedded_data).astype(dtype=np.float32)
             except ValueError:
@@ -453,16 +436,81 @@
     dataset.extend(processed_tensors)


+class DataIterator:
+    def __init__(self, data, func, batch_byte_size):
+        self.data = chunk_by_bytes(data, batch_byte_size)
+        self.data_itr = iter(self.data)
+        self.index = 0
+        self.func = func
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.index >= len(self.data):
+            raise StopIteration
+        batch = next(self.data_itr)
+        batch = self.func(batch)
+        self.index += 1
+        return batch
+
+    def __len__(self):
+        return len(self.data)
+
+
+class RateLimitedDataIterator:
+    def __init__(self, data, func, batch_byte_size, rate_limiter):
+        self.data = chunk_by_bytes(data, batch_byte_size)
+        self.data_iter = iter(self.data)
+        self.index = 0
+        self.rate_limiter = rate_limiter
+        self.bytes_per_minute = rate_limiter["bytes_per_minute"]
+        self.target_byte_size = batch_byte_size
+        self.func = func
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.index >= len(self.data):
+            raise StopIteration
+        batch = next(self.data_iter)
+        self.index += 1
+        # Calculate the number of batches you can send each minute
+        batches_per_minute = self.bytes_per_minute / self.target_byte_size
+
+        # Calculate sleep time in seconds between batches
+        sleep_time = 60 / batches_per_minute
+
+        start = time.time()
+        batch = self.func(batch)
+        end = time.time()
+
+        # we need to take into account the time spent on openai call
+        diff = sleep_time - (end - start)
+        if diff > 0:
+            time.sleep(diff)
+        return batch
+
+    def __len__(self):
+        return len(self.data)
+
+
+def data_iteratot_factory(data, func, batch_byte_size, rate_limiter):
+    if rate_limiter["turn_on"]:
+        return RateLimitedDataIterator(data, func, batch_byte_size, rate_limiter)
+    else:
+        return DataIterator(data, func, batch_byte_size)
+
+
 def extend_or_ingest_dataset(
     processed_tensors,
     dataset,
     embedding_function,
     embedding_tensor,
     embedding_data,
-    ingestion_batch_size,
-    num_workers,
-    total_samples_processed,
-    logger,
+    batch_byte_size,
+    rate_limiter,
 ):
     # TODO: Add back the old logic with checkpointing after indexing is fixed
     extend(
@@ -471,6 +519,8 @@ def extend_or_ingest_dataset(
         embedding_tensor,
         processed_tensors,
         dataset,
+        batch_byte_size,
+        rate_limiter,
     )


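The throttling above reduces to a fixed time budget per batch: with a byte budget per minute B and chunks of roughly s bytes (chunk_by_bytes appears to split the input so each chunk is close to batch_byte_size), the iterator may emit B / s batches per minute, i.e. one batch every 60 * s / B seconds, minus whatever time the embedding call itself consumed. A self-contained sketch with illustrative numbers that mirrors the RateLimitedDataIterator logic rather than importing it:

import time

def rate_limited_batches(batches, embed, bytes_per_minute=1_200_000, batch_byte_size=10_000):
    # 1_200_000 / 10_000 = 120 batches per minute -> a 0.5 s budget per batch.
    sleep_time = 60 / (bytes_per_minute / batch_byte_size)
    for batch in batches:
        start = time.time()
        vectors = embed(batch)
        # Sleep only for the part of the budget the embedding call did not use.
        remaining = sleep_time - (time.time() - start)
        if remaining > 0:
            time.sleep(remaining)
        yield vectors

# Illustrative use with a dummy embedding function.
dummy_batches = [["some text"] * 4 for _ in range(3)]
for vectors in rate_limited_batches(dummy_batches, lambda batch: [[0.0] * 4 for _ in batch]):
    print(len(vectors))

When rate_limiter["turn_on"] is False, data_iteratot_factory returns the plain DataIterator and no sleeping happens at all.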
1 change: 1 addition & 0 deletions
@@ -397,6 +397,7 @@ def mock_embedding_function(text):
         embedding_tensor=["embedding"],
         processed_tensors=processed_tensors,
         dataset=dataset,
+        rate_limiter={"turn_on": True, "bytes_per_minute": MAX_BYTES_PER_MINUTE},
     )
     end_time = time.time()

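The test change passes a limiter with turn_on set to True, and the surrounding code brackets the call with time measurements. A generic pattern for asserting that a throttled call respects its budget, with a stand-in for the call under test and hypothetical numbers rather than the values used in this test:

import time

def call_under_test():
    # Stand-in for the rate-limited extend call; pretend it processes two batches
    # with a 0.5 s budget each.
    time.sleep(1.0)

start_time = time.time()
call_under_test()
elapsed = time.time() - start_time

# Two batches * 0.5 s per batch -> at least ~1 s, with slack for timer jitter.
assert elapsed >= 0.9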
