From d82f8f7aa78cca131af52b54ab5288d406587cba Mon Sep 17 00:00:00 2001 From: Helmi Aziz Muhammad <50535324+helmiazizm@users.noreply.github.com> Date: Mon, 9 Dec 2024 21:01:29 +0700 Subject: [PATCH] Add Alibaba OSS protocol to `PyArrowFileIO` (#1392) * Added force virtual addressing configuration for S3. Also added oss and r2 protocol. * Rewrote force virtual addressing as written in PyArrow documentation * Added the missing r2 key value in schema_to_file_io * Removed R2 protocol for now * Linter fix * Updated documentation for OSS support * Another linter fix --- mkdocs/docs/configuration.md | 18 ++++++++++++++++++ pyiceberg/io/__init__.py | 2 ++ pyiceberg/io/pyarrow.py | 6 +++++- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 133f02060..1c88c7cb3 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -88,6 +88,7 @@ Iceberg works with the concept of a FileIO which is a pluggable module for readi - **file**: `PyArrowFileIO` - **hdfs**: `PyArrowFileIO` - **abfs**, **abfss**: `FsspecFileIO` +- **oss**: `PyArrowFileIO` You can also set the FileIO explicitly: @@ -115,6 +116,7 @@ For the FileIO there are several configuration options available: | s3.region | us-west-2 | Sets the region of the bucket | | s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | | s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | +| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. 
| @@ -167,6 +169,22 @@ For the FileIO there are several configuration options available: +### Alibaba Cloud Object Storage Service (OSS) + + + +PyIceberg uses the [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) class to connect to an OSS bucket, as the service is [compatible with the S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss) as long as the endpoint is addressed in virtual hosted style. + +| Key | Example | Description | +| -------------------- | ------------------- | ------------------------------------------------ | +| s3.endpoint | | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use an S3-compatible endpoint. | +| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | +| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | +| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | +| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True, as OSS can only be accessed with a virtual hosted-style address. 
| + + + ### PyArrow diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index 3769c3194..40186069d 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -74,6 +74,7 @@ S3_SIGNER_ENDPOINT_DEFAULT = "v1/aws/s3/sign" S3_ROLE_ARN = "s3.role-arn" S3_ROLE_SESSION_NAME = "s3.role-session-name" +S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing" HDFS_HOST = "hdfs.host" HDFS_PORT = "hdfs.port" HDFS_USER = "hdfs.user" @@ -304,6 +305,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO], "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO], + "oss": [ARROW_FILE_IO], "gs": [ARROW_FILE_IO], "file": [ARROW_FILE_IO, FSSPEC_FILE_IO], "hdfs": [ARROW_FILE_IO], diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index bd4e969df..7956a8324 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -102,6 +102,7 @@ S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, + S3_FORCE_VIRTUAL_ADDRESSING, S3_PROXY_URI, S3_REGION, S3_ROLE_ARN, @@ -350,7 +351,7 @@ def parse_location(location: str) -> Tuple[str, str, str]: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: - if scheme in {"s3", "s3a", "s3n"}: + if scheme in {"s3", "s3a", "s3n", "oss"}: from pyarrow.fs import S3FileSystem client_kwargs: Dict[str, Any] = { @@ -373,6 +374,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): client_kwargs["session_name"] = session_name + if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): + client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False) + return S3FileSystem(**client_kwargs) elif scheme in ("hdfs", "viewfs"): from pyarrow.fs import 
HadoopFileSystem