Commit

Adjust default batch size, update import parquet example (#12)
xthexder authored Feb 16, 2024
1 parent 826911d commit 1f839e3
Showing 2 changed files with 60 additions and 30 deletions.
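For context on the changes below: the library default for upsert_batch_size moves from 5,000 to 10,000, and the example script now sets it explicitly and streams each Parquet file in batches of that size. A minimal sketch of how the setting and the upsert call fit together, assuming a hypothetical namespace name and toy data, and using only calls that appear in this commit (TURBOPUFFER_API_KEY is read from the environment, as the example's usage message notes):

import pandas as pd
import turbopuffer as tpuf

# Client-level setting the example passes as its Parquet batch size; the
# diff's comment says to lower it if the request body ends up too large.
tpuf.upsert_batch_size = 10_000

# Hypothetical namespace and toy rows, mirroring the frame the example builds:
# an 'id' column plus a 'vector' column (renamed from 'emb').
ns = tpuf.Namespace('example-dataset')  # name is illustrative
df = pd.DataFrame({
    'id': range(3),
    'vector': [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]],
})
ns.upsert(df)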
88 changes: 59 additions & 29 deletions examples/import_parquet.py
@@ -4,40 +4,66 @@
 import os
 import sys
 import time
-import pandas as pd
 import turbopuffer as tpuf
+from pyarrow.parquet import ParquetFile
 import traceback
 import threading
 from queue import Queue, Full
 
-NUM_THREADS = 4
+NUM_THREADS = 8
+
+# Update these values to resume upserting
 START_OFFSET = 0
+MAX_OFFSET = None # 1_000_000
 
+# Adjust lower if request body ends up too large
+tpuf.upsert_batch_size = 10_000
+
+
+# Print in a way that doesn't mash lines from different threads together
+def thread_print(message):
+    print(f'{message}\n', end='')
+
+
 def read_docs_to_queue(queue, parquet_files, exiting):
     try:
         file_offset = 0
-        for parquet_file in parquet_files:
+        for file_path in parquet_files:
             while queue.full() and not exiting.is_set():
                 time.sleep(1)
             if exiting.is_set():
                 return
-            # Add any attribute columns to include after 'emb'
-            df = pd.read_parquet(parquet_file, columns=['emb']).rename(columns={'emb': 'vector'})
-            if 'id' not in df.keys():
-                df['id'] = range(file_offset, file_offset+len(df))
-            if file_offset >= START_OFFSET:
-                while not exiting.is_set():
-                    try:
-                        queue.put(df, timeout=1)
-                        break
-                    except Full:
-                        pass
-                print(f'Loaded {parquet_file}, file_offset from {file_offset} to {file_offset + len(df)}')
+            parquet_file = ParquetFile(file_path, memory_map=True)
+            file_size = parquet_file.metadata.num_rows
+            if MAX_OFFSET is not None and file_offset > MAX_OFFSET:
+                thread_print(f'Skipping remaining files, file_offset from {file_offset} to {file_offset + file_size - 1}')
+                break
+            elif file_offset + file_size <= START_OFFSET:
+                thread_print(f'Skipped {file_path}, file_offset from {file_offset} to {file_offset + file_size - 1}')
+                file_offset += file_size
             else:
-                print(f'Skipped {parquet_file}, file_offset from {file_offset} to {file_offset + len(df)}')
-            file_offset += len(df)
+                # Add any attribute columns to include after 'emb'
+                for i in parquet_file.iter_batches(batch_size=tpuf.upsert_batch_size, columns=['emb']):
+                    df = i.to_pandas().rename(columns={'emb': 'vector'})
+                    if 'id' not in df.keys():
+                        df['id'] = range(file_offset, file_offset+len(df))
+                    if file_offset + len(df) <= START_OFFSET:
+                        thread_print(f'Skipped batch file_offset from {file_offset} to {file_offset + len(df) - 1}')
+                        file_offset += len(df)
+                        continue
+                    elif MAX_OFFSET is not None and file_offset > MAX_OFFSET:
+                        thread_print(f'Skipping remaining files, file_offset from {file_offset} to {file_offset + len(df) - 1}')
+                        break
+
+                    while not exiting.is_set():
+                        try:
+                            queue.put(df, timeout=1)
+                            break
+                        except Full:
+                            pass
+                    thread_print(f'Loaded {file_path}, file_offset from {file_offset} to {file_offset + len(df) - 1}')
+                    file_offset += len(df)
     except Exception:
-        print('Failed to read batch:')
+        thread_print('Failed to read batch:')
         traceback.print_exc()
         for _ in range(0, NUM_THREADS):
             queue.put(None) # Signal the end of the documents
@@ -49,10 +75,12 @@ def upsert_docs_from_queue(input_queue, dataset_name, exiting):
     batch = input_queue.get()
     while batch is not None and not exiting.is_set():
         try:
+            before = time.monotonic()
             ns.upsert(batch)
-            print(f"Completed {batch['id'][0]}..{batch['id'][batch.shape[0]-1]}")
+            time_diff = time.monotonic() - before
+            thread_print(f"Completed {batch['id'][0]}..{batch['id'][batch.shape[0]-1]} time: {time_diff} / {len(batch)} = {len(batch)/time_diff}")
         except Exception:
-            print(f"Failed to upsert batch: {batch['id'][0]}..{batch['id'][batch.shape[0]-1]}")
+            thread_print(f"Failed to upsert batch: {batch['id'][0]}..{batch['id'][batch.shape[0]-1]}")
             traceback.print_exc()
         batch = input_queue.get()
 
@@ -62,22 +90,24 @@ def main(dataset_name, input_path):
     parquet_files = glob.glob(input_glob)
 
     if len(parquet_files) == 0:
-        print(f"No .parquet files found in: {input_glob}")
+        thread_print(f"No .parquet files found in: {input_glob}")
         sys.exit(1)
 
     sorted_files = sorted(sorted(parquet_files), key=len)
 
     ns = tpuf.Namespace(dataset_name)
     if ns.exists():
-        print(f'The namespace "{ns.name}" already exists!')
+        thread_print(f'The namespace "{ns.name}" already exists!')
         existing_dims = ns.dimensions()
-        print(f'Vectors: {ns.approx_count()}, dimensions: {existing_dims}')
+        thread_print(f'Vectors: {ns.approx_count()}, dimensions: {existing_dims}')
         response = input('Delete namespace? [y/N]: ')
        if response == 'y':
             ns.delete_all()
         else:
-            print('Cancelled')
-            sys.exit(1)
+            response2 = input(f'Resume upsert from {START_OFFSET}? [y/N]: ')
+            if response2 != 'y':
+                thread_print('Cancelled')
+                sys.exit(1)
 
     exiting = threading.Event()
     doc_queue = Queue(NUM_THREADS)
@@ -102,14 +132,14 @@ def main(dataset_name, input_path):
         exiting.set()
         sys.exit(1)
     finally:
-        print('DONE!')
-        print('Took:', (time.monotonic() - start_time), 'seconds')
+        thread_print('DONE!')
+        thread_print(f'Took: {time.monotonic() - start_time:.3f} seconds')
 
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print(f"Usage: {sys.argv[0]} <dataset_name> <input_folder>\n"
-              " Default TURBOPUFFER_API_KEY will be used from environment.")
+        thread_print(f"Usage: {sys.argv[0]} <dataset_name> <input_folder>\n"
+                     " Default TURBOPUFFER_API_KEY will be used from environment.")
         sys.exit(1)
 
     main(sys.argv[1], sys.argv[2])
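Stepping back from the line-by-line changes: the example feeds a bounded queue from a reader and drains it with NUM_THREADS upsert workers, using one None sentinel per worker to signal the end of input. A condensed, self-contained sketch of that pattern, with illustrative stand-ins for the Parquet batches and the upsert call (not part of the commit):

import threading
from queue import Queue

NUM_THREADS = 8  # matches the new value in the example

def read_docs_to_queue(queue, items):
    # Producer: put() blocks while the queue is full.
    for item in items:
        queue.put(item)
    for _ in range(NUM_THREADS):
        queue.put(None)  # one end-of-input sentinel per worker

def upsert_docs_from_queue(queue):
    # Consumer: drain batches until the sentinel arrives.
    batch = queue.get()
    while batch is not None:
        print(f'would upsert {batch}\n', end='')  # ns.upsert(batch) in the real example
        batch = queue.get()

doc_queue = Queue(NUM_THREADS)  # bounded, so the reader cannot run arbitrarily far ahead
workers = [threading.Thread(target=upsert_docs_from_queue, args=(doc_queue,))
           for _ in range(NUM_THREADS)]
for t in workers:
    t.start()
read_docs_to_queue(doc_queue, ['batch-1', 'batch-2', 'batch-3'])
for t in workers:
    t.join()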
2 changes: 1 addition & 1 deletion turbopuffer/__init__.py
@@ -2,7 +2,7 @@
 import sys
 api_key = os.environ.get('TURBOPUFFER_API_KEY')
 api_base_url = os.environ.get('TURBOPUFFER_API_BASE_URL', 'https://api.turbopuffer.com/v1')
-upsert_batch_size = 5_000
+upsert_batch_size = 10_000
 max_retries = 6
 
 try:
