diff --git a/object-store-rs/python/object_store_rs/_delete.pyi b/object-store-rs/python/object_store_rs/_delete.pyi
index 1317fd2..1ed0369 100644
--- a/object-store-rs/python/object_store_rs/_delete.pyi
+++ b/object-store-rs/python/object_store_rs/_delete.pyi
@@ -1,14 +1,24 @@
+from typing import Sequence
+
 from .store import ObjectStore
 
-def delete(store: ObjectStore, location: str) -> None:
-    """Delete the object at the specified location.
+def delete(store: ObjectStore, locations: str | Sequence[str]) -> None:
+    """Delete the object at the specified location(s).
 
     Args:
         store: The ObjectStore instance to use.
-        location: The path within ObjectStore to delete.
+        locations: The path or paths within the store to delete.
+
+    When supported by the underlying store, this method will use bulk operations
+    that delete more than one object per request.
+
+    If the object did not exist, the result may be an error or a success,
+    depending on the behavior of the underlying store. For example, local
+    filesystems, GCP, and Azure return an error, while S3 and in-memory will
+    return Ok.
     """
 
-async def delete_async(store: ObjectStore, location: str) -> None:
+async def delete_async(store: ObjectStore, locations: str | Sequence[str]) -> None:
     """Call `delete` asynchronously.
 
     Refer to the documentation for [delete][object_store_rs.delete].
diff --git a/object-store-rs/src/delete.rs b/object-store-rs/src/delete.rs
index aaf47b5..f770325 100644
--- a/object-store-rs/src/delete.rs
+++ b/object-store-rs/src/delete.rs
@@ -1,28 +1,83 @@
+use futures::{StreamExt, TryStreamExt};
+use object_store::path::Path;
+use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
 use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult};
 use pyo3_object_store::PyObjectStore;
 
 use crate::runtime::get_runtime;
 
+pub(crate) enum PyLocations {
+    One(Path),
+    // TODO: also support an Arrow String Array here.
+    Many(Vec<Path>),
+}
+
+impl<'py> FromPyObject<'py> for PyLocations {
+    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
+        if let Ok(path) = ob.extract::<String>() {
+            Ok(Self::One(path.into()))
+        } else if let Ok(paths) = ob.extract::<Vec<String>>() {
+            Ok(Self::Many(
+                paths.into_iter().map(|path| path.into()).collect(),
+            ))
+        } else {
+            Err(PyTypeError::new_err(
+                "Expected string path or sequence of string paths.",
+            ))
+        }
+    }
+}
+
 #[pyfunction]
-pub fn delete(py: Python, store: PyObjectStore, location: String) -> PyObjectStoreResult<()> {
+pub(crate) fn delete(
+    py: Python,
+    store: PyObjectStore,
+    locations: PyLocations,
+) -> PyObjectStoreResult<()> {
     let runtime = get_runtime(py)?;
     let store = store.into_inner();
-
     py.allow_threads(|| {
-        runtime.block_on(store.delete(&location.into()))?;
+        match locations {
+            PyLocations::One(path) => {
+                runtime.block_on(store.delete(&path))?;
+            }
+            PyLocations::Many(paths) => {
+                // TODO: add option to allow some errors here?
+                let stream =
+                    store.delete_stream(futures::stream::iter(paths.into_iter().map(Ok)).boxed());
+                runtime.block_on(stream.try_collect::<Vec<_>>())?;
+            }
+        };
         Ok::<_, PyObjectStoreError>(())
     })
 }
 
 #[pyfunction]
-pub fn delete_async(py: Python, store: PyObjectStore, location: String) -> PyResult<Bound<PyAny>> {
-    let store = store.into_inner().clone();
+pub(crate) fn delete_async(
+    py: Python,
+    store: PyObjectStore,
+    locations: PyLocations,
+) -> PyResult<Bound<PyAny>> {
+    let store = store.into_inner();
     pyo3_async_runtimes::tokio::future_into_py(py, async move {
-        store
-            .delete(&location.into())
-            .await
-            .map_err(PyObjectStoreError::ObjectStoreError)?;
+        match locations {
+            PyLocations::One(path) => {
+                store
+                    .delete(&path)
+                    .await
+                    .map_err(PyObjectStoreError::ObjectStoreError)?;
+            }
+            PyLocations::Many(paths) => {
+                // TODO: add option to allow some errors here?
+                let stream =
+                    store.delete_stream(futures::stream::iter(paths.into_iter().map(Ok)).boxed());
+                stream
+                    .try_collect::<Vec<_>>()
+                    .await
+                    .map_err(PyObjectStoreError::ObjectStoreError)?;
+            }
+        }
         Ok(())
     })
 }
diff --git a/tests/test_delete.py b/tests/test_delete.py
new file mode 100644
index 0000000..cbf44d9
--- /dev/null
+++ b/tests/test_delete.py
@@ -0,0 +1,74 @@
+from tempfile import TemporaryDirectory
+
+import object_store_rs as obs
+import pytest
+from object_store_rs.store import LocalStore, MemoryStore
+
+
+def test_delete_one():
+    store = MemoryStore()
+
+    obs.put_file(store, "file1.txt", b"foo")
+    obs.put_file(store, "file2.txt", b"bar")
+    obs.put_file(store, "file3.txt", b"baz")
+
+    assert len(obs.list(store)) == 3
+    obs.delete(store, "file1.txt")
+    obs.delete(store, "file2.txt")
+    obs.delete(store, "file3.txt")
+    assert len(obs.list(store)) == 0
+
+
+def test_delete_many():
+    store = MemoryStore()
+
+    obs.put_file(store, "file1.txt", b"foo")
+    obs.put_file(store, "file2.txt", b"bar")
+    obs.put_file(store, "file3.txt", b"baz")
+
+    assert len(obs.list(store)) == 3
+    obs.delete(
+        store,
+        ["file1.txt", "file2.txt", "file3.txt"],
+    )
+    assert len(obs.list(store)) == 0
+
+
+# Local filesystem errors if the file does not exist.
+def test_delete_one_local_fs():
+    with TemporaryDirectory() as tmpdir:
+        store = LocalStore(tmpdir)
+
+        obs.put_file(store, "file1.txt", b"foo")
+        obs.put_file(store, "file2.txt", b"bar")
+        obs.put_file(store, "file3.txt", b"baz")
+
+        assert len(obs.list(store)) == 3
+        obs.delete(store, "file1.txt")
+        obs.delete(store, "file2.txt")
+        obs.delete(store, "file3.txt")
+        assert len(obs.list(store)) == 0
+
+        with pytest.raises(Exception, match="No such file"):
+            obs.delete(store, "file1.txt")
+
+
+def test_delete_many_local_fs():
+    with TemporaryDirectory() as tmpdir:
+        store = LocalStore(tmpdir)
+
+        obs.put_file(store, "file1.txt", b"foo")
+        obs.put_file(store, "file2.txt", b"bar")
+        obs.put_file(store, "file3.txt", b"baz")
+
+        assert len(obs.list(store)) == 3
+        obs.delete(
+            store,
+            ["file1.txt", "file2.txt", "file3.txt"],
+        )
+
+        with pytest.raises(Exception, match="No such file"):
+            obs.delete(
+                store,
+                ["file1.txt", "file2.txt", "file3.txt"],
+            )