Add Alibaba OSS protocol to PyArrowFileIO (apache#1392)
* Added force virtual addressing configuration for S3. Also added the oss and r2 protocols.

* Rewrote force virtual addressing as written in PyArrow documentation

* Added the missing r2 key value in schema_to_file_io

* Removed R2 protocol for now

* Linter fix

* Updated documentation for OSS support

* Another linter fix
helmiazizm authored Dec 9, 2024
1 parent 2c972fa commit d82f8f7
Showing 3 changed files with 25 additions and 1 deletion.
18 changes: 18 additions & 0 deletions mkdocs/docs/configuration.md
@@ -88,6 +88,7 @@ Iceberg works with the concept of a FileIO which is a pluggable module for readi
- **file**: `PyArrowFileIO`
- **hdfs**: `PyArrowFileIO`
- **abfs**, **abfss**: `FsspecFileIO`
- **oss**: `PyArrowFileIO`

You can also set the FileIO explicitly:
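As a minimal sketch, the explicit override is just a catalog property (the catalog URI below is a placeholder, not taken from this commit):

```python
# Hypothetical catalog properties: "py-io-impl" pins the FileIO implementation
# explicitly instead of relying on the scheme-based defaults listed above.
catalog_properties = {
    "uri": "http://localhost:8181",  # placeholder REST catalog endpoint
    "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
}
```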

@@ -115,6 +116,7 @@ For the FileIO there are several configuration options available:
| s3.region | us-west-2 | Sets the region of the bucket |
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |

<!-- markdown-link-check-enable-->

@@ -167,6 +169,22 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-enable-->

### Alibaba Cloud Object Storage Service (OSS)

<!-- markdown-link-check-disable -->

PyIceberg uses the [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) class to connect to an OSS bucket, since the service is [compatible with the S3 SDK](https://www.alibabacloud.com/help/en/oss/developer-reference/use-amazon-s3-sdks-to-access-oss) as long as the endpoint is addressed in virtual hosted style.

| Key | Example | Description |
| -------------------- | ------------------- | ------------------------------------------------ |
| s3.endpoint | <https://s3.oss-your-bucket-region.aliyuncs.com/> | Configure an endpoint of the OSS service for the FileIO to access. Be sure to use an S3-compatible endpoint, as given in the example. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
| s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This must be set to True, as OSS can only be accessed with a virtual hosted-style address. |

<!-- markdown-link-check-enable-->
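A rough stand-alone sketch (not PyIceberg's actual code; the helper name `oss_client_kwargs` is invented for illustration) of how the `s3.*` string properties above map onto `pyarrow.fs.S3FileSystem` keyword arguments:

```python
from typing import Any, Dict

def oss_client_kwargs(properties: Dict[str, str]) -> Dict[str, Any]:
    """Translate s3.* string properties into S3FileSystem keyword arguments."""
    kwargs: Dict[str, Any] = {}
    if endpoint := properties.get("s3.endpoint"):
        kwargs["endpoint_override"] = endpoint
    if access_key := properties.get("s3.access-key-id"):
        kwargs["access_key"] = access_key
    if secret_key := properties.get("s3.secret-access-key"):
        kwargs["secret_key"] = secret_key
    # OSS only supports virtual hosted-style addressing, so this property
    # should be the string "True" when targeting oss:// locations.
    if fva := properties.get("s3.force-virtual-addressing"):
        kwargs["force_virtual_addressing"] = fva.strip().lower() == "true"
    return kwargs

# Placeholder endpoint from the table above; substitute your own region.
kwargs = oss_client_kwargs({
    "s3.endpoint": "https://s3.oss-your-bucket-region.aliyuncs.com/",
    "s3.force-virtual-addressing": "True",
})
```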

### PyArrow

<!-- markdown-link-check-disable -->
2 changes: 2 additions & 0 deletions pyiceberg/io/__init__.py
@@ -74,6 +74,7 @@
S3_SIGNER_ENDPOINT_DEFAULT = "v1/aws/s3/sign"
S3_ROLE_ARN = "s3.role-arn"
S3_ROLE_SESSION_NAME = "s3.role-session-name"
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
@@ -304,6 +305,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"oss": [ARROW_FILE_IO],
"gs": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"hdfs": [ARROW_FILE_IO],
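The mapping this hunk extends drives scheme inference: when no FileIO is configured explicitly, the first importable implementation listed for a location's scheme is used. A simplified stand-alone sketch of that lookup (the `preferred_io` function is invented for illustration):

```python
ARROW_FILE_IO = "pyiceberg.io.pyarrow.PyArrowFileIO"
FSSPEC_FILE_IO = "pyiceberg.io.fsspec.FsspecFileIO"

# Subset of the scheme table, including the "oss" entry added by this commit.
SCHEME_TO_FILE_IO = {
    "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
    "oss": [ARROW_FILE_IO],
    "file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
}

def preferred_io(scheme: str) -> str:
    # In PyIceberg the first candidate that imports successfully wins;
    # here we simply take the first entry.
    return SCHEME_TO_FILE_IO[scheme][0]
```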
6 changes: 5 additions & 1 deletion pyiceberg/io/pyarrow.py
@@ -102,6 +102,7 @@
S3_ACCESS_KEY_ID,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
S3_FORCE_VIRTUAL_ADDRESSING,
S3_PROXY_URI,
S3_REGION,
S3_ROLE_ARN,
@@ -350,7 +351,7 @@ def parse_location(location: str) -> Tuple[str, str, str]:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
if scheme in {"s3", "s3a", "s3n"}:
if scheme in {"s3", "s3a", "s3n", "oss"}:
from pyarrow.fs import S3FileSystem

client_kwargs: Dict[str, Any] = {
@@ -373,6 +374,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
client_kwargs["session_name"] = session_name

if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
    client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)

return S3FileSystem(**client_kwargs)
elif scheme in ("hdfs", "viewfs"):
from pyarrow.fs import HadoopFileSystem
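The new lines above coerce a string property into the boolean that `S3FileSystem` expects. A small stand-alone sketch of that step (this reimplements the behavior of PyIceberg's `property_as_bool` helper rather than importing it):

```python
from typing import Dict

def property_as_bool(properties: Dict[str, str], name: str, default: bool) -> bool:
    # The helper is keyed by the property *name*: it looks the value up
    # itself and falls back to the default when the key is absent.
    value = properties.get(name)
    return default if value is None else value.strip().lower() == "true"

S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
props = {S3_FORCE_VIRTUAL_ADDRESSING: "True"}
client_kwargs: Dict[str, bool] = {}
if props.get(S3_FORCE_VIRTUAL_ADDRESSING):
    client_kwargs["force_virtual_addressing"] = property_as_bool(
        props, S3_FORCE_VIRTUAL_ADDRESSING, False
    )
```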
