Skip to content

Commit

Permalink
Added basic functionality for read and write to HUAWEI Object Storage…
Browse files Browse the repository at this point in the history
… Service (OBS)
  • Loading branch information
imhy committed Jul 5, 2024
1 parent 5a82613 commit 5cd51fd
Show file tree
Hide file tree
Showing 9 changed files with 819 additions and 16 deletions.
51 changes: 51 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Other examples of URLs that ``smart_open`` accepts::
s3://my_key:my_secret@my_server:my_port@my_bucket/my_key
gs://my_bucket/my_blob
azure://my_bucket/my_blob
obs://bucket_id.server:port/object_key
hdfs:///path/file
hdfs://path/file
webhdfs://host:port/path/file
Expand Down Expand Up @@ -290,6 +291,7 @@ Transport-specific Options
- WebHDFS
- GCS
- Azure Blob Storage
- OBS (Huawei Object Storage)

Each option involves setting up its own set of parameters.
For example, for accessing S3, you often need to set up authentication, like API keys or a profile name.
Expand Down Expand Up @@ -455,6 +457,55 @@ Additional keyword arguments can be propagated to the ``commit_block_list`` meth
kwargs = {'metadata': {'version': 2}}
fout = open('azure://container/key', 'wb', transport_params={'blob_kwargs': kwargs})
OBS Credentials
---------------
``smart_open`` uses the ``esdk-obs-python`` library to talk to OBS.
Please see `esdk-obs-python docs <https://support.huaweicloud.com/intl/en-us/sdk-python-devg-obs/obs_22_0500.html>`__.

There are several ways to provide Access key, Secret Key and Security Token

- Using env variables
- Using custom client params

AK, SK, ST can be encrypted in this case You need install and configure `security provider <https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337>`__.


OBS Advanced Usage
--------------------
- Supported env variables:

OBS_ACCESS_KEY_ID,
OBS_SECRET_ACCESS_KEY,
OBS_SECURITY_TOKEN,
SMART_OPEN_OBS_USE_CLIENT_WRITE_MODE,
SMART_OPEN_OBS_DECRYPT_AK_SK,
SMART_OPEN_OBS_SCC_LIB_PATH,
SMART_OPEN_OBS_SCC_CONF_PATH

- Configuration via code
.. code-block:: python
client = {'access_key_id': 'ak', 'secret_access_key': 'sk', 'security_token': 'st', 'server': 'server_url'}
headers = []
transport_params = {
>>> # client can be dict with parameters supported by the obs.ObsClient or instance of the obs.ObsClient
>>> 'client': client,
>>> # additional header for request, please see esdk-obs-python docs
>>> 'headers': headers,
>>> # if True obs.ObsClient will be take write method argument as readable object to get bytes. For writing mode only.
>>> # Please see docs for ObsClient.putContent api.
>>> 'use_obs_client_write_mode': True,
>>> # True if need decrypt Ak, Sk, St
>>> # It required to install CryptoAPI libs.
>>> # https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337
>>> 'decrypt_ak_sk' : True,
>>> # path to python libs of the Crypto provider
>>> 'scc_lib_path': '/usr/lib/scc',
>>> # path to config file of the Crypto provider
>>> 'scc_conf_path': '/home/user/scc.conf'}
fout = open('obs://bucket_id.server:port/object_key', 'wb', transport_params=transport_params)
Drop-in replacement of ``pathlib.Path.open``
--------------------------------------------

Expand Down
128 changes: 128 additions & 0 deletions integration-tests/test_obs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
import io
import os

import obs
from obs import ObsClient

import smart_open
from smart_open.obs import parse_uri

_OBS_URL = os.environ.get('SO_OBS_URL')

assert _OBS_URL is not None, 'please set the SO_OBS_URL environment variable'

assert os.environ.get('OBS_ACCESS_KEY_ID') is not None, \
'please set the OBS_ACCESS_KEY_ID environment variable'
assert os.environ.get('OBS_SECRET_ACCESS_KEY') is not None, \
'please set the OBS_SECRET_ACCESS_KEY environment variable'


def _clear_bucket(obs_client: obs.ObsClient, bucket_id: str):
objects = obs_client.listObjects(bucketName=bucket_id)
for content in objects.body.contents:
print(content.get('key'))
_delete_object(obs_client=obs_client,
bucket_id=bucket_id,
object_key=content.get('key'))


def _delete_object(obs_client: obs.ObsClient, bucket_id: str, object_key: str):
try:
resp = obs_client.deleteObject(bucketName=bucket_id, objectKey=object_key)
if resp.status < 300:
print('requestId:', resp.requestId)
print('deleteMarker:', resp.body.deleteMarker)
except Exception as ex:
print(ex)


def initialize_bucket():
parsed = parse_uri(_OBS_URL)
server = f'https://{parsed.get("server")}'
bucket_id = parsed.get('bucket_id')
obs_client = ObsClient(server=server, security_provider_policy='ENV')
_clear_bucket(obs_client=obs_client, bucket_id=bucket_id)


def write_read(key, content, write_mode, read_mode, **kwargs):
with smart_open.open(key, write_mode, **kwargs) as fout:
fout.write(content)
with smart_open.open(key, read_mode, **kwargs) as fin:
return fin.read()


def read_length_prefixed_messages(key, read_mode, **kwargs):
result = io.BytesIO()

with smart_open.open(key, read_mode, **kwargs) as fin:
length_byte = fin.read(1)
while len(length_byte):
result.write(length_byte)
msg = fin.read(ord(length_byte))
result.write(msg)
length_byte = fin.read(1)
return result.getvalue()


def test_obs_readwrite_binary(benchmark):
initialize_bucket()

key = _OBS_URL + '/sanity.txt'
binary = 'с гранатою в кармане, с чекою в руке'.encode()
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_obs_readwrite_binary_gzip(benchmark):
initialize_bucket()

key = _OBS_URL + '/sanity.txt.gz'
binary = 'не чайки здесь запели на знакомом языке'.encode()
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_obs_performance(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024 * 128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _OBS_URL + '/performance.txt'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte


def test_obs_performance_gz(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024 * 128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _OBS_URL + '/performance.txt.gz'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte


def test_obs_performance_small_reads(benchmark):
initialize_bucket()

ONE_MIB = 1024 ** 2
one_megabyte_of_msgs = io.BytesIO()
msg = b'\x0f' + b'0123456789abcde' # a length-prefixed "message"
for _ in range(0, ONE_MIB, len(msg)):
one_megabyte_of_msgs.write(msg)
one_megabyte_of_msgs = one_megabyte_of_msgs.getvalue()

key = _OBS_URL + '/many_reads_performance.bin'

with smart_open.open(key, 'wb') as fout:
fout.write(one_megabyte_of_msgs)

actual = benchmark(read_length_prefixed_messages, key, 'rb', buffering=ONE_MIB)
assert actual == one_megabyte_of_msgs
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ def read(fname):
http_deps = ['requests']
ssh_deps = ['paramiko']
zst_deps = ['zstandard']
obs_deps = ['esdk-obs-python']

all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps + obs_deps
tests_require = all_deps + [
'moto[server]',
'responses',
Expand Down Expand Up @@ -83,6 +84,7 @@ def read(fname):
'webhdfs': http_deps,
'ssh': ssh_deps,
'zst': zst_deps,
'obs': obs_deps,
},
python_requires=">=3.7,<4.0",

Expand Down
Loading

0 comments on commit 5cd51fd

Please sign in to comment.