Namespace listing and metadata access functions (#11)
* Add metadata access functions

* Expose performance timing metrics in VectorResult

* Add server timing to performance metrics

* Fix perf header parsing

* Add list namespaces endpoint

* Add metadata cache to namespace objects

* Add code docs

* Remove unused import

* Rename list_namespaces() to just namespaces()
Parse created_at time

* Invalidate metadata after 1 read.

* Adjust print logging on request failures

* Linter fixes

* Add cache hit ratio to performance metrics
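
Taken together, the bullets above add a small metadata surface (`exists()`, `dimensions()`, `approx_count()`, and a top-level `namespaces()` listing) alongside per-request performance metrics. A minimal sketch of how the new calls compose, based on the README diff below (the namespace name is illustrative):

```py
import turbopuffer as tpuf

tpuf.api_key = 'your-token'
ns = tpuf.Namespace('my_namespace')  # illustrative name

if ns.exists():
    print(f'{ns.name}: {ns.dimensions()} dimensions, ~{ns.approx_count()} vectors')

for namespace in tpuf.namespaces():  # new top-level listing call
    print(namespace.name)
```
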
xthexder authored Feb 8, 2024
1 parent 8e94dbf commit a3e67cc
Showing 9 changed files with 337 additions and 52 deletions.
16 changes: 15 additions & 1 deletion README.md
@@ -18,11 +18,15 @@ $ pip install turbopuffer[fast]
2. Start using the API
```py
import turbopuffer as tpuf
tpuf.api_key = 'your-token'  # Alternatively: export TURBOPUFFER_API_KEY=your-token

# Open a namespace
ns = tpuf.Namespace('hello_world')

# Read namespace metadata
if ns.exists():
    print(f'Namespace {ns.name} exists with {ns.dimensions()} dimensions and approximately {ns.approx_count()} vectors.')

# Upsert your dataset
ns.upsert(
    ids=[1, 2],
@@ -53,6 +57,16 @@ print(vectors)
# VectorRow(id=2, vector=[0.30000001192092896, 0.4000000059604645], attributes={'name': 'foos'}, dist=0.001016080379486084),
# VectorRow(id=1, vector=[0.10000000149011612, 0.20000000298023224], attributes={'name': 'foo'}, dist=0.009067952632904053)
# ]

# List all namespaces
namespaces = tpuf.namespaces()
print('Total namespaces:', len(namespaces))
for namespace in namespaces:
    print('Namespace', namespace.name, 'contains approximately', namespace.approx_count(),
          'vectors with', namespace.dimensions(), 'dimensions.')

# Delete vectors using the separate delete method
ns.delete([1, 2])
```

Endpoint Documentation
1 change: 1 addition & 0 deletions pyproject.toml
@@ -31,6 +31,7 @@ classifiers = [
[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.31"
iso8601 = "^2.1.0"
orjson = {version = "^3.9", optional = true}
numpy = {version = "^1.26.3", optional = true}
pandas = {version = "^2.1.4", optional = true}
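The new `iso8601` dependency presumably backs the "Parse created_at time" item from the commit message. A minimal sketch of the library's core call (the timestamp value is illustrative):

```py
import iso8601

# Parses an ISO 8601 / RFC 3339 timestamp into a timezone-aware datetime.
created_at = iso8601.parse_date('2024-02-08T12:34:56Z')
print(created_at.isoformat())  # 2024-02-08T12:34:56+00:00
```
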
12 changes: 12 additions & 0 deletions tests/test_readme.py
@@ -5,6 +5,9 @@
def test_readme():
    ns = tpuf.Namespace(tests.test_prefix + 'hello_world')

    if ns.exists():
        print(f'Namespace {ns.name} exists with {ns.dimensions()} dimensions and approximately {ns.approx_count()} vectors.')

    ns.upsert(
        ids=[1, 2],
        vectors=[[0.1, 0.2], [0.3, 0.4]],
@@ -29,4 +32,13 @@ def test_readme():
    )
    print(vectors)

    namespaces = tpuf.namespaces()
    print('Total namespaces:', len(namespaces))
    for namespace in namespaces:
        # print('Namespace', namespace.name, 'contains approximately', namespace.approx_count(),
        #       'vectors with', namespace.dimensions(), 'dimensions.')
        pass

    ns.delete([1, 2])

    ns.delete_all()
64 changes: 48 additions & 16 deletions tests/test_vectors.py
@@ -99,15 +99,22 @@ def test_delete_vectors():
def test_upsert_columns():
    ns = tpuf.Namespace(tests.test_prefix + 'client_test')

    # Test upsert columns
    ids = [0, 1, 2, 3]
    vectors = [[0.0, 0.0], [0.1, 0.1], [0.2, 0.2], [0.3, 0.3]]
    attributes = {
        "key1": ["zero", "one", "two", "three"],
        "key2": [" ", "a", "b", "c"],
        "test": ["cols", "cols", "cols", "cols"],
    }

    # Test upsert columns with positional args
    ns.upsert(ids, vectors, attributes)

    # Test upsert columns with named args
    ns.upsert(
        ids=ids,
        vectors=vectors,
        attributes=attributes
    )

    # Test upsert dict columns
@@ -273,6 +280,8 @@ def check_result(row, expected):
def test_list_vectors():
    ns = tpuf.Namespace(tests.test_prefix + 'client_test')

    assert ns.exists()

    vector_set = ns.vectors()
    set_str = str(vector_set)
    assert set_str.startswith(f"VectorResult(namespace='{tests.test_prefix}client_test', offset=0, next_cursor='")
@@ -282,6 +291,37 @@ def test_list_vectors():
    assert len(vector_set) == 98


def test_read_metadata():
    ns = tpuf.Namespace(tests.test_prefix + 'client_test')

    assert ns.exists()
    assert ns.dimensions() == 2
    assert ns.approx_count() == 92

    all_ns = tpuf.namespaces()
    assert ns in list(all_ns)


def test_delete_all():
    ns = tpuf.Namespace(tests.test_prefix + 'client_test')
    # print('Recall:', ns.recall())

    assert ns.exists()
    all_ns_start = tpuf.namespaces()
    assert ns in iter(all_ns_start)

    ns.delete_all_indexes()
    ns.delete_all()

    assert not ns.exists()
    assert ns.dimensions() == 0
    assert ns.approx_count() == 0

    # all_ns_end = tpuf.namespaces()
    # assert ns not in iter(all_ns_end)
    # assert len(all_ns_end) < len(all_ns_start)


def test_string_ids():
    ns = tpuf.Namespace(tests.test_prefix + 'string_ids')

@@ -378,11 +418,3 @@ def test_string_ids():
        assert abs(row.dist - expected[i].dist) < 0.000001

    ns.delete_all()


3 changes: 2 additions & 1 deletion turbopuffer/__init__.py
@@ -3,6 +3,7 @@
api_key = os.environ.get('TURBOPUFFER_API_KEY')
api_base_url = os.environ.get('TURBOPUFFER_API_BASE_URL', 'https://api.turbopuffer.com/v1')
upsert_batch_size = 5_000
max_retries = 6

try:
    import orjson  # extras = ["fast"]
@@ -24,7 +25,7 @@ def default(self, obj):
    def dump_json_bytes(obj): return json.dumps(obj, cls=NumpyEncoder).encode()

from turbopuffer.version import VERSION
from turbopuffer.namespace import Namespace, namespaces
from turbopuffer.vectors import VectorColumns, VectorRow, VectorResult
from turbopuffer.query import VectorQuery, FilterTuple
from turbopuffer.error import TurbopufferError, AuthenticationError, APIError
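
The new module-level `max_retries` setting feeds the retry loop in `backend.py` below. A minimal sketch of overriding it (the value shown is arbitrary):

```py
import turbopuffer as tpuf

# Fewer attempts; the backend sleeps 2 ** attempt seconds between tries.
tpuf.max_retries = 3
```
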
82 changes: 62 additions & 20 deletions turbopuffer/backend.py
@@ -1,10 +1,11 @@
import json
import re
import time
import traceback
import requests
import turbopuffer as tpuf
import gzip
from turbopuffer.error import AuthenticationError, APIError
from typing import Optional, List


@@ -33,6 +34,12 @@ def __init__(self, api_key: Optional[str] = None):
            'User-Agent': f'tpuf-python/{tpuf.VERSION} {requests.utils.default_headers()["User-Agent"]}',
        })

    def __eq__(self, other):
        if isinstance(other, Backend):
            return self.api_key == other.api_key and self.api_base_url == other.api_base_url
        else:
            return False

    def make_api_request(self,
                         *args: List[str],
                         method: Optional[str] = None,
@@ -46,21 +53,22 @@ def make_api_request(self,
        if query is not None:
            request.params = query

        performance = dict()
        if payload is not None:
            payload_start = time.monotonic()
            if isinstance(payload, dict):
                json_payload = tpuf.dump_json_bytes(payload)
                performance['json_time'] = time.monotonic() - payload_start
            elif isinstance(payload, bytes):
                json_payload = payload
            else:
                raise ValueError(f'Unsupported POST payload type: {type(payload)}')

            gzip_start = time.monotonic()
            gzip_payload = gzip.compress(json_payload, compresslevel=1)
            performance['gzip_time'] = time.monotonic() - gzip_start
            if len(gzip_payload) > 0:
                performance['gzip_ratio'] = len(json_payload) / len(gzip_payload)

            request.headers.update({
                'Content-Type': 'application/json',
@@ -70,17 +78,44 @@

        prepared = self.session.prepare_request(request)

        retry_attempt = 0
        while retry_attempt < tpuf.max_retries:
            request_start = time.monotonic()
            try:
                # print(f'Sending request:', prepared.path_url, prepared.headers)
                response = self.session.send(prepared, allow_redirects=False)
                performance['request_time'] = time.monotonic() - request_start
                # print(f'Request time (HTTP {response.status_code}):', performance['request_time'])

                if response.status_code > 500:
                    response.raise_for_status()

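                # Example header shape the parsing below targets (values illustrative):
                #   Server-Timing: cache_hit_ratio;ratio=0.86, processing_time;dur=12.5, exhaustive_search_count;count=4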
                server_timing_str = response.headers.get('Server-Timing', '')
                if len(server_timing_str) > 0:
                    match_cache_hit_ratio = re.match(r'.*cache_hit_ratio;ratio=([\d\.]+)', server_timing_str)
                    if match_cache_hit_ratio:
                        try:
                            performance['cache_hit_ratio'] = float(match_cache_hit_ratio.group(1))
                        except ValueError:
                            pass
                    match_processing = re.match(r'.*processing_time;dur=([\d\.]+)', server_timing_str)
                    if match_processing:
                        try:
                            performance['server_time'] = float(match_processing.group(1)) / 1000.0
                        except ValueError:
                            pass
                    match_exhaustive = re.match(r'.*exhaustive_search_count;count=([\d\.]+)', server_timing_str)
                    if match_exhaustive:
                        try:
                            performance['exhaustive_search_count'] = int(match_exhaustive.group(1))
                        except ValueError:
                            pass

                if method == 'HEAD':
                    return dict(response.__dict__, **{
                        'performance': performance,
                    })

                content_type = response.headers.get('Content-Type', 'text/plain')
                if content_type == 'application/json':
                    try:
@@ -89,16 +124,23 @@
                        raise APIError(response.status_code, traceback.format_exception_only(err), response.text)

                    if response.ok:
                        performance['total_time'] = time.monotonic() - start
                        return dict(response.__dict__, **{
                            'content': content,
                            'performance': performance,
                        })
                    else:
                        raise APIError(response.status_code, content.get('status', 'error'), content.get('error', ''))
                else:
                    raise APIError(response.status_code, 'Server returned non-JSON response', response.text)
            except requests.HTTPError as http_err:
                retry_attempt += 1
                # print(traceback.format_exc())
                if retry_attempt < tpuf.max_retries:
                    # print(f'Retrying request in {2 ** retry_attempt}s...')
                    time.sleep(2 ** retry_attempt)  # exponential backoff: 2s, 4s, ... up to 32s between attempts (~62s total across 6 tries).
                else:
                    print(f'Request failed after {retry_attempt} attempts...')
                    raise APIError(http_err.response.status_code,
                                   f'Request to {http_err.request.url} failed after {retry_attempt} attempts',
                                   str(http_err))
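
On success, `make_api_request` now returns the response's attributes merged with `content` and `performance` keys. A hedged sketch of consuming the metrics (the endpoint path and namespace name are illustrative assumptions, not a documented API):

```py
from turbopuffer.backend import Backend

backend = Backend(api_key='your-token')
result = backend.make_api_request('vectors', 'my-namespace')  # illustrative GET
perf = result['performance']
print(perf.get('request_time'), perf.get('server_time'), perf.get('cache_hit_ratio'))
```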