Skip to content

Commit

Permalink
Add max_items to list
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Oct 21, 2024
1 parent c157fe2 commit e798438
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
11 changes: 9 additions & 2 deletions object-store-rs/python/object_store_rs/_list.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class ListResult(TypedDict):
objects: List[ObjectMeta]
"""Object metadata for the listing"""

def list(store: ObjectStore, prefix: str | None = None) -> List[ObjectMeta]:
def list(
store: ObjectStore, prefix: str | None = None, *, max_items: int | None = 2000
) -> List[ObjectMeta]:
"""
List all the objects with the given prefix.
Expand All @@ -56,11 +58,16 @@ def list(store: ObjectStore, prefix: str | None = None) -> List[ObjectMeta]:
store: The ObjectStore instance to use.
prefix: The prefix within ObjectStore to use for listing. Defaults to None.
Keyword Args:
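max_items: The maximum number of items to return. Defaults to 2000; pass `None` for no limit. The limit is checked after each item is added, so `max_items=0` still returns one item when any object matches.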
Returns:
A list of `ObjectMeta`.
"""

async def list_async(store: ObjectStore, prefix: str | None = None) -> List[ObjectMeta]:
async def list_async(
store: ObjectStore, prefix: str | None = None, *, max_items: int | None = 2000
) -> List[ObjectMeta]:
"""Call `list` asynchronously.
Refer to the documentation for [list][object_store_rs.list].
Expand Down
31 changes: 25 additions & 6 deletions object-store-rs/src/list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::TryStreamExt;
use futures::StreamExt;
use object_store::path::Path;
use object_store::{ListResult, ObjectMeta, ObjectStore};
use pyo3::prelude::*;
Expand Down Expand Up @@ -58,41 +58,60 @@ impl IntoPy<PyObject> for PyListResult {
}

#[pyfunction]
#[pyo3(signature = (store, prefix = None))]
#[pyo3(signature = (store, prefix = None, *, max_items = 2000))]
pub(crate) fn list(
py: Python,
store: PyObjectStore,
prefix: Option<String>,
max_items: Option<usize>,
) -> PyObjectStoreResult<Vec<PyObjectMeta>> {
let runtime = get_runtime(py)?;
py.allow_threads(|| {
let out = runtime.block_on(list_materialize(
store.into_inner(),
prefix.map(|s| s.into()).as_ref(),
max_items,
))?;
Ok::<_, PyObjectStoreError>(out)
})
}

#[pyfunction]
#[pyo3(signature = (store, prefix = None))]
#[pyo3(signature = (store, prefix = None, *, max_items = 2000))]
// Async counterpart of `list`: wraps the listing future with
// `pyo3_async_runtimes::tokio::future_into_py` and returns the resulting Python object.
//
// `prefix` is optional and `max_items` is keyword-only with a default of 2000, as
// declared in the signature attribute; both are forwarded to `list_materialize`.
pub(crate) fn list_async(
py: Python,
store: PyObjectStore,
prefix: Option<String>,
max_items: Option<usize>,
) -> PyResult<Bound<PyAny>> {
// `async move` takes ownership of `store`, `prefix` and `max_items` so the future
// does not borrow from this stack frame.
pyo3_async_runtimes::tokio::future_into_py(py, async move {
// NOTE(review): this two-argument call looks like the pre-commit line kept by the
// diff view; the three-argument call below appears to supersede it — confirm.
let out = list_materialize(store.into_inner(), prefix.map(|s| s.into()).as_ref()).await?;
// Convert the optional prefix string and pass the cap through unchanged.
let out = list_materialize(
store.into_inner(),
prefix.map(|s| s.into()).as_ref(),
max_items,
)
.await?;
Ok(out)
})
}

/// Collect the entries yielded by `store.list(prefix)` into a `Vec`, stopping early
/// once `max_items` entries have been gathered.
///
/// * `store` - object store whose `list` stream is consumed.
/// * `prefix` - optional path prefix passed straight through to `store.list`.
/// * `max_items` - optional cap on the number of collected entries; with `None` the
///   stream is read to its end.
///
/// The first `Err` yielded by the stream is propagated with `?` and ends the listing.
///
/// NOTE(review): the cap is tested only after an entry has been pushed, so
/// `max_items = Some(0)` still returns one entry when the stream is non-empty
/// (`test_list_max_items` asserts exactly that). Confirm whether this is intended.
async fn list_materialize(
store: Arc<dyn ObjectStore>,
prefix: Option<&Path>,
max_items: Option<usize>,
) -> PyObjectStoreResult<Vec<PyObjectMeta>> {
// NOTE(review): the next two lines collect the whole stream with no cap and appear
// to be the pre-commit body kept by the diff view — confirm.
let list_result = store.list(prefix).try_collect::<Vec<_>>().await?;
Ok(list_result.into_iter().map(PyObjectMeta).collect())
// Pull entries one at a time so the loop can stop as soon as the cap is reached.
let mut stream = store.list(prefix);
let mut result = vec![];
while let Some(object) = stream.next().await {
// `object?` returns early with the error if the stream yielded `Err`.
result.push(PyObjectMeta(object?));
// The length check runs after the push, hence at least one entry is kept.
if let Some(max_items) = max_items {
if result.len() >= max_items {
return Ok(result);
}
}
}

// Stream ended before the cap was hit (or no cap was given).
Ok(result)
}

#[pyfunction]
Expand Down
15 changes: 15 additions & 0 deletions tests/test_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import object_store_rs as obs
from object_store_rs.store import MemoryStore


def test_list_max_items():
    """Check how many entries `obs.list` returns for several `max_items` values."""
    store = MemoryStore()

    # Seed the in-memory store with three small objects.
    for name, payload in (
        ("file1.txt", b"foo"),
        ("file2.txt", b"bar"),
        ("file3.txt", b"baz"),
    ):
        obs.put_file(store, name, payload)

    # With the default limit every object is returned.
    assert len(obs.list(store)) == 3

    # Explicit limits below the object count truncate the listing.
    assert len(obs.list(store, max_items=2)) == 2
    assert len(obs.list(store, max_items=1)) == 1

    # A limit of 0 still yields one object: the Rust loop checks the limit only
    # after appending an item.
    assert len(obs.list(store, max_items=0)) == 1

0 comments on commit e798438

Please sign in to comment.