From 246c1353343d352a342c1a9324f308030d02c41b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 17 Oct 2024 10:03:19 -0400 Subject: [PATCH] Implement end-user-facing Python lib wrapping ObjectStore (#240) * sketch of python object-store api * put in arc * Progress on Python API * Added Error enum to pyo3-object_store * cleaner signer impl * add delete * Add list * Update get and list * get * copy * Working put!! * generalize put a bit * Head * Rename * remove empty file * Rename back to put_file * Configurable chunk size for multipart upload * multipart visibility --- Cargo.lock | 18 ++ Cargo.toml | 5 + arro3-io/Cargo.toml | 4 +- object-store-rs/Cargo.toml | 36 +++ object-store-rs/README.md | 1 + object-store-rs/pyproject.toml | 20 ++ .../python/object_store_rs/__init__.py | 4 + .../python/object_store_rs/_copy.pyi | 6 + .../python/object_store_rs/_delete.pyi | 4 + .../python/object_store_rs/_get.pyi | 114 ++++++++++ .../python/object_store_rs/_head.pyi | 5 + .../python/object_store_rs/_list.pyi | 45 ++++ .../object_store_rs/_object_store_rs.pyi | 32 +++ .../python/object_store_rs/_put.pyi | 21 ++ .../object_store_rs/_pyo3_object_store.pyi | 115 ++++++++++ .../python/object_store_rs/_rename.pyi | 8 + .../python/object_store_rs/_sign.pyi | 16 ++ object-store-rs/src/copy.rs | 68 ++++++ object-store-rs/src/delete.rs | 28 +++ object-store-rs/src/get.rs | 211 ++++++++++++++++++ object-store-rs/src/head.rs | 33 +++ object-store-rs/src/lib.rs | 55 +++++ object-store-rs/src/list.rs | 136 +++++++++++ object-store-rs/src/put.rs | 119 ++++++++++ object-store-rs/src/rename.rs | 72 ++++++ object-store-rs/src/runtime.rs | 18 ++ object-store-rs/src/signer.rs | 161 +++++++++++++ pyo3-object_store/src/aws.rs | 17 +- pyo3-object_store/src/azure.rs | 13 +- pyo3-object_store/src/client.rs | 6 +- pyo3-object_store/src/error.rs | 41 ++++ pyo3-object_store/src/gcp.rs | 13 +- pyo3-object_store/src/http.rs | 5 +- pyo3-object_store/src/lib.rs | 1 + pyo3-object_store/src/local.rs | 6 +- pyproject.toml | 3 - 36 files changed, 1429 insertions(+), 31 deletions(-) create mode 100644 object-store-rs/Cargo.toml create mode 100644 object-store-rs/README.md create mode 100644 object-store-rs/pyproject.toml create mode 100644 object-store-rs/python/object_store_rs/__init__.py create mode 100644 object-store-rs/python/object_store_rs/_copy.pyi create mode 100644 object-store-rs/python/object_store_rs/_delete.pyi create mode 100644 object-store-rs/python/object_store_rs/_get.pyi create mode 100644 object-store-rs/python/object_store_rs/_head.pyi create mode 100644 object-store-rs/python/object_store_rs/_list.pyi create mode 100644 object-store-rs/python/object_store_rs/_object_store_rs.pyi create mode 100644 object-store-rs/python/object_store_rs/_put.pyi create mode 100644 object-store-rs/python/object_store_rs/_pyo3_object_store.pyi create mode 100644 object-store-rs/python/object_store_rs/_rename.pyi create mode 100644 object-store-rs/python/object_store_rs/_sign.pyi create mode 100644 object-store-rs/src/copy.rs create mode 100644 object-store-rs/src/delete.rs create mode 100644 object-store-rs/src/get.rs create mode 100644 object-store-rs/src/head.rs create mode 100644 object-store-rs/src/lib.rs create mode 100644 object-store-rs/src/list.rs create mode 100644 object-store-rs/src/put.rs create mode 100644 object-store-rs/src/rename.rs create mode 100644 object-store-rs/src/runtime.rs create mode 100644 object-store-rs/src/signer.rs create mode 100644 pyo3-object_store/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index b77d455..dd01026 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,8 +476,10 @@ checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets", ] @@ -1291,6 +1293,22 @@ dependencies = [ "memchr", ] +[[package]] +name = "object-store-rs" +version = "0.4.2" +dependencies = [ + "bytes", + "chrono", + "futures", + "http", + "object_store", + "pyo3", + "pyo3-async-runtimes", + "pyo3-object_store", + "tokio", + "url", +] + [[package]] name = "object_store" version = "0.11.0" diff --git a/Cargo.toml b/Cargo.toml index a295448..3c4ca1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "arro3-compute", "arro3-core", "arro3-io", + "object-store-rs", "pyo3-arrow", "pyo3-object_store", ] @@ -29,12 +30,16 @@ arrow-csv = "53" arrow-ipc = { version = "53", features = ["lz4", "zstd"] } arrow-schema = "53" arrow-select = "53" +bytes = "1.7.0" half = "2" indexmap = "2" numpy = "0.22" object_store = "0.11" parquet = "53" pyo3 = { version = "0.22", features = ["macros", "indexmap"] } +pyo3-async-runtimes = { git = "https://github.com/PyO3/pyo3-async-runtimes", features = [ + "tokio-runtime", +] } pyo3-file = "0.9" thiserror = "1" diff --git a/arro3-io/Cargo.toml b/arro3-io/Cargo.toml index 80eac53..9fbbf53 100644 --- a/arro3-io/Cargo.toml +++ b/arro3-io/Cargo.toml @@ -35,13 +35,13 @@ arrow-buffer = { workspace = true } arrow-csv = { workspace = true } arrow-ipc = { workspace = true } arrow-schema = { workspace = true } -bytes = "1.7.0" +bytes = { workspace = true } futures = { version = "0.3.30", optional = true } object_store = { workspace = true, optional = true } parquet = { workspace = true } pyo3 = { workspace = true } pyo3-arrow = { path = "../pyo3-arrow" } -pyo3-async-runtimes = { git = "https://github.com/PyO3/pyo3-async-runtimes", features = [ +pyo3-async-runtimes = { workspace = true, features = [ "tokio-runtime", ], optional = true } pyo3-file = { workspace = true } diff --git a/object-store-rs/Cargo.toml b/object-store-rs/Cargo.toml new file mode 100644 index 0000000..0b3beff --- /dev/null +++ b/object-store-rs/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "object-store-rs" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +description = "Core library for representing Arrow data in Python." +readme = "README.md" +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +keywords = { workspace = true } +categories = { workspace = true } +rust-version = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "_object_store_rs" +crate-type = ["cdylib"] + +[dependencies] +bytes = { workspace = true } +chrono = "0.4.38" +futures = "0.3.31" +http = "1.1" +object_store = { workspace = true } +pyo3 = { workspace = true, features = ["chrono"] } +pyo3-async-runtimes = { workspace = true, features = ["tokio-runtime"] } +pyo3-file = { workspace = true } +pyo3-object_store = { path = "../pyo3-object_store" } +tokio = { version = "1.40", features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", +] } +url = "2" diff --git a/object-store-rs/README.md b/object-store-rs/README.md new file mode 100644 index 0000000..0adcf9d --- /dev/null +++ b/object-store-rs/README.md @@ -0,0 +1 @@ +# object-store-rs diff --git a/object-store-rs/pyproject.toml b/object-store-rs/pyproject.toml new file mode 100644 index 0000000..5a8f859 --- /dev/null +++ b/object-store-rs/pyproject.toml @@ -0,0 +1,20 @@ +[build-system] +requires = ["maturin>=1.4.0,<2.0"] +build-backend = "maturin" + +[project] +name = "object-store-rs" +requires-python = ">=3.9" +dependencies = [] +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dynamic = ["version"] + +[tool.maturin] +features = ["pyo3/extension-module"] +module-name = "object_store_rs._object_store_rs" +python-source = "python" +strip = true diff --git a/object-store-rs/python/object_store_rs/__init__.py b/object-store-rs/python/object_store_rs/__init__.py new file mode 100644 index 0000000..e542bee --- /dev/null +++ b/object-store-rs/python/object_store_rs/__init__.py @@ -0,0 +1,4 @@ +from ._object_store_rs import * +from ._object_store_rs import ___version + +__version__: str = ___version() diff --git a/object-store-rs/python/object_store_rs/_copy.pyi b/object-store-rs/python/object_store_rs/_copy.pyi new file mode 100644 index 0000000..c818902 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_copy.pyi @@ -0,0 +1,6 @@ +from ._pyo3_object_store import ObjectStore + +def copy(store: ObjectStore, from_: str, to: str) -> None: ... +async def copy_async(store: ObjectStore, from_: str, to: str) -> None: ... +def copy_if_not_exists(store: ObjectStore, from_: str, to: str) -> None: ... +async def copy_if_not_exists_async(store: ObjectStore, from_: str, to: str) -> None: ... diff --git a/object-store-rs/python/object_store_rs/_delete.pyi b/object-store-rs/python/object_store_rs/_delete.pyi new file mode 100644 index 0000000..400d645 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_delete.pyi @@ -0,0 +1,4 @@ +from ._pyo3_object_store import ObjectStore + +def delete(store: ObjectStore, location: str) -> None: ... +async def delete_async(store: ObjectStore, location: str) -> None: ... diff --git a/object-store-rs/python/object_store_rs/_get.pyi b/object-store-rs/python/object_store_rs/_get.pyi new file mode 100644 index 0000000..ce3d969 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_get.pyi @@ -0,0 +1,114 @@ +from datetime import datetime +from typing import Sequence, TypedDict + +from ._list import ObjectMeta +from ._pyo3_object_store import ObjectStore +from ._sign import HTTP_METHOD as HTTP_METHOD +from ._sign import SignCapableStore as SignCapableStore +from ._sign import sign_url as sign_url +from ._sign import sign_url_async as sign_url_async + +class GetOptions(TypedDict): + """Options for a get request, such as range""" + + if_match: str | None + """ + Request will succeed if the `ObjectMeta::e_tag` matches + otherwise returning [`Error::Precondition`] + + See + + Examples: + + ```text + If-Match: "xyzzy" + If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + If-Match: * + ``` + """ + + if_none_match: str | None + """ + Request will succeed if the `ObjectMeta::e_tag` does not match + otherwise returning [`Error::NotModified`] + + See + + Examples: + + ```text + If-None-Match: "xyzzy" + If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + If-None-Match: * + ``` + """ + + if_unmodified_since: datetime | None + """ + Request will succeed if the object has been modified since + + + """ + + if_modified_since: datetime | None + """ + Request will succeed if the object has not been modified since + otherwise returning [`Error::Precondition`] + + Some stores, such as S3, will only return `NotModified` for exact + timestamp matches, instead of for any timestamp greater than or equal. + + + """ + + # range: + """ + Request transfer of only the specified range of bytes + otherwise returning [`Error::NotModified`] + + + """ + + version: str | None + """ + Request a particular object version + """ + + head: bool + """ + Request transfer of no content + + + """ + +class GetResult: + def bytes(self) -> bytes: + """ + Collects the data into bytes + """ + + async def bytes_async(self) -> bytes: + """ + Collects the data into bytes + """ + + @property + def meta(self) -> ObjectMeta: + """The ObjectMeta for this object""" + +def get( + store: ObjectStore, location: str, *, options: GetOptions | None = None +) -> GetResult: ... +async def get_async( + store: ObjectStore, location: str, *, options: GetOptions | None = None +) -> GetResult: ... +def get_range(store: ObjectStore, location: str, offset: int, length: int) -> bytes: ... +async def get_range_async( + store: ObjectStore, location: str, offset: int, length: int +) -> bytes: ... +def get_ranges( + store: ObjectStore, location: str, offset: Sequence[int], length: Sequence[int] +) -> bytes: ... +async def get_ranges_async( + store: ObjectStore, location: str, offset: Sequence[int], length: Sequence[int] +) -> bytes: ... diff --git a/object-store-rs/python/object_store_rs/_head.pyi b/object-store-rs/python/object_store_rs/_head.pyi new file mode 100644 index 0000000..cc77152 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_head.pyi @@ -0,0 +1,5 @@ +from ._list import ObjectMeta +from ._pyo3_object_store import ObjectStore + +def head(store: ObjectStore, location: str) -> ObjectMeta: ... +async def head_async(store: ObjectStore, location: str) -> ObjectMeta: ... diff --git a/object-store-rs/python/object_store_rs/_list.pyi b/object-store-rs/python/object_store_rs/_list.pyi new file mode 100644 index 0000000..4013c9c --- /dev/null +++ b/object-store-rs/python/object_store_rs/_list.pyi @@ -0,0 +1,45 @@ +from datetime import datetime +from typing import List, TypedDict + +from ._pyo3_object_store import ObjectStore +from ._sign import HTTP_METHOD as HTTP_METHOD +from ._sign import SignCapableStore as SignCapableStore +from ._sign import sign_url as sign_url +from ._sign import sign_url_async as sign_url_async + +class ObjectMeta(TypedDict): + location: str + """The full path to the object""" + + last_modified: datetime + """The last modified time""" + + size: int + """The size in bytes of the object""" + + e_tag: str | None + """The unique identifier for the object + + + """ + + version: str | None + """A version indicator for this object""" + +class ListResult(TypedDict): + common_prefixes: List[str] + """Prefixes that are common (like directories)""" + + objects: List[ObjectMeta] + """Object metadata for the listing""" + +def list(store: ObjectStore, prefix: str | None = None) -> List[ObjectMeta]: ... +async def list_async( + store: ObjectStore, prefix: str | None = None +) -> List[ObjectMeta]: ... +def list_with_delimiter( + store: ObjectStore, prefix: str | None = None +) -> ListResult: ... +async def list_with_delimiter_async( + store: ObjectStore, prefix: str | None = None +) -> ListResult: ... diff --git a/object-store-rs/python/object_store_rs/_object_store_rs.pyi b/object-store-rs/python/object_store_rs/_object_store_rs.pyi new file mode 100644 index 0000000..443f718 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_object_store_rs.pyi @@ -0,0 +1,32 @@ +from ._copy import copy as copy +from ._copy import copy_async as copy_async +from ._copy import copy_if_not_exists as copy_if_not_exists +from ._copy import copy_if_not_exists_async as copy_if_not_exists_async +from ._delete import delete as delete +from ._delete import delete_async as delete_async +from ._get import GetOptions as GetOptions +from ._get import GetResult as GetResult +from ._get import get as get +from ._get import get_async as get_async +from ._get import get_range as get_range +from ._get import get_range_async as get_range_async +from ._get import get_ranges as get_ranges +from ._get import get_ranges_async as get_ranges_async +from ._head import head as head +from ._head import head_async as head_async +from ._list import ListResult as ListResult +from ._list import ObjectMeta as ObjectMeta +from ._list import list as list +from ._list import list_async as list_async +from ._list import list_with_delimiter as list_with_delimiter +from ._list import list_with_delimiter_async as list_with_delimiter_async +from ._put import put_file as put_file +from ._put import put_file_async as put_file_async +from ._rename import rename as rename +from ._rename import rename_async as rename_async +from ._rename import rename_if_not_exists as rename_if_not_exists +from ._rename import rename_if_not_exists_async as rename_if_not_exists_async +from ._sign import HTTP_METHOD as HTTP_METHOD +from ._sign import SignCapableStore as SignCapableStore +from ._sign import sign_url as sign_url +from ._sign import sign_url_async as sign_url_async diff --git a/object-store-rs/python/object_store_rs/_put.pyi b/object-store-rs/python/object_store_rs/_put.pyi new file mode 100644 index 0000000..fb5ee10 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_put.pyi @@ -0,0 +1,21 @@ +from pathlib import Path +from typing import IO + +from ._pyo3_object_store import ObjectStore + +def put_file( + store: ObjectStore, + location: str, + file: IO[bytes] | Path | bytes, + *, + chunk_size: int = 5 * 1024, + max_concurrency: int = 12, +) -> None: ... +async def put_file_async( + store: ObjectStore, + location: str, + file: IO[bytes] | Path | bytes, + *, + chunk_size: int = 5 * 1024, + max_concurrency: int = 12, +) -> None: ... diff --git a/object-store-rs/python/object_store_rs/_pyo3_object_store.pyi b/object-store-rs/python/object_store_rs/_pyo3_object_store.pyi new file mode 100644 index 0000000..d73d56c --- /dev/null +++ b/object-store-rs/python/object_store_rs/_pyo3_object_store.pyi @@ -0,0 +1,115 @@ +# TODO: move this to a standalone package/docs website that can be shared across +# multiple python packages. + +from __future__ import annotations + +from datetime import timedelta +from typing import Dict, TypedDict + +import boto3 +import botocore +import botocore.session + +class BackoffConfig(TypedDict): + init_backoff: timedelta + max_backoff: timedelta + base: int | float + +class RetryConfig(TypedDict): + backoff: BackoffConfig + max_retries: int + retry_timeout: timedelta + +class AzureStore: + @classmethod + def from_env( + cls, + container: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + @classmethod + def from_url( + cls, + url: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + +class GCSStore: + @classmethod + def from_env( + cls, + bucket: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + @classmethod + def from_url( + cls, + url: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + +class HTTPStore: + @classmethod + def from_url( + cls, + url: str, + *, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + +class S3Store: + @classmethod + def from_env( + cls, + bucket: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + @classmethod + def from_session( + cls, + session: boto3.Session | botocore.session.Session, + bucket: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + @classmethod + def from_url( + cls, + url: str, + *, + config: Dict[str, str] | None = None, + client_options: Dict[str, str] | None = None, + retry_config: RetryConfig | None = None, + ) -> S3Store: ... + +class LocalStore: + """ + Local filesystem storage providing an ObjectStore interface to files on local disk. + Can optionally be created with a directory prefix. + + """ + def __init__(self, prefix: str | None = None) -> None: ... + +class MemoryStore: + """A fully in-memory implementation of ObjectStore.""" + def __init__(self) -> None: ... + +ObjectStore = AzureStore | GCSStore | HTTPStore | S3Store | LocalStore | MemoryStore diff --git a/object-store-rs/python/object_store_rs/_rename.pyi b/object-store-rs/python/object_store_rs/_rename.pyi new file mode 100644 index 0000000..b3ce3ad --- /dev/null +++ b/object-store-rs/python/object_store_rs/_rename.pyi @@ -0,0 +1,8 @@ +from ._pyo3_object_store import ObjectStore + +def rename(store: ObjectStore, from_: str, to: str) -> None: ... +async def rename_async(store: ObjectStore, from_: str, to: str) -> None: ... +def rename_if_not_exists(store: ObjectStore, from_: str, to: str) -> None: ... +async def rename_if_not_exists_async( + store: ObjectStore, from_: str, to: str +) -> None: ... diff --git a/object-store-rs/python/object_store_rs/_sign.pyi b/object-store-rs/python/object_store_rs/_sign.pyi new file mode 100644 index 0000000..41404a1 --- /dev/null +++ b/object-store-rs/python/object_store_rs/_sign.pyi @@ -0,0 +1,16 @@ +from datetime import timedelta +from typing import Literal + +from ._pyo3_object_store import AzureStore, GCSStore, S3Store + +HTTP_METHOD = Literal[ + "GET", "PUT", "POST", "HEAD", "PATCH", "TRACE", "DELETE", "OPTIONS", "CONNECT" +] +SignCapableStore = AzureStore | GCSStore | S3Store + +def sign_url( + store: SignCapableStore, method: HTTP_METHOD, path: str, expires_in: timedelta +) -> str: ... +async def sign_url_async( + store: SignCapableStore, method: HTTP_METHOD, path: str, expires_in: timedelta +) -> str: ... diff --git a/object-store-rs/src/copy.rs b/object-store-rs/src/copy.rs new file mode 100644 index 0000000..6976c89 --- /dev/null +++ b/object-store-rs/src/copy.rs @@ -0,0 +1,68 @@ +use object_store::ObjectStore; +use pyo3::prelude::*; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::runtime::get_runtime; + +#[pyfunction] +pub(crate) fn copy( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + runtime.block_on(store.as_ref().copy(&from_.into(), &to.into()))?; + Ok::<_, PyObjectStoreError>(()) + }) +} + +#[pyfunction] +pub(crate) fn copy_async( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + store + .as_ref() + .copy(&from_.into(), &to.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(()) + }) +} + +#[pyfunction] +pub(crate) fn copy_if_not_exists( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + runtime.block_on(store.as_ref().copy_if_not_exists(&from_.into(), &to.into()))?; + Ok::<_, PyObjectStoreError>(()) + }) +} + +#[pyfunction] +pub(crate) fn copy_if_not_exists_async( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + store + .as_ref() + .copy_if_not_exists(&from_.into(), &to.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(()) + }) +} diff --git a/object-store-rs/src/delete.rs b/object-store-rs/src/delete.rs new file mode 100644 index 0000000..aaf47b5 --- /dev/null +++ b/object-store-rs/src/delete.rs @@ -0,0 +1,28 @@ +use pyo3::prelude::*; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::runtime::get_runtime; + +#[pyfunction] +pub fn delete(py: Python, store: PyObjectStore, location: String) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + let store = store.into_inner(); + + py.allow_threads(|| { + runtime.block_on(store.delete(&location.into()))?; + Ok::<_, PyObjectStoreError>(()) + }) +} + +#[pyfunction] +pub fn delete_async(py: Python, store: PyObjectStore, location: String) -> PyResult> { + let store = store.into_inner().clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + store + .delete(&location.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(()) + }) +} diff --git a/object-store-rs/src/get.rs b/object-store-rs/src/get.rs new file mode 100644 index 0000000..f417659 --- /dev/null +++ b/object-store-rs/src/get.rs @@ -0,0 +1,211 @@ +use chrono::{DateTime, Utc}; +use object_store::{GetOptions, GetResult, ObjectStore}; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::types::PyBytes; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::list::PyObjectMeta; +use crate::runtime::get_runtime; + +#[derive(FromPyObject)] +pub(crate) struct PyGetOptions { + if_match: Option, + if_none_match: Option, + if_modified_since: Option>, + if_unmodified_since: Option>, + // TODO: + // range: Option>, + version: Option, + head: bool, +} + +impl From for GetOptions { + fn from(value: PyGetOptions) -> Self { + Self { + if_match: value.if_match, + if_none_match: value.if_none_match, + if_modified_since: value.if_modified_since, + if_unmodified_since: value.if_unmodified_since, + range: None, + version: value.version, + head: value.head, + } + } +} + +#[pyclass(name = "GetResult")] +pub(crate) struct PyGetResult(Option); + +impl PyGetResult { + fn new(result: GetResult) -> Self { + Self(Some(result)) + } +} + +#[pymethods] +impl PyGetResult { + fn bytes(&mut self, py: Python) -> PyObjectStoreResult { + let get_result = self + .0 + .take() + .ok_or(PyValueError::new_err("Result has already been disposed."))?; + let runtime = get_runtime(py)?; + py.allow_threads(|| { + let bytes = runtime.block_on(get_result.bytes())?; + Ok::<_, PyObjectStoreError>(PyBytesWrapper(bytes)) + }) + } + + fn bytes_async<'py>(&'py mut self, py: Python<'py>) -> PyResult> { + let get_result = self + .0 + .take() + .ok_or(PyValueError::new_err("Result has already been disposed."))?; + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let bytes = get_result + .bytes() + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(PyBytesWrapper(bytes)) + }) + } + + #[getter] + fn meta(&self) -> PyResult { + let inner = self + .0 + .as_ref() + .ok_or(PyValueError::new_err("Result has already been disposed."))?; + Ok(PyObjectMeta::new(inner.meta.clone())) + } +} + +pub(crate) struct PyBytesWrapper(bytes::Bytes); + +// TODO: return buffer protocol object +impl IntoPy for PyBytesWrapper { + fn into_py(self, py: Python<'_>) -> PyObject { + PyBytes::new_bound(py, &self.0).into_py(py) + } +} + +#[pyfunction] +#[pyo3(signature = (store, location, *, options = None))] +pub(crate) fn get( + py: Python, + store: PyObjectStore, + location: String, + options: Option, +) -> PyObjectStoreResult { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + let path = &location.into(); + let fut = if let Some(options) = options { + store.as_ref().get_opts(path, options.into()) + } else { + store.as_ref().get(path) + }; + let out = runtime.block_on(fut)?; + Ok::<_, PyObjectStoreError>(PyGetResult::new(out)) + }) +} + +#[pyfunction] +#[pyo3(signature = (store, location, *, options = None))] +pub(crate) fn get_async( + py: Python, + store: PyObjectStore, + location: String, + options: Option, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let path = &location.into(); + let fut = if let Some(options) = options { + store.as_ref().get_opts(path, options.into()) + } else { + store.as_ref().get(path) + }; + let out = fut.await.map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(PyGetResult::new(out)) + }) +} + +#[pyfunction] +pub(crate) fn get_range( + py: Python, + store: PyObjectStore, + location: String, + offset: usize, + length: usize, +) -> PyObjectStoreResult { + let runtime = get_runtime(py)?; + let range = offset..offset + length; + py.allow_threads(|| { + let out = runtime.block_on(store.as_ref().get_range(&location.into(), range))?; + Ok::<_, PyObjectStoreError>(PyBytesWrapper(out)) + }) +} + +#[pyfunction] +pub(crate) fn get_range_async( + py: Python, + store: PyObjectStore, + location: String, + offset: usize, + length: usize, +) -> PyResult> { + let range = offset..offset + length; + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let out = store + .as_ref() + .get_range(&location.into(), range) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(PyBytesWrapper(out)) + }) +} + +#[pyfunction] +pub(crate) fn get_ranges( + py: Python, + store: PyObjectStore, + location: String, + offsets: Vec, + lengths: Vec, +) -> PyObjectStoreResult> { + let runtime = get_runtime(py)?; + let ranges = offsets + .into_iter() + .zip(lengths) + .map(|(offset, length)| offset..offset + length) + .collect::>(); + py.allow_threads(|| { + let out = runtime.block_on(store.as_ref().get_ranges(&location.into(), &ranges))?; + Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper).collect()) + }) +} + +#[pyfunction] +pub(crate) fn get_ranges_async( + py: Python, + store: PyObjectStore, + location: String, + offsets: Vec, + lengths: Vec, +) -> PyResult> { + let ranges = offsets + .into_iter() + .zip(lengths) + .map(|(offset, length)| offset..offset + length) + .collect::>(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let out = store + .as_ref() + .get_ranges(&location.into(), &ranges) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(out.into_iter().map(PyBytesWrapper).collect::>()) + }) +} diff --git a/object-store-rs/src/head.rs b/object-store-rs/src/head.rs new file mode 100644 index 0000000..5caf140 --- /dev/null +++ b/object-store-rs/src/head.rs @@ -0,0 +1,33 @@ +use pyo3::prelude::*; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::list::PyObjectMeta; +use crate::runtime::get_runtime; + +#[pyfunction] +pub fn head( + py: Python, + store: PyObjectStore, + location: String, +) -> PyObjectStoreResult { + let runtime = get_runtime(py)?; + let store = store.into_inner(); + + py.allow_threads(|| { + let meta = runtime.block_on(store.head(&location.into()))?; + Ok::<_, PyObjectStoreError>(PyObjectMeta::new(meta)) + }) +} + +#[pyfunction] +pub fn head_async(py: Python, store: PyObjectStore, location: String) -> PyResult> { + let store = store.into_inner().clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let meta = store + .head(&location.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(PyObjectMeta::new(meta)) + }) +} diff --git a/object-store-rs/src/lib.rs b/object-store-rs/src/lib.rs new file mode 100644 index 0000000..33c942b --- /dev/null +++ b/object-store-rs/src/lib.rs @@ -0,0 +1,55 @@ +use pyo3::prelude::*; + +mod copy; +mod delete; +mod get; +mod head; +mod list; +mod put; +mod rename; +mod runtime; +mod signer; + +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +#[pyfunction] +fn ___version() -> &'static str { + VERSION +} + +/// A Python module implemented in Rust. +#[pymodule] +fn _object_store_rs(py: Python, m: &Bound) -> PyResult<()> { + m.add_wrapped(wrap_pyfunction!(___version))?; + + pyo3_object_store::register_store_module(py, m, "object_store_rs")?; + + m.add_wrapped(wrap_pyfunction!(copy::copy_async))?; + m.add_wrapped(wrap_pyfunction!(copy::copy_if_not_exists_async))?; + m.add_wrapped(wrap_pyfunction!(copy::copy_if_not_exists))?; + m.add_wrapped(wrap_pyfunction!(copy::copy))?; + m.add_wrapped(wrap_pyfunction!(delete::delete_async))?; + m.add_wrapped(wrap_pyfunction!(delete::delete))?; + m.add_wrapped(wrap_pyfunction!(get::get_async))?; + m.add_wrapped(wrap_pyfunction!(get::get_range_async))?; + m.add_wrapped(wrap_pyfunction!(get::get_range))?; + m.add_wrapped(wrap_pyfunction!(get::get_ranges_async))?; + m.add_wrapped(wrap_pyfunction!(get::get_ranges))?; + m.add_wrapped(wrap_pyfunction!(get::get))?; + m.add_wrapped(wrap_pyfunction!(head::head_async))?; + m.add_wrapped(wrap_pyfunction!(head::head))?; + m.add_wrapped(wrap_pyfunction!(list::list_async))?; + m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter_async))?; + m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?; + m.add_wrapped(wrap_pyfunction!(list::list))?; + m.add_wrapped(wrap_pyfunction!(put::put_file_async))?; + m.add_wrapped(wrap_pyfunction!(put::put_file))?; + m.add_wrapped(wrap_pyfunction!(rename::rename_async))?; + m.add_wrapped(wrap_pyfunction!(rename::rename_if_not_exists_async))?; + m.add_wrapped(wrap_pyfunction!(rename::rename_if_not_exists))?; + m.add_wrapped(wrap_pyfunction!(rename::rename))?; + m.add_wrapped(wrap_pyfunction!(signer::sign_url_async))?; + m.add_wrapped(wrap_pyfunction!(signer::sign_url))?; + + Ok(()) +} diff --git a/object-store-rs/src/list.rs b/object-store-rs/src/list.rs new file mode 100644 index 0000000..f84e0a8 --- /dev/null +++ b/object-store-rs/src/list.rs @@ -0,0 +1,136 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use futures::TryStreamExt; +use object_store::path::Path; +use object_store::{ListResult, ObjectMeta, ObjectStore}; +use pyo3::prelude::*; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::runtime::get_runtime; + +pub(crate) struct PyObjectMeta(ObjectMeta); + +impl PyObjectMeta { + pub(crate) fn new(meta: ObjectMeta) -> Self { + Self(meta) + } +} + +impl IntoPy for PyObjectMeta { + fn into_py(self, py: Python<'_>) -> PyObject { + let mut dict = HashMap::new(); + dict.insert("location", self.0.location.as_ref().into_py(py)); + dict.insert("last_modified", self.0.last_modified.into_py(py)); + dict.insert("size", self.0.size.into_py(py)); + dict.insert("e_tag", self.0.e_tag.into_py(py)); + dict.insert("version", self.0.version.into_py(py)); + dict.into_py(py) + } +} + +pub(crate) struct PyListResult(ListResult); + +impl IntoPy for PyListResult { + fn into_py(self, py: Python<'_>) -> PyObject { + let mut dict = HashMap::new(); + dict.insert( + "common_prefixes", + self.0 + .common_prefixes + .into_iter() + .map(String::from) + .collect::>() + .into_py(py), + ); + dict.insert( + "objects", + self.0 + .objects + .into_iter() + .map(PyObjectMeta) + .collect::>() + .into_py(py), + ); + dict.into_py(py) + } +} + +#[pyfunction] +#[pyo3(signature = (store, prefix = None))] +pub(crate) fn list( + py: Python, + store: PyObjectStore, + prefix: Option, +) -> PyObjectStoreResult> { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + let out = runtime.block_on(list_materialize( + store.into_inner(), + prefix.map(|s| s.into()).as_ref(), + ))?; + Ok::<_, PyObjectStoreError>(out) + }) +} + +#[pyfunction] +#[pyo3(signature = (store, prefix = None))] +pub(crate) fn list_async( + py: Python, + store: PyObjectStore, + prefix: Option, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let out = list_materialize(store.into_inner(), prefix.map(|s| s.into()).as_ref()).await?; + Ok(out) + }) +} + +async fn list_materialize( + store: Arc, + prefix: Option<&Path>, +) -> PyObjectStoreResult> { + let list_result = store.list(prefix).try_collect::>().await?; + Ok(list_result.into_iter().map(PyObjectMeta).collect()) +} + +#[pyfunction] +#[pyo3(signature = (store, prefix = None))] +pub(crate) fn list_with_delimiter( + py: Python, + store: PyObjectStore, + prefix: Option, +) -> PyObjectStoreResult { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + let out = runtime.block_on(list_with_delimiter_materialize( + store.into_inner(), + prefix.map(|s| s.into()).as_ref(), + ))?; + Ok::<_, PyObjectStoreError>(out) + }) +} + +#[pyfunction] +#[pyo3(signature = (store, prefix = None))] +pub(crate) fn list_with_delimiter_async( + py: Python, + store: PyObjectStore, + prefix: Option, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let out = + list_with_delimiter_materialize(store.into_inner(), prefix.map(|s| s.into()).as_ref()) + .await?; + Ok(out) + }) +} + +async fn list_with_delimiter_materialize( + store: Arc, + prefix: Option<&Path>, +) -> PyObjectStoreResult { + let list_result = store.list_with_delimiter(prefix).await?; + Ok(PyListResult(list_result)) +} diff --git a/object-store-rs/src/put.rs b/object-store-rs/src/put.rs new file mode 100644 index 0000000..fcdf83f --- /dev/null +++ b/object-store-rs/src/put.rs @@ -0,0 +1,119 @@ +use std::fs::File; +use std::io::{BufReader, Cursor, Read}; +use std::path::PathBuf; +use std::sync::Arc; + +use object_store::path::Path; +use object_store::{ObjectStore, WriteMultipart}; +use pyo3::exceptions::PyIOError; +use pyo3::prelude::*; +use pyo3::pybacked::PyBackedBytes; +use pyo3_file::PyFileLikeObject; +use pyo3_object_store::error::PyObjectStoreResult; +use pyo3_object_store::PyObjectStore; + +use crate::runtime::get_runtime; + +/// Input types supported by multipart upload +#[derive(Debug)] +pub(crate) enum MultipartPutInput { + File(BufReader), + FileLike(PyFileLikeObject), + Buffer(Cursor), +} + +impl<'py> FromPyObject<'py> for MultipartPutInput { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let py = ob.py(); + if let Ok(path) = ob.extract::() { + Ok(Self::File(BufReader::new(File::open(path)?))) + } else if let Ok(buffer) = ob.extract::() { + Ok(Self::Buffer(Cursor::new(buffer))) + } else { + Ok(Self::FileLike(PyFileLikeObject::with_requirements( + ob.into_py(py), + true, + false, + true, + false, + )?)) + } + } +} + +impl Read for MultipartPutInput { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self { + Self::File(f) => f.read(buf), + Self::FileLike(f) => f.read(buf), + Self::Buffer(f) => f.read(buf), + } + } +} + +#[pyfunction] +#[pyo3(signature = (store, location, file, *, chunk_size = 5120, max_concurrency = 12))] +pub(crate) fn put_file( + py: Python, + store: PyObjectStore, + location: String, + file: MultipartPutInput, + chunk_size: usize, + max_concurrency: usize, +) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + runtime.block_on(put_multipart_inner( + store.into_inner(), + &location.into(), + file, + chunk_size, + max_concurrency, + )) +} + +#[pyfunction] +#[pyo3(signature = (store, location, file, *, chunk_size = 5120, max_concurrency = 12))] +pub(crate) fn put_file_async( + py: Python, + store: PyObjectStore, + location: String, + file: MultipartPutInput, + chunk_size: usize, + max_concurrency: usize, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + Ok(put_multipart_inner( + store.into_inner(), + &location.into(), + file, + chunk_size, + max_concurrency, + ) + .await?) + }) +} + +async fn put_multipart_inner( + store: Arc, + location: &Path, + mut reader: R, + chunk_size: usize, + max_concurrency: usize, +) -> PyObjectStoreResult<()> { + let upload = store.put_multipart(location).await?; + let mut write = WriteMultipart::new(upload); + let mut scratch_buffer = vec![0; chunk_size]; + loop { + let read_size = reader + .read(&mut scratch_buffer) + .map_err(|err| PyIOError::new_err(err.to_string()))?; + if read_size == 0 { + break; + } else { + write.wait_for_capacity(max_concurrency).await?; + write.write(&scratch_buffer[0..read_size]); + } + } + write.finish().await?; + Ok(()) +} diff --git a/object-store-rs/src/rename.rs b/object-store-rs/src/rename.rs new file mode 100644 index 0000000..08a4440 --- /dev/null +++ b/object-store-rs/src/rename.rs @@ -0,0 +1,72 @@ +use object_store::ObjectStore; +use pyo3::prelude::*; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::PyObjectStore; + +use crate::runtime::get_runtime; + +#[pyfunction] +pub(crate) fn rename( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + runtime.block_on(store.as_ref().rename(&from_.into(), &to.into()))?; + Ok::<_, PyObjectStoreError>(()) + }) +} + +#[pyfunction] +pub(crate) fn rename_async( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + store + .as_ref() + .rename(&from_.into(), &to.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(()) + }) +} + +#[pyfunction] +pub(crate) fn rename_if_not_exists( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyObjectStoreResult<()> { + let runtime = get_runtime(py)?; + py.allow_threads(|| { + runtime.block_on( + store + .as_ref() + .rename_if_not_exists(&from_.into(), &to.into()), + )?; + Ok::<_, PyObjectStoreError>(()) + }) +} + +#[pyfunction] +pub(crate) fn rename_if_not_exists_async( + py: Python, + store: PyObjectStore, + from_: String, + to: String, +) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + store + .as_ref() + .rename_if_not_exists(&from_.into(), &to.into()) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(()) + }) +} diff --git a/object-store-rs/src/runtime.rs b/object-store-rs/src/runtime.rs new file mode 100644 index 0000000..9659387 --- /dev/null +++ b/object-store-rs/src/runtime.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::sync::GILOnceCell; +use tokio::runtime::Runtime; + +static RUNTIME: GILOnceCell> = GILOnceCell::new(); + +/// Get the tokio runtime for sync requests +pub(crate) fn get_runtime(py: Python<'_>) -> PyResult> { + let runtime = RUNTIME.get_or_try_init(py, || { + Ok::<_, PyErr>(Arc::new(Runtime::new().map_err(|err| { + PyValueError::new_err(format!("Could not create tokio runtime. {}", err)) + })?)) + })?; + Ok(runtime.clone()) +} diff --git a/object-store-rs/src/signer.rs b/object-store-rs/src/signer.rs new file mode 100644 index 0000000..339eb3d --- /dev/null +++ b/object-store-rs/src/signer.rs @@ -0,0 +1,161 @@ +use core::time::Duration; +use std::sync::Arc; + +use object_store::aws::AmazonS3; +use object_store::azure::MicrosoftAzure; +use object_store::gcp::GoogleCloudStorage; +use object_store::signer::Signer; +use pyo3::exceptions::PyValueError; +use pyo3::intern; +use pyo3::prelude::*; +use pyo3::pybacked::PyBackedStr; +use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::{PyAzureStore, PyGCSStore, PyS3Store}; +use url::Url; + +use crate::runtime::get_runtime; + +#[derive(Debug)] +pub(crate) enum SignCapableStore { + S3(Arc), + Gcs(Arc), + Azure(Arc), +} + +impl<'py> FromPyObject<'py> for SignCapableStore { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + if let Ok(store) = ob.downcast::() { + Ok(Self::S3(store.borrow().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::Gcs(store.borrow().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::Azure(store.borrow().as_ref().clone())) + } else { + let py = ob.py(); + // Check for object-store instance from other library + let cls_name = ob + .getattr(intern!(py, "__class__"))? + .getattr(intern!(py, "__name__"))? + .extract::()?; + if [ + "AzureStore", + "GCSStore", + "HTTPStore", + "LocalStore", + "MemoryStore", + "S3Store", + ] + .contains(&cls_name.as_ref()) + { + return Err(PyValueError::new_err("You must use an object store instance exported from **the same library** as this function. They cannot be used across libraries.\nThis is because object store instances are compiled with a specific version of Rust and Python." )); + } + + Err(PyValueError::new_err(format!( + "Expected an S3Store, GCSStore, or AzureStore instance, got {}", + ob.repr()? + ))) + } + } +} + +impl Signer for SignCapableStore { + fn signed_url<'life0, 'life1, 'async_trait>( + &'life0 self, + method: http::Method, + path: &'life1 object_store::path::Path, + expires_in: Duration, + ) -> ::core::pin::Pin< + Box< + dyn ::core::future::Future> + + ::core::marker::Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + 'life1: 'async_trait, + Self: 'async_trait, + { + match self { + Self::S3(inner) => inner.signed_url(method, path, expires_in), + Self::Gcs(inner) => inner.signed_url(method, path, expires_in), + Self::Azure(inner) => inner.signed_url(method, path, expires_in), + } + } +} + +pub(crate) struct PyMethod(http::Method); + +impl<'py> FromPyObject<'py> for PyMethod { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let s = ob.extract::()?; + let method = match s.as_ref() { + "GET" => http::Method::GET, + "PUT" => http::Method::PUT, + "POST" => http::Method::POST, + "HEAD" => http::Method::HEAD, + "PATCH" => http::Method::PATCH, + "TRACE" => http::Method::TRACE, + "DELETE" => http::Method::DELETE, + "OPTIONS" => http::Method::OPTIONS, + "CONNECT" => http::Method::CONNECT, + other => { + return Err(PyValueError::new_err(format!( + "Unsupported HTTP method {}", + other + ))) + } + }; + Ok(Self(method)) + } +} + +pub(crate) struct PyUrl(url::Url); + +impl IntoPy for PyUrl { + fn into_py(self, _py: Python<'_>) -> String { + self.0.into() + } +} + +impl IntoPy for PyUrl { + fn into_py(self, py: Python<'_>) -> PyObject { + String::from(self.0).into_py(py) + } +} + +#[pyfunction] +pub(crate) fn sign_url( + py: Python, + store: SignCapableStore, + method: PyMethod, + path: String, + expires_in: Duration, +) -> PyObjectStoreResult { + let runtime = get_runtime(py)?; + let method = method.0; + + let signed_url = py.allow_threads(|| { + let url = runtime.block_on(store.signed_url(method, &path.into(), expires_in))?; + Ok::<_, object_store::Error>(url) + })?; + Ok(signed_url.into()) +} + +#[pyfunction] +pub(crate) fn sign_url_async( + py: Python, + store: SignCapableStore, + method: PyMethod, + path: String, + expires_in: Duration, +) -> PyResult { + let fut = pyo3_async_runtimes::tokio::future_into_py(py, async move { + let url = store + .signed_url(method.0, &path.into(), expires_in) + .await + .map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(PyUrl(url)) + })?; + Ok(fut.into()) +} diff --git a/pyo3-object_store/src/aws.rs b/pyo3-object_store/src/aws.rs index 46c2a8e..de22295 100644 --- a/pyo3-object_store/src/aws.rs +++ b/pyo3-object_store/src/aws.rs @@ -9,6 +9,7 @@ use pyo3::pybacked::PyBackedStr; use pyo3::types::PyType; use crate::client::PyClientOptions; +use crate::error::{PyObjectStoreError, PyObjectStoreResult}; use crate::retry::PyRetryConfig; #[pyclass(name = "S3Store")] @@ -37,7 +38,7 @@ impl PyS3Store { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -50,7 +51,7 @@ impl PyS3Store { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } // Create from an existing boto3.Session or botocore.session.Session object @@ -65,7 +66,7 @@ impl PyS3Store { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { // boto3.Session has a region_name attribute, but botocore.session.Session does not. let region = if let Ok(region) = session.getattr(intern!(py, "region_name")) { Some(region.extract::()?) @@ -111,7 +112,7 @@ impl PyS3Store { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } #[classmethod] @@ -122,7 +123,7 @@ impl PyS3Store { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = AmazonS3Builder::from_env().with_url(url); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -135,7 +136,7 @@ impl PyS3Store { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } } @@ -145,7 +146,7 @@ pub struct PyAmazonS3ConfigKey(AmazonS3ConfigKey); impl<'py> FromPyObject<'py> for PyAmazonS3ConfigKey { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { let s = ob.extract::()?.to_lowercase(); - // TODO: remove unwrap - Ok(Self(AmazonS3ConfigKey::from_str(&s).unwrap())) + let key = AmazonS3ConfigKey::from_str(&s).map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(Self(key)) } } diff --git a/pyo3-object_store/src/azure.rs b/pyo3-object_store/src/azure.rs index 2a64ac9..2cee960 100644 --- a/pyo3-object_store/src/azure.rs +++ b/pyo3-object_store/src/azure.rs @@ -8,6 +8,7 @@ use pyo3::pybacked::PyBackedStr; use pyo3::types::PyType; use crate::client::PyClientOptions; +use crate::error::{PyObjectStoreError, PyObjectStoreResult}; use crate::retry::PyRetryConfig; #[pyclass(name = "AzureStore")] @@ -36,7 +37,7 @@ impl PyAzureStore { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = MicrosoftAzureBuilder::from_env().with_container_name(container); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -49,7 +50,7 @@ impl PyAzureStore { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } #[classmethod] @@ -60,7 +61,7 @@ impl PyAzureStore { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = MicrosoftAzureBuilder::from_env().with_url(url); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -73,7 +74,7 @@ impl PyAzureStore { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } } @@ -83,7 +84,7 @@ pub struct PyAzureConfigKey(AzureConfigKey); impl<'py> FromPyObject<'py> for PyAzureConfigKey { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { let s = ob.extract::()?.to_lowercase(); - // TODO: remove unwrap - Ok(Self(AzureConfigKey::from_str(&s).unwrap())) + let key = AzureConfigKey::from_str(&s).map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(Self(key)) } } diff --git a/pyo3-object_store/src/client.rs b/pyo3-object_store/src/client.rs index 224d998..21ecba4 100644 --- a/pyo3-object_store/src/client.rs +++ b/pyo3-object_store/src/client.rs @@ -5,14 +5,16 @@ use object_store::{ClientConfigKey, ClientOptions}; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; +use crate::error::PyObjectStoreError; + #[derive(Debug, PartialEq, Eq, Hash)] pub struct PyClientConfigKey(ClientConfigKey); impl<'py> FromPyObject<'py> for PyClientConfigKey { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { let s = ob.extract::()?.to_lowercase(); - // TODO: remove unwrap - Ok(Self(ClientConfigKey::from_str(&s).unwrap())) + let key = ClientConfigKey::from_str(&s).map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(Self(key)) } } diff --git a/pyo3-object_store/src/error.rs b/pyo3-object_store/src/error.rs new file mode 100644 index 0000000..3136d1a --- /dev/null +++ b/pyo3-object_store/src/error.rs @@ -0,0 +1,41 @@ +//! Contains the [`PyObjectStoreError`], the Error returned by most fallible functions in this +//! crate. + +use pyo3::exceptions::{PyException, PyValueError}; +use pyo3::prelude::*; +use pyo3::DowncastError; +use thiserror::Error; + +/// The Error variants returned by this crate. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum PyObjectStoreError { + /// A wrapped [object_store::Error] + #[error(transparent)] + ObjectStoreError(#[from] object_store::Error), + + /// A wrapped [PyErr] + #[error(transparent)] + PyErr(#[from] PyErr), +} + +impl From for PyErr { + fn from(error: PyObjectStoreError) -> Self { + match error { + PyObjectStoreError::PyErr(err) => err, + PyObjectStoreError::ObjectStoreError(err) => PyException::new_err(err.to_string()), + } + } +} + +impl<'a, 'py> From> for PyObjectStoreError { + fn from(other: DowncastError<'a, 'py>) -> Self { + Self::PyErr(PyValueError::new_err(format!( + "Could not downcast: {}", + other + ))) + } +} + +/// A type wrapper around `Result`. +pub type PyObjectStoreResult = Result; diff --git a/pyo3-object_store/src/gcp.rs b/pyo3-object_store/src/gcp.rs index f0f6c82..5780776 100644 --- a/pyo3-object_store/src/gcp.rs +++ b/pyo3-object_store/src/gcp.rs @@ -8,6 +8,7 @@ use pyo3::pybacked::PyBackedStr; use pyo3::types::PyType; use crate::client::PyClientOptions; +use crate::error::{PyObjectStoreError, PyObjectStoreResult}; use crate::retry::PyRetryConfig; #[pyclass(name = "GCSStore")] @@ -36,7 +37,7 @@ impl PyGCSStore { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -49,7 +50,7 @@ impl PyGCSStore { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } #[classmethod] @@ -60,7 +61,7 @@ impl PyGCSStore { config: Option>, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = GoogleCloudStorageBuilder::from_env().with_url(url); if let Some(config) = config { for (key, value) in config.into_iter() { @@ -73,7 +74,7 @@ impl PyGCSStore { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } } @@ -83,7 +84,7 @@ pub struct PyGoogleConfigKey(GoogleConfigKey); impl<'py> FromPyObject<'py> for PyGoogleConfigKey { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { let s = ob.extract::()?.to_lowercase(); - // TODO: remove unwrap - Ok(Self(GoogleConfigKey::from_str(&s).unwrap())) + let key = GoogleConfigKey::from_str(&s).map_err(PyObjectStoreError::ObjectStoreError)?; + Ok(Self(key)) } } diff --git a/pyo3-object_store/src/http.rs b/pyo3-object_store/src/http.rs index 2cb2593..b3813a1 100644 --- a/pyo3-object_store/src/http.rs +++ b/pyo3-object_store/src/http.rs @@ -4,6 +4,7 @@ use object_store::http::{HttpBuilder, HttpStore}; use pyo3::prelude::*; use pyo3::types::PyType; +use crate::error::PyObjectStoreResult; use crate::retry::PyRetryConfig; use crate::PyClientOptions; @@ -31,7 +32,7 @@ impl PyHttpStore { url: &str, client_options: Option, retry_config: Option, - ) -> PyResult { + ) -> PyObjectStoreResult { let mut builder = HttpBuilder::new().with_url(url); if let Some(client_options) = client_options { builder = builder.with_client_options(client_options.into()) @@ -39,6 +40,6 @@ impl PyHttpStore { if let Some(retry_config) = retry_config { builder = builder.with_retry(retry_config.into()) } - Ok(Self(Arc::new(builder.build().unwrap()))) + Ok(Self(Arc::new(builder.build()?))) } } diff --git a/pyo3-object_store/src/lib.rs b/pyo3-object_store/src/lib.rs index 50be0aa..9e9ddb6 100644 --- a/pyo3-object_store/src/lib.rs +++ b/pyo3-object_store/src/lib.rs @@ -5,6 +5,7 @@ mod api; mod aws; mod azure; mod client; +pub mod error; mod gcp; mod http; mod local; diff --git a/pyo3-object_store/src/local.rs b/pyo3-object_store/src/local.rs index 04a4e49..b980636 100644 --- a/pyo3-object_store/src/local.rs +++ b/pyo3-object_store/src/local.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use object_store::local::LocalFileSystem; use pyo3::prelude::*; +use crate::error::PyObjectStoreResult; + #[pyclass(name = "LocalStore")] pub struct PyLocalStore(Arc); @@ -22,9 +24,9 @@ impl PyLocalStore { impl PyLocalStore { #[new] #[pyo3(signature = (prefix = None))] - fn py_new(prefix: Option) -> PyResult { + fn py_new(prefix: Option) -> PyObjectStoreResult { let fs = if let Some(prefix) = prefix { - LocalFileSystem::new_with_prefix(prefix).unwrap() + LocalFileSystem::new_with_prefix(prefix)? } else { LocalFileSystem::new() }; diff --git a/pyproject.toml b/pyproject.toml index 3c72526..6ae60a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,9 +24,6 @@ dev-dependencies = [ "pytest>=8.3.3", ] -[tool.uv.workspace] -members = ["types-pyo3-object-store"] - [tool.ruff] select = [ # Pyflakes