From a3e67cc548e53f979b7e3883c52247712b5255e5 Mon Sep 17 00:00:00 2001
From: Jacob Wirth
Date: Thu, 8 Feb 2024 12:29:36 -0800
Subject: [PATCH] Namespace listing and metadata access functions (#11)

* Add metadata access functions
* Expose performance timing metrics in VectorResult
* Add server timing to performance metrics
* Fix perf header parsing
* Add list namespaces endpoint
* Add metadata cache to namespace objects
* Add code docs
* Remove unused import
* Rename list_namespaces() to just namespaces()
  Parse created_at time
* Invalidate metadata after 1 read
* Adjust print logging on request failures
* Linter fixes
* Add cache hit ratio to performance metrics
---
 README.md                |  16 +++-
 pyproject.toml           |   1 +
 tests/test_readme.py     |  12 +++
 tests/test_vectors.py    |  64 +++++++++----
 turbopuffer/__init__.py  |   3 +-
 turbopuffer/backend.py   |  82 ++++++++++++----
 turbopuffer/namespace.py | 201 ++++++++++++++++++++++++++++++++++++---
 turbopuffer/query.py     |   3 +-
 turbopuffer/vectors.py   |   7 +-
 9 files changed, 337 insertions(+), 52 deletions(-)

diff --git a/README.md b/README.md
index e722736..9d8ffc4 100644
--- a/README.md
+++ b/README.md
@@ -18,11 +18,15 @@ $ pip install turbopuffer[fast]
 2. Start using the API
 ```py
 import turbopuffer as tpuf
-tpuf.api_key = 'your-token' # Alternatively: export=TURBOPUFFER_API_KEY=your-token
+tpuf.api_key = 'your-token'  # Alternatively: export TURBOPUFFER_API_KEY=your-token
 
 # Open a namespace
 ns = tpuf.Namespace('hello_world')
 
+# Read namespace metadata
+if ns.exists():
+    print(f'Namespace {ns.name} exists with {ns.dimensions()} dimensions and approximately {ns.approx_count()} vectors.')
+
 # Upsert your dataset
 ns.upsert(
     ids=[1, 2],
@@ -53,6 +57,16 @@ print(vectors)
 #   VectorRow(id=2, vector=[0.30000001192092896, 0.4000000059604645], attributes={'name': 'foos'}, dist=0.001016080379486084),
 #   VectorRow(id=1, vector=[0.10000000149011612, 0.20000000298023224], attributes={'name': 'foo'}, dist=0.009067952632904053)
 # ]
+
+# List all namespaces
+namespaces = tpuf.namespaces()
+print('Total namespaces:', len(namespaces))
+for namespace in namespaces:
+    print('Namespace', namespace.name, 'contains approximately', namespace.approx_count(),
+          'vectors with', namespace.dimensions(), 'dimensions.')
+
+# Delete vectors using the separate delete method
+ns.delete([1, 2])
 ```
 
 Endpoint Documentation
diff --git a/pyproject.toml b/pyproject.toml
index c1a7ee5..eecabd9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ classifiers = [
 [tool.poetry.dependencies]
 python = "^3.9"
 requests = "^2.31"
+iso8601 = "^2.1.0"
 orjson = {version = "^3.9", optional = true}
 numpy = {version = "^1.26.3", optional = true}
 pandas = {version = "^2.1.4", optional = true}
diff --git a/tests/test_readme.py b/tests/test_readme.py
index 0443306..76a01b8 100644
--- a/tests/test_readme.py
+++ b/tests/test_readme.py
@@ -5,6 +5,9 @@ def test_readme():
     ns = tpuf.Namespace(tests.test_prefix + 'hello_world')
 
+    if ns.exists():
+        print(f'Namespace {ns.name} exists with {ns.dimensions()} dimensions and approximately {ns.approx_count()} vectors.')
+
     ns.upsert(
         ids=[1, 2],
         vectors=[[0.1, 0.2], [0.3, 0.4]],
@@ -29,4 +32,13 @@ def test_readme():
     )
     print(vectors)
 
+    namespaces = tpuf.namespaces()
+    print('Total namespaces:', len(namespaces))
+    for namespace in namespaces:
+        # print('Namespace', namespace.name, 'contains approximately', namespace.approx_count(),
+        #       'vectors with', namespace.dimensions(), 'dimensions.')
+        pass
+
+    ns.delete([1, 2])
+
     ns.delete_all()
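The metadata accessors used in the README and the test above share one HEAD request: `exists()` populates the cache that `dimensions()` and `approx_count()` then consume. A minimal sketch of that round trip (the namespace name and printed values are illustrative only):

```py
import turbopuffer as tpuf

ns = tpuf.Namespace('hello_world')

# exists() always issues a fresh HEAD request, so it reflects the server's
# current view rather than any cached metadata.
if ns.exists():
    # These two reads are served from the metadata cached by the HEAD request
    # above; each value is dropped from the cache after a single read.
    print(ns.dimensions())    # e.g. 2
    print(ns.approx_count())  # e.g. 2
```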
diff --git a/tests/test_vectors.py b/tests/test_vectors.py
index ffc77c9..36eeed0 100644
--- a/tests/test_vectors.py
+++ b/tests/test_vectors.py
@@ -99,15 +99,22 @@ def test_delete_vectors():
 def test_upsert_columns():
     ns = tpuf.Namespace(tests.test_prefix + 'client_test')
 
-    # Test upsert columns
+    ids = [0, 1, 2, 3]
+    vectors = [[0.0, 0.0], [0.1, 0.1], [0.2, 0.2], [0.3, 0.3]]
+    attributes = {
+        "key1": ["zero", "one", "two", "three"],
+        "key2": [" ", "a", "b", "c"],
+        "test": ["cols", "cols", "cols", "cols"],
+    }
+
+    # Test upsert columns with positional args
+    ns.upsert(ids, vectors, attributes)
+
+    # Test upsert columns with named args
     ns.upsert(
-        ids=[0, 1, 2, 3],
-        vectors=[[0.0, 0.0], [0.1, 0.1], [0.2, 0.2], [0.3, 0.3]],
-        attributes={
-            "key1": ["zero", "one", "two", "three"],
-            "key2": [" ", "a", "b", "c"],
-            "test": ["cols", "cols", "cols", "cols"],
-        }
+        ids=ids,
+        vectors=vectors,
+        attributes=attributes
     )
 
     # Test upsert dict columns
@@ -273,6 +280,8 @@ def check_result(row, expected):
 def test_list_vectors():
     ns = tpuf.Namespace(tests.test_prefix + 'client_test')
 
+    assert ns.exists()
+
     vector_set = ns.vectors()
     set_str = str(vector_set)
     assert set_str.startswith(f"VectorResult(namespace='{tests.test_prefix}client_test', offset=0, next_cursor='")
@@ -282,6 +291,37 @@ def test_list_vectors():
     assert len(vector_set) == 98
 
 
+def test_read_metadata():
+    ns = tpuf.Namespace(tests.test_prefix + 'client_test')
+
+    assert ns.exists()
+    assert ns.dimensions() == 2
+    assert ns.approx_count() == 92
+
+    all_ns = tpuf.namespaces()
+    assert ns in list(all_ns)
+
+
+def test_delete_all():
+    ns = tpuf.Namespace(tests.test_prefix + 'client_test')
+    # print('Recall:', ns.recall())
+
+    assert ns.exists()
+    all_ns_start = tpuf.namespaces()
+    assert ns in iter(all_ns_start)
+
+    ns.delete_all_indexes()
+    ns.delete_all()
+
+    assert not ns.exists()
+    assert ns.dimensions() == 0
+    assert ns.approx_count() == 0
+
+    # all_ns_end = tpuf.namespaces()
+    # assert ns not in iter(all_ns_end)
+    # assert len(all_ns_end) < len(all_ns_start)
+
+
 def test_string_ids():
     ns = tpuf.Namespace(tests.test_prefix + 'string_ids')
 
@@ -378,11 +418,3 @@ def test_string_ids():
     assert abs(row.dist - expected[i].dist) < 0.000001
 
     ns.delete_all()
-
-
-def test_delete_all():
-    ns = tpuf.Namespace(tests.test_prefix + 'client_test')
-    # print('Recall:', ns.recall())
-
-    ns.delete_all_indexes()
-    ns.delete_all()
diff --git a/turbopuffer/__init__.py b/turbopuffer/__init__.py
index 69e1d8c..0ebc8f9 100644
--- a/turbopuffer/__init__.py
+++ b/turbopuffer/__init__.py
@@ -3,6 +3,7 @@
 api_key = os.environ.get('TURBOPUFFER_API_KEY')
 api_base_url = os.environ.get('TURBOPUFFER_API_BASE_URL', 'https://api.turbopuffer.com/v1')
 upsert_batch_size = 5_000
+max_retries = 6
 
 try:
     import orjson  # extras = ["fast"]
@@ -24,7 +25,7 @@ def default(self, obj):
     def dump_json_bytes(obj): return json.dumps(obj, cls=NumpyEncoder).encode()
 
 from turbopuffer.version import VERSION
-from turbopuffer.namespace import Namespace
+from turbopuffer.namespace import Namespace, namespaces
 from turbopuffer.vectors import VectorColumns, VectorRow, VectorResult
 from turbopuffer.query import VectorQuery, FilterTuple
 from turbopuffer.error import TurbopufferError, AuthenticationError, APIError
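The new module-level `max_retries` setting works like the existing `upsert_batch_size`: assign to it before issuing requests. A sketch, assuming the default backoff behavior introduced below:

```py
import turbopuffer as tpuf

# Fail faster in latency-sensitive paths: with the exponential backoff in
# backend.py, 3 attempts give up after roughly 2s + 4s of sleeping.
tpuf.max_retries = 3
```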
diff --git a/turbopuffer/backend.py b/turbopuffer/backend.py
index 2f1ec53..b5b2992 100644
--- a/turbopuffer/backend.py
+++ b/turbopuffer/backend.py
@@ -1,10 +1,11 @@
 import json
+import re
 import time
 import traceback
 import requests
 import turbopuffer as tpuf
 import gzip
-from turbopuffer.error import TurbopufferError, AuthenticationError, APIError
+from turbopuffer.error import AuthenticationError, APIError
 from typing import Optional, List
 
 
@@ -33,6 +34,12 @@ def __init__(self, api_key: Optional[str] = None):
             'User-Agent': f'tpuf-python/{tpuf.VERSION} {requests.utils.default_headers()["User-Agent"]}',
         })
 
+    def __eq__(self, other):
+        if isinstance(other, Backend):
+            return self.api_key == other.api_key and self.api_base_url == other.api_base_url
+        else:
+            return False
+
     def make_api_request(self,
                          *args: List[str],
                          method: Optional[str] = None,
@@ -46,21 +53,22 @@ def make_api_request(self,
         if query is not None:
             request.params = query
 
+        performance = dict()
         if payload is not None:
-            # before = time.monotonic()
+            payload_start = time.monotonic()
             if isinstance(payload, dict):
-                # before = time.monotonic()
                 json_payload = tpuf.dump_json_bytes(payload)
-                # print('Json time:', time.monotonic() - before)
+                performance['json_time'] = time.monotonic() - payload_start
             elif isinstance(payload, bytes):
                 json_payload = payload
             else:
                 raise ValueError(f'Unsupported POST payload type: {type(payload)}')
 
+            gzip_start = time.monotonic()
             gzip_payload = gzip.compress(json_payload, compresslevel=1)
-            # json_mebibytes = len(json_payload) / 1024 / 1024
-            # gzip_mebibytes = len(gzip_payload) / 1024 / 1024
-            # print(f'Gzip time ({json_mebibytes} MiB json / {gzip_mebibytes} MiB gzip):', time.monotonic() - before)
+            performance['gzip_time'] = time.monotonic() - gzip_start
+            if len(gzip_payload) > 0:
+                performance['gzip_ratio'] = len(json_payload) / len(gzip_payload)
 
             request.headers.update({
                 'Content-Type': 'application/json',
@@ -70,17 +78,44 @@ def make_api_request(self,
 
         prepared = self.session.prepare_request(request)
 
-        retry_attempts = 0
-        while retry_attempts < 3:
-            # before = time.monotonic()
+        retry_attempt = 0
+        while retry_attempt < tpuf.max_retries:
+            request_start = time.monotonic()
             try:
                 # print(f'Sending request:', prepared.path_url, prepared.headers)
                 response = self.session.send(prepared, allow_redirects=False)
-                # print(f'Request time (HTTP {response.status_code}):', time.monotonic() - before)
+                performance['request_time'] = time.monotonic() - request_start
+                # print(f'Request time (HTTP {response.status_code}):', performance['request_time'])
 
                 if response.status_code > 500:
                     response.raise_for_status()
 
+                server_timing_str = response.headers.get('Server-Timing', '')
+                if len(server_timing_str) > 0:
+                    match_cache_hit_ratio = re.match(r'.*cache_hit_ratio;ratio=([\d\.]+)', server_timing_str)
+                    if match_cache_hit_ratio:
+                        try:
+                            performance['cache_hit_ratio'] = float(match_cache_hit_ratio.group(1))
+                        except ValueError:
+                            pass
+                    match_processing = re.match(r'.*processing_time;dur=([\d\.]+)', server_timing_str)
+                    if match_processing:
+                        try:
+                            performance['server_time'] = float(match_processing.group(1)) / 1000.0
+                        except ValueError:
+                            pass
+                    match_exhaustive = re.match(r'.*exhaustive_search_count;count=([\d\.]+)', server_timing_str)
+                    if match_exhaustive:
+                        try:
+                            performance['exhaustive_search_count'] = int(match_exhaustive.group(1))
+                        except ValueError:
+                            pass
+
+                if method == 'HEAD':
+                    return dict(response.__dict__, **{
+                        'performance': performance,
+                    })
+
                 content_type = response.headers.get('Content-Type', 'text/plain')
                 if content_type == 'application/json':
                     try:
@@ -89,16 +124,23 @@ def make_api_request(self,
                         raise APIError(response.status_code, traceback.format_exception_only(err), response.text)
 
                     if response.ok:
-                        # print("Total request time:", time.monotonic() - start)
-                        return content
+                        performance['total_time'] = time.monotonic() - start
+                        return dict(response.__dict__, **{
+                            'content': content,
+                            'performance': performance,
+                        })
                     else:
                         raise APIError(response.status_code, content.get('status', 'error'), content.get('error', ''))
                 else:
                     raise APIError(response.status_code, 'Server returned non-JSON response', response.text)
-            except requests.HTTPError:
-                print(traceback.format_exc())
-                print("retrying...")
-                retry_attempts += 1
-                time.sleep(2)
-        print("Total request time (failed):", time.monotonic() - start)
-        raise TurbopufferError('Failed after 3 retries')
+            except requests.HTTPError as http_err:
+                retry_attempt += 1
+                # print(traceback.format_exc())
+                if retry_attempt < tpuf.max_retries:
+                    # print(f'Retrying request in {2 ** retry_attempt}s...')
+                    time.sleep(2 ** retry_attempt)  # exponential backoff: 2s doubling up to 32s across 6 attempts
+                else:
+                    print(f'Request failed after {retry_attempt} attempts...')
+                    raise APIError(http_err.response.status_code,
+                                   f'Request to {http_err.request.url} failed after {retry_attempt} attempts',
+                                   str(http_err))
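Every response from `make_api_request` is now an envelope that includes a `performance` dict. The client-side timings (`json_time`, `gzip_time`, `gzip_ratio`, `request_time`, `total_time`) are measured locally; `server_time`, `cache_hit_ratio`, and `exhaustive_search_count` only appear when the `Server-Timing` header reports them (for example, `processing_time;dur=12.3` parses to 0.0123 seconds). A sketch of reading the metrics off a result:

```py
import turbopuffer as tpuf

ns = tpuf.Namespace('hello_world')
result = ns.vectors()  # VectorResult now carries the performance dict

perf = result.performance or {}
# Use .get(): a key is only present when the client or server measured it.
print('total round trip:', perf.get('total_time'))
print('server processing:', perf.get('server_time'))
print('cache hit ratio:', perf.get('cache_hit_ratio'))
```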
diff --git a/turbopuffer/namespace.py b/turbopuffer/namespace.py
index f6d379f..2416d94 100644
--- a/turbopuffer/namespace.py
+++ b/turbopuffer/namespace.py
@@ -1,4 +1,6 @@
 import sys
+import iso8601
+from turbopuffer.error import APIError
 from turbopuffer.vectors import Cursor, VectorResult, VectorColumns, VectorRow, batch_iter
 from turbopuffer.backend import Backend
 from turbopuffer.query import VectorQuery, FilterTuple
@@ -16,6 +18,8 @@ class Namespace:
     name: str
     backend: Backend
 
+    metadata: Optional[dict] = None
+
     def __init__(self, name: str, api_key: Optional[str] = None):
         """
         Creates a new turbopuffer.Namespace object for querying the turbopuffer API.
@@ -30,8 +34,62 @@ def __init__(self, name: str, api_key: Optional[str] = None):
     def __str__(self) -> str:
         return f'tpuf-namespace:{self.name}'
 
+    def __eq__(self, other):
+        if isinstance(other, Namespace):
+            return self.name == other.name and self.backend == other.backend
+        else:
+            return False
+
+    def refresh_metadata(self):
+        response = self.backend.make_api_request('vectors', self.name, method='HEAD')
+        status_code = response.get('status_code')
+        if status_code == 200:
+            headers = response.get('headers', dict())
+            dimensions = int(headers.get('x-turbopuffer-dimensions', '0'))
+            approx_count = int(headers.get('x-turbopuffer-approx-num-vectors', '0'))
+            self.metadata = {
+                'exists': dimensions != 0,
+                'dimensions': dimensions,
+                'approx_count': approx_count,
+            }
+        elif status_code == 404:
+            self.metadata = {
+                'exists': False,
+                'dimensions': 0,
+                'approx_count': 0,
+            }
+        else:
+            raise APIError(status_code, 'Unexpected status code', response.get('content'))
+
+    def exists(self) -> bool:
+        """
+        Returns True if the namespace exists, and False if the namespace is missing or empty.
+        """
+        # Always refresh the exists check since metadata from namespaces() might be delayed.
+        self.refresh_metadata()
+        return self.metadata['exists']
+
+    def dimensions(self) -> int:
+        """
+        Returns the number of vector dimensions stored in this namespace.
+        """
+        if self.metadata is None or 'dimensions' not in self.metadata:
+            self.refresh_metadata()
+        return self.metadata.pop('dimensions', 0)
+
+    def approx_count(self) -> int:
+        """
+        Returns the approximate number of vectors stored in this namespace.
+        """
+        if self.metadata is None or 'approx_count' not in self.metadata:
+            self.refresh_metadata()
+        return self.metadata.pop('approx_count', 0)
+
     @overload
-    def upsert(self, ids: Union[List[int], List[str]], vectors: List[List[float]], attributes: Optional[Dict[str, List[Optional[str]]]] = None) -> None:
+    def upsert(self,
+               ids: Union[List[int], List[str]],
+               vectors: List[List[float]],
+               attributes: Optional[Dict[str, List[Optional[str]]]] = None) -> None:
         """
         Creates or updates multiple vectors provided in a column-oriented layout.
         If this call succeeds, data is guaranteed to be durably written to object storage.
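Spelling out the cache rules above: `refresh_metadata()` parses two response headers from a HEAD request, `exists()` always re-issues that request, and `dimensions()`/`approx_count()` pop their cached value so the next read refreshes again. A sketch with hypothetical values:

```py
import turbopuffer as tpuf

ns = tpuf.Namespace('hello_world')

# HEAD response headers map into the cache:
#   x-turbopuffer-dimensions         -> metadata['dimensions']
#   x-turbopuffer-approx-num-vectors -> metadata['approx_count']
ns.refresh_metadata()
print(ns.metadata)  # e.g. {'exists': True, 'dimensions': 2, 'approx_count': 2}
```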
+ """ + if self.metadata is None or 'approx_count' not in self.metadata: + self.refresh_metadata() + return self.metadata.pop('approx_count', 0) + @overload - def upsert(self, ids: Union[List[int], List[str]], vectors: List[List[float]], attributes: Optional[Dict[str, List[Optional[str]]]] = None) -> None: + def upsert(self, + ids: Union[List[int], List[str]], + vectors: List[List[float]], + attributes: Optional[Dict[str, List[Optional[str]]]] = None) -> None: """ Creates or updates multiple vectors provided in a column-oriented layout. If this call succeeds, data is guaranteed to be durably written to object storage. @@ -76,6 +134,9 @@ def upsert(self, data=None, ids=None, vectors=None, attributes=None) -> None: return self.upsert(VectorColumns(ids=ids, vectors=vectors, attributes=attributes)) else: raise ValueError('upsert() requires both ids= and vectors= be set.') + elif ids is not None and attributes is None: + # Offset arguments to handle positional arguments case with no data field. + return self.upsert(VectorColumns(ids=data, vectors=ids, attributes=vectors)) elif isinstance(data, VectorColumns): # "if None in data.vectors:" is not supported because data.vectors might be a list of np.ndarray # None == pd.ndarray is an ambiguous comparison in this case. @@ -83,6 +144,9 @@ def upsert(self, data=None, ids=None, vectors=None, attributes=None) -> None: if vec is None: raise ValueError('upsert() call would result in a vector deletion, use Namespace.delete([ids...]) instead.') response = self.backend.make_api_request('vectors', self.name, payload=data.__dict__) + + assert response.get('content', dict()).get('status', '') == 'OK', f'Invalid upsert() response: {response}' + self.metadata = None # Invalidate cached metadata elif isinstance(data, VectorRow): raise ValueError('upsert() should be called on a list of vectors, got single vector.') elif isinstance(data, list): @@ -128,6 +192,7 @@ def upsert(self, data=None, ids=None, vectors=None, attributes=None) -> None: # time_diff = time.monotonic() - before # print(f"Batch {columns.ids[0]}..{columns.ids[-1]} time:", time_diff, '/', len(batch), '=', len(batch)/time_diff) # start = time.monotonic() + return elif isinstance(data, Iterable): # start = time.monotonic() for batch in batch_iter(data, tpuf.upsert_batch_size): @@ -142,8 +207,6 @@ def upsert(self, data=None, ids=None, vectors=None, attributes=None) -> None: else: raise ValueError(f'Unsupported data type: {type(data)}') - assert response.get('status', '') == 'OK', f'Invalid upsert() response: {response}' - def delete(self, ids: Union[int, str, List[int], List[str]]) -> None: """ Deletes vectors by id. 
@@ -162,7 +225,8 @@ def delete(self, ids: Union[int, str, List[int], List[str]]) -> None:
         else:
             raise ValueError(f'Unsupported ids type: {type(ids)}')
 
-        assert response.get('status', '') == 'OK', f'Invalid delete() response: {response}'
+        assert response.get('content', dict()).get('status', '') == 'OK', f'Invalid delete() response: {response}'
+        self.metadata = None  # Invalidate cached metadata
 
     @overload
     def query(self,
@@ -212,7 +276,9 @@ def query(self,
             raise ValueError(f'query() input type must be compatible with turbopuffer.VectorQuery: {type(query_data)}')
 
         response = self.backend.make_api_request('vectors', self.name, 'query', payload=query_data.__dict__)
-        return VectorResult(response, namespace=self)
+        result = VectorResult(response.get('content', dict()), namespace=self)
+        result.performance = response.get('performance')
+        return result
 
     def vectors(self, cursor: Optional[Cursor] = None) -> VectorResult:
         """
@@ -223,8 +289,11 @@ def vectors(self, cursor: Optional[Cursor] = None) -> VectorResult:
         """
 
         response = self.backend.make_api_request('vectors', self.name, query={'cursor': cursor})
-        next_cursor = response.pop('next_cursor', None)
-        return VectorResult(response, namespace=self, next_cursor=next_cursor)
+        content = response.get('content', dict())
+        next_cursor = content.pop('next_cursor', None)
+        result = VectorResult(content, namespace=self, next_cursor=next_cursor)
+        result.performance = response.get('performance')
+        return result
 
     def delete_all_indexes(self) -> None:
         """
@@ -232,7 +301,7 @@ def delete_all_indexes(self) -> None:
         """
 
         response = self.backend.make_api_request('vectors', self.name, 'index', method='DELETE')
-        assert response.get('status', '') == 'ok', f'Invalid delete_all_indexes() response: {response}'
+        assert response.get('content', dict()).get('status', '') == 'ok', f'Invalid delete_all_indexes() response: {response}'
 
     def delete_all(self) -> None:
         """
@@ -240,7 +309,8 @@ def delete_all(self) -> None:
         """
 
         response = self.backend.make_api_request('vectors', self.name, method='DELETE')
-        assert response.get('status', '') == 'ok', f'Invalid delete_all() response: {response}'
+        assert response.get('content', dict()).get('status', '') == 'ok', f'Invalid delete_all() response: {response}'
+        self.metadata = None  # Invalidate cached metadata
 
     def recall(self, num=20, top_k=10) -> float:
         """
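Each write path (`upsert`, `delete`, `delete_all`) clears the cached metadata, so the next metadata read goes back to the server. A sketch of the interaction:

```py
import turbopuffer as tpuf

ns = tpuf.Namespace('hello_world')
print(ns.approx_count())  # HEAD request; the value is cached, then dropped

ns.delete([1, 2])         # a successful write sets ns.metadata = None
print(ns.approx_count())  # cache is gone, so this re-issues the HEAD request
```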
+ """ + + backend: Backend + namespaces: List[Namespace] = [] + index: int = -1 + offset: int = 0 + next_cursor: Optional[Cursor] = None + + def __init__(self, backend: Backend, initial_set: Union[List[Namespace], List[dict]] = [], next_cursor: Optional[Cursor] = None): + self.backend = backend + self.index = -1 + self.offset = 0 + self.next_cursor = next_cursor + + if len(initial_set): + if isinstance(initial_set[0], Namespace): + self.namespaces = initial_set + else: + self.namespaces = NamespaceIterator.load_namespaces(backend.api_key, initial_set) + + def load_namespaces(api_key: Optional[str], initial_set: List[dict]) -> List[Namespace]: + output = [] + for input in initial_set: + ns = tpuf.Namespace(input['id'], api_key=api_key) + ns.metadata = { + 'exists': True, + 'dimensions': input['dimensions'], + 'approx_count': input['approx_count'], + # rfc3339 returned by the server is compatible with iso8601 + 'created_at': iso8601.parse_date(input['created_at']), + } + output.append(ns) + + return output + + def __str__(self) -> str: + str_list = [ns.name for ns in self.namespaces] + if not self.next_cursor and self.offset == 0: + return str(str_list) + else: + return ("NamespaceIterator(" + f"offset={self.offset}, " + f"next_cursor='{self.next_cursor}', " + f"namespaces={str_list})") + + def __len__(self) -> int: + assert self.offset == 0, "Can't call len(NamespaceIterator) after iterating" + assert self.index == -1, "Can't call len(NamespaceIterator) after iterating" + if not self.next_cursor: + return len(self.namespaces) + else: + it = iter(self) + self.namespaces = [next for next in it] + self.offset = 0 + self.index = -1 + self.next_cursor = None + return len(self.namespaces) + + def __getitem__(self, index) -> VectorRow: + if index >= len(self.namespaces) and self.next_cursor: + it = iter(self) + self.namespaces = [next for next in it] + self.offset = 0 + self.index = -1 + self.next_cursor = None + return self.namespaces[index] + + def __iter__(self) -> 'NamespaceIterator': + assert self.offset == 0, "Can't iterate over NamespaceIterator multiple times" + return NamespaceIterator(self.backend, self.namespaces, self.next_cursor) + + def __next__(self): + if self.index + 1 < len(self.namespaces): + self.index += 1 + return self.namespaces[self.index] + elif self.next_cursor is None: + raise StopIteration + else: + response = self.backend.make_api_request( + 'vectors', + query={'cursor': self.next_cursor} + ) + content = response.get('content', dict()) + self.offset += len(self.namespaces) + self.index = -1 + self.next_cursor = content.pop('next_cursor', None) + self.namespaces = NamespaceIterator.load_namespaces(self.backend.api_key, content.pop('namespaces', list())) + return self.__next__() + + +def namespaces(api_key: Optional[str] = None) -> Iterable[Namespace]: + """ + Lists all turbopuffer namespaces for a given api_key. + If no api_key is provided, the globally configured API key will be used. 
+ """ + backend = Backend(api_key) + response = backend.make_api_request('vectors') + content = response.get('content', dict()) + next_cursor = content.pop('next_cursor', None) + return NamespaceIterator(backend, content.pop('namespaces', list()), next_cursor) diff --git a/turbopuffer/query.py b/turbopuffer/query.py index da08f23..3e03806 100644 --- a/turbopuffer/query.py +++ b/turbopuffer/query.py @@ -54,6 +54,7 @@ def __post_init__(self): # Support passing a single filter instead of a list self.filters[name] = [filter] elif len(filter) > 0 and not isinstance(filter[0], list): - raise ValueError(f'VectorQuery.filters expected a list of filters for key {name}, got list of:', type(filter[0])) + raise ValueError(f'VectorQuery.filters expected a list of filters for key {name}, got list of:', + type(filter[0])) else: raise ValueError(f'VectorQuery.filters expected a list for key {name}, got:', type(filter)) diff --git a/turbopuffer/vectors.py b/turbopuffer/vectors.py index d99f80a..8dfb719 100644 --- a/turbopuffer/vectors.py +++ b/turbopuffer/vectors.py @@ -264,6 +264,8 @@ class VectorResult: offset: int = 0 next_cursor: Optional[Cursor] = None + performance: Optional[dict] = None + def __init__(self, initial_data: Optional[DATA] = None, namespace: Optional['Namespace'] = None, next_cursor: Optional[Cursor] = None): self.namespace = namespace self.index = -1 @@ -340,8 +342,9 @@ def __next__(self): self.namespace.name, query={'cursor': self.next_cursor} ) + content = response.get('content', dict()) self.offset += len(self.data) self.index = -1 - self.next_cursor = response.pop('next_cursor', None) - self.data = VectorResult.load_data(response) + self.next_cursor = content.pop('next_cursor', None) + self.data = VectorResult.load_data(content) return self.__next__()