BulkWriter supports Azure blob storage
Signed-off-by: yhmo <[email protected]>
yhmo committed Jan 30, 2024
1 parent f637526 commit fbe3b34
Showing 4 changed files with 167 additions and 36 deletions.
6 changes: 3 additions & 3 deletions examples/example_bulkwriter.py
@@ -143,7 +143,7 @@ def remote_writer(schema: CollectionSchema, file_type: BulkFileType):
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
connect_param=RemoteBulkWriter.S3ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
@@ -226,7 +226,7 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
connect_param=RemoteBulkWriter.S3ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
@@ -269,7 +269,7 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"vector": np.array(gen_binary_vector(), np.dtype("int8")) if bin_vec else np.array(gen_float_vector(), np.dtype("float32")),
"vector": np.array(gen_binary_vector()).astype(np.dtype("int8")) if bin_vec else np.array(gen_float_vector()).astype(np.dtype("float32")),
f"dynamic_{i}": i,
# bulkinsert doesn't support importing npy with an array field; the values below will be stored in the dynamic field
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
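The example now uses the new S3ConnectParam name, but existing code that still writes RemoteBulkWriter.ConnectParam keeps working, because the change below keeps ConnectParam as an alias of S3ConnectParam. A hedged sketch, not part of this commit, assuming the MINIO_* constants defined earlier in example_bulkwriter.py:

connect_param = RemoteBulkWriter.ConnectParam(  # legacy name, resolved to S3ConnectParam via the alias
    endpoint=MINIO_ADDRESS,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
)
# This connect_param can be passed to RemoteBulkWriter exactly as shown in the example above.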
195 changes: 162 additions & 33 deletions pymilvus/bulk_writer/remote_bulk_writer.py
@@ -13,11 +13,14 @@
import logging
import sys
from pathlib import Path
from typing import Any, Optional
from typing import Any, Dict, Optional, Union

from azure.core.exceptions import AzureError
from azure.storage.blob import BlobServiceClient
from minio import Minio
from minio.error import S3Error

from pymilvus.exceptions import MilvusException
from pymilvus.orm.schema import CollectionSchema

from .constants import (
@@ -32,7 +35,7 @@


class RemoteBulkWriter(LocalBulkWriter):
class ConnectParam:
class S3ConnectParam:
def __init__(
self,
bucket_name: str = DEFAULT_BUCKET_NAME,
@@ -55,11 +58,56 @@ def __init__(
self._http_client = (http_client,) # urllib3.poolmanager.PoolManager
self._credentials = (credentials,) # minio.credentials.Provider

ConnectParam = S3ConnectParam # keep ConnectParam as an alias for compatibility with users' legacy code

class AzureConnectParam:
def __init__(
self,
container_name: str,
conn_str: str,
account_url: Optional[str] = None,
credential: Optional[Union[str, Dict[str, str]]] = None,
upload_chunk_size: int = 8 * 1024 * 1024,
upload_concurrency: int = 4,
):
"""Connection parameters for Azure blob storage
Args:
container_name(str): The target container name
conn_str(str): A connection string to an Azure Storage account,
which can be parsed to an account_url and a credential.
To generate a connection string, read this link:
https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
account_url(str): A string in the format https://<storage-account>.blob.core.windows.net
Read this link for more info:
https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview
credential: Account access key for the storage account. Read this link for more info:
https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys
upload_chunk_size: If the blob size is larger than this value or unknown,
the blob is uploaded in chunks by parallel connections. This parameter is
passed to max_block_size and max_single_put_size of Azure. Read this link for more info:
https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-upload-python#specify-data-transfer-options-for-upload
upload_concurrency: The maximum number of parallel connections to use when uploading
in chunks. This parameter is passed to max_concurrency of Azure.
Read this link for more info:
https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-upload-python#specify-data-transfer-options-for-upload
"""
self._container_name = container_name
self._conn_str = conn_str
self._account_url = account_url
self._credential = credential
self._upload_chunk_size = upload_chunk_size
self._upload_concurrency = upload_concurrency

def __init__(
self,
schema: CollectionSchema,
remote_path: str,
connect_param: ConnectParam,
connect_param: Optional[Union[S3ConnectParam, AzureConnectParam]],
chunk_size: int = 1024 * MB,
file_type: BulkFileType = BulkFileType.PARQUET,
**kwargs,
@@ -86,8 +134,11 @@ def __exit__(self, exc_type: object, exc_val: object, exc_tb: object):
logger.info(f"Delete empty directory '{Path(self._local_path).parent}'")

def _get_client(self):
try:
if self._client is None:
if self._client is not None:
return self._client

if isinstance(self._connect_param, RemoteBulkWriter.S3ConnectParam):
try:

def arg_parse(arg: Any):
return arg[0] if isinstance(arg, tuple) else arg
@@ -102,28 +153,114 @@ def arg_parse(arg: Any):
http_client=arg_parse(self._connect_param._http_client),
credentials=arg_parse(self._connect_param._credentials),
)
else:
return self._client
except Exception as err:
logger.error(f"Failed to connect MinIO/S3, error: {err}")
raise
logger.info("Minio/S3 blob storage client successfully initialized")
except Exception as err:
logger.error(f"Failed to connect MinIO/S3, error: {err}")
raise
elif isinstance(self._connect_param, RemoteBulkWriter.AzureConnectParam):
try:
if (
self._connect_param._conn_str is not None
and len(self._connect_param._conn_str) > 0
):
self._client = BlobServiceClient.from_connection_string(
conn_str=self._connect_param._conn_str,
credential=self._connect_param._credential,
max_block_size=self._connect_param._upload_chunk_size,
max_single_put_size=self._connect_param._upload_chunk_size,
)
elif (
self._connect_param._account_url is not None
and len(self._connect_param._account_url) > 0
):
self._client = BlobServiceClient(
account_url=self._connect_param._account_url,
credential=self._connect_param._credential,
max_block_size=self._connect_param._upload_chunk_size,
max_single_put_size=self._connect_param._upload_chunk_size,
)
else:
raise MilvusException(message="Illegal connection parameters")

def append_row(self, row: dict, **kwargs):
super().append_row(row, **kwargs)
logger.info("Azure blob storage client successfully initialized")
except Exception as err:
logger.error(f"Failed to connect Azure, error: {err}")
raise

def commit(self, **kwargs):
super().commit(call_back=self._upload)
return self._client

def _stat_object(self, object_name: str):
if isinstance(self._client, Minio):
return self._client.stat_object(
bucket_name=self._connect_param._bucket_name, object_name=object_name
)
if isinstance(self._client, BlobServiceClient):
blob = self._client.get_blob_client(
container=self._connect_param._container_name, blob=object_name
)
return blob.get_blob_properties()

raise MilvusException(message="Blob storage client is not initialized")

def _remote_exists(self, file: str) -> bool:
def _object_exists(self, object_name: str) -> bool:
try:
minio_client = self._get_client()
minio_client.stat_object(bucket_name=self._connect_param._bucket_name, object_name=file)
except S3Error as err:
if err.code == "NoSuchKey":
self._stat_object(object_name=object_name)
except S3Error as s3err:
if s3err.code == "NoSuchKey":
return False
self._throw(f"Failed to stat MinIO/S3 object, error: {err}")
self._throw(f"Failed to stat MinIO/S3 object '{object_name}', error: {s3err}")
except AzureError as azure_err:
if azure_err.error_code == "BlobNotFound":
return False
self._throw(f"Failed to stat Azure object '{object_name}', error: {azure_err}")

return True

def _bucket_exists(self) -> bool:
if isinstance(self._client, Minio):
return self._client.bucket_exists(self._connect_param._bucket_name)
if isinstance(self._client, BlobServiceClient):
containers = self._client.list_containers()
for container in containers:
if self._connect_param._container_name == container["name"]:
return True
return False

raise MilvusException(message="Blob storage client is not initialized")

def _upload_object(self, file_path: str, object_name: str):
logger.info(f"Prepare to upload '{file_path}' to '{object_name}'")
if isinstance(self._client, Minio):
logger.info(f"Target bucket: '{self._connect_param._bucket_name}'")
self._client.fput_object(
bucket_name=self._connect_param._bucket_name,
object_name=object_name,
file_path=file_path,
)
elif isinstance(self._client, BlobServiceClient):
logger.info(f"Target bucket: '{self._connect_param._container_name}'")
container_client = self._client.get_container_client(
self._connect_param._container_name
)
with Path(file_path).open("rb") as data:
container_client.upload_blob(
name=object_name,
data=data,
overwrite=True,
max_concurrency=self._connect_param._upload_concurrency,
connection_timeout=600,
)
else:
raise MilvusException(message="Blob storage client is not initialized")

logger.info(f"Upload file '{file_path}' to '{object_name}'")

def append_row(self, row: dict, **kwargs):
super().append_row(row, **kwargs)

def commit(self, **kwargs):
super().commit(call_back=self._upload)

def _local_rm(self, file: str):
try:
Path(file).unlink()
@@ -137,11 +274,8 @@ def _local_rm(self, file: str):
def _upload(self, file_list: list):
remote_files = []
try:
logger.info("Prepare to upload files")
minio_client = self._get_client()
found = minio_client.bucket_exists(self._connect_param._bucket_name)
if not found:
self._throw(f"MinIO bucket '{self._connect_param._bucket_name}' doesn't exist")
if not self._bucket_exists():
self._throw("Blob storage bucket/container doesn't exist")

for file_path in file_list:
ext = Path(file_path).suffix
@@ -153,22 +287,17 @@ def _upload(self, file_list: list):
Path.joinpath(self._remote_path, relative_file_path.lstrip("/"))
).lstrip("/")

if self._remote_exists(minio_file_path):
if self._object_exists(minio_file_path):
logger.info(
f"Remote file '{minio_file_path}' already exists, will overwrite it"
)

minio_client.fput_object(
bucket_name=self._connect_param._bucket_name,
object_name=minio_file_path,
file_path=file_path,
)
logger.info(f"Upload file '{file_path}' to '{minio_file_path}'")
self._upload_object(object_name=minio_file_path, file_path=file_path)

remote_files.append(str(minio_file_path))
self._local_rm(file_path)
except Exception as e:
self._throw(f"Failed to call MinIO/S3 api, error: {e}")
self._throw(f"Failed to upload files, error: {e}")

logger.info(f"Successfully upload files: {file_list}")
self._remote_files.append(remote_files)
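With the new AzureConnectParam, a RemoteBulkWriter can be pointed at an Azure blob container instead of MinIO/S3. A minimal, hedged sketch of the new path; the container name, connection string, schema, and row values are placeholders rather than values from this commit, and the imports mirror the existing example file:

from pymilvus import BulkFileType, CollectionSchema, DataType, FieldSchema, RemoteBulkWriter

# Illustrative schema; any CollectionSchema works the same way.
schema = CollectionSchema(fields=[
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=8),
])

with RemoteBulkWriter(
    schema=schema,
    remote_path="bulk_data",
    connect_param=RemoteBulkWriter.AzureConnectParam(
        container_name="my-container",                 # placeholder container
        conn_str="<azure-storage-connection-string>",  # placeholder; see the AzureConnectParam docstring
    ),
    file_type=BulkFileType.PARQUET,
) as writer:
    writer.append_row({"id": 0, "vector": [0.1] * 8})
    writer.commit()  # commit triggers _upload(), which pushes the generated files to the container

At commit time the writer checks that the container exists, overwrites any existing blobs with the same names, and uploads each generated file through upload_blob with the configured chunk size and concurrency, as implemented in _upload_object above.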
1 change: 1 addition & 0 deletions pyproject.toml
@@ -27,6 +27,7 @@ dependencies=[
"requests",
"minio>=7.0.0",
"pyarrow>=12.0.0",
"azure-storage-blob",
]

classifiers=[
1 change: 1 addition & 0 deletions requirements.txt
@@ -37,3 +37,4 @@ ruff
black
requests
minio
azure-storage-blob
