Filesystem Backend #18
Open
danielBCN wants to merge 4 commits into CLOUDLAB-URV:master from danielBCN:filesystem
@@ -0,0 +1,372 @@
from __future__ import annotations

import copy
import inspect
import logging
import pickle
from collections import namedtuple
from copy import deepcopy
from functools import partial
from types import SimpleNamespace
from typing import TYPE_CHECKING

import botocore.exceptions
import smart_open
import joblib

from .entities import CloudDataFormat, CloudObjectSlice
from .preprocessing.handler import joblib_handler

from .storage.filesystem import FileSystemS3API, FilePath
from .util import head_object, upload_file_with_progress

if TYPE_CHECKING:
    from typing import List, Tuple, Dict, Optional, Any

logger = logging.getLogger(__name__)


class CloudObject:
    def __init__(
        self,
        data_format: CloudDataFormat,
        object_path: FilePath,
        meta_path: FilePath,
        attrs_path: FilePath,
        storage_config: Optional[Dict[str, Any]] = None
    ):
        # Storage headers of the data object
        self._obj_headers: Optional[Dict[str, str]] = None
        # Storage headers of the metadata object
        self._meta_headers: Optional[Dict[str, str]] = None
        # Storage headers of the attributes object
        self._attrs_headers: Optional[Dict[str, str]] = None

        # cls reference for the CloudDataType of this object
        self._format_cls: CloudDataFormat = data_format

        self._obj_path = object_path

        # Path for the metadata object. Located in the bucket suffixed
        # with .meta, with the same key as the original data object
        self._meta_path = meta_path

        # Path for the attributes object. Located in the bucket suffixed
        # with .meta, with the key of the original data object suffixed with .attrs
        self._attrs_path = attrs_path
        self._attrs: Optional[SimpleNamespace] = None

        storage_config = storage_config or {}
        self._s3 = FileSystemS3API()

        logger.info("Created reference for %s", self)
        logger.debug(f"{self._obj_path=},{self._meta_path=}")

    @property
    def path(self) -> FilePath:
        return self._obj_path

    @property
    def meta_path(self) -> FilePath:
        return self._meta_path

    @property
    def size(self) -> int:
        if not self._obj_headers:
            self.fetch()
        return int(self._obj_headers["ContentLength"])

    @property
    def meta_size(self) -> int:
        if self._meta_headers is None or "ContentLength" not in self._meta_headers:
            raise AttributeError()
        return int(self._meta_headers["ContentLength"])

    @property
    def storage(self) -> FileSystemS3API:
        return self._s3

    @property
    def attributes(self) -> Any:
        return self._attrs

    @property
    def open(self) -> smart_open.smart_open:
        assert self.storage is not None
        logger.debug("Creating new smart_open client for uri %s", self.path.as_uri())
        client = copy.deepcopy(self.storage)
        return partial(
            smart_open.open, self.path.as_uri(), transport_params={"client": client}
        )

    @property
    def open_metadata(self) -> smart_open.smart_open:
        assert self.storage is not None
        logger.debug("Creating new smart_open client for uri %s", self.meta_path.as_uri())
        client = copy.deepcopy(self.storage)
        return partial(
            smart_open.open,
            self.meta_path.as_uri(),
            transport_params={"client": client},
        )

    # @classmethod
    # def from_s3(
    #     cls,
    #     data_format: CloudDataFormat,
    #     storage_uri: str,
    #     fetch: Optional[bool] = True,
    #     metadata_bucket: Optional[str] = None,
    #     s3_config: Optional[Dict[str, Any]] = None,
    # ) -> CloudObject:
    #     obj_path = S3Path.from_uri(storage_uri)
    #     if metadata_bucket is None:
    #         metadata_bucket = obj_path.bucket + ".meta"
    #     metadata_path = S3Path.from_bucket_key(metadata_bucket, obj_path.key)
    #     attributes_path = S3Path.from_bucket_key(
    #         metadata_bucket, obj_path.key + ".attrs"
    #     )
    #     co = cls(data_format, obj_path, metadata_path, attributes_path, s3_config)
    #     if fetch:
    #         co.fetch()
    #     return co

    @classmethod
    def from_bucket_key(
        cls,
        data_format: CloudDataFormat,
        bucket: str,
        key: str,
        fetch: Optional[bool] = True,
        metadata_bucket: Optional[str] = None,
        s3_config: Optional[Dict[str, Any]] = None,
    ) -> CloudObject:
        obj_path = FilePath.from_bucket_key(bucket, key)
        if metadata_bucket is None:
            metadata_bucket = bucket + ".meta"
        meta_path = FilePath.from_bucket_key(metadata_bucket, key)
        attributes_path = FilePath.from_bucket_key(metadata_bucket, key + ".attrs")

        co = cls(data_format, obj_path, meta_path, attributes_path)
        if fetch:
            co.fetch()
        return co

    # @classmethod
    # def new_from_file(
    #     cls, data_format, file_path, cloud_path, s3_config=None, override=False
    # ) -> "CloudObject":
    #     obj_path = S3Path.from_uri(cloud_path)
    #     metadata_path = S3Path.from_bucket_key(obj_path.bucket + ".meta", obj_path.key)
    #     attributes_path = S3Path.from_bucket_key(
    #         obj_path.bucket + ".meta", obj_path.key + ".attrs"
    #     )
    #     co_instance = cls(
    #         data_format, obj_path, metadata_path, attributes_path, s3_config
    #     )

    #     if co_instance.exists():
    #         if not override:
    #             raise Exception("Object already exists")
    #         else:
    #             # Clean preprocessing metadata if object already exists
    #             co_instance.clean()

    #     upload_file_with_progress(
    #         co_instance.storage,
    #         co_instance.path.bucket,
    #         co_instance.path.key,
    #         file_path,
    #     )
    #     return co_instance

    def exists(self) -> bool:
        if not self._obj_headers:
            try:
                self.fetch()
            except KeyError:
                return False
        return bool(self._obj_headers)

    def is_preprocessed(self) -> bool:
        try:
            head_object(self.storage, bucket=self._meta_path.bucket, key=self._meta_path.key)
            return True
        except KeyError:
            return False

    def fetch(self):
        if not self._obj_headers:
            logger.info("Fetching object from Storage")
            self._fetch_object()
        if not self._meta_headers:
            logger.info("Fetching metadata from Storage")
            self._fetch_metadata()

    def _fetch_object(self):
        self._obj_headers, _ = head_object(self._s3, self._obj_path.bucket, self._obj_path.key)

    def _fetch_metadata(self):
        try:
            res, _ = head_object(self._s3, self._meta_path.bucket, self._meta_path.key)
            self._meta_headers = res
            res, _ = head_object(self._s3, self._attrs_path.bucket, self._attrs_path.key)
            self._attrs_headers = res
            get_res = self.storage.get_object(Bucket=self._attrs_path.bucket, Key=self._attrs_path.key)
            try:
                attrs_dict = pickle.load(get_res["Body"])
                # Get default attributes from the class,
                # so we can have default attributes different from None set in the Class
                base_attrs = deepcopy(self._format_cls.attrs_types)
                # Replace attributes that have been set in the preprocessing stage
                base_attrs.update(attrs_dict)
                # Create namedtuple so that the attributes object is immutable
                co_named_tuple = namedtuple(
                    self._format_cls.co_class.__name__ + "Attributes", base_attrs.keys()
                )
                self._attrs = co_named_tuple(**base_attrs)
            except Exception as e:
                logger.error(e)
                self._attrs = None
        except KeyError:
            self._meta_headers = None
            self._attrs = None

    def clean(self):
        logger.info("Cleaning indexes and metadata for %s", self)
        self._s3.delete_object(Bucket=self._meta_path.bucket, Key=self._meta_path.key)
        self._meta_headers = None
        self.storage.delete_object(Bucket=self._attrs_path.bucket, Key=self._attrs_path.key)
        self._attrs_headers = None
        self._attrs = {}

    def preprocess(
        self,
        parallel_config=None,
        extra_args=None,
        chunk_size=None,
        force=False,
        debug=False,
    ):
        assert self.exists(), "Object not found in S3"
        if self.is_preprocessed() and not force:
            return

        parallel_config = parallel_config or {}
        extra_args = extra_args or {}

        # Check if the metadata bucket exists, if not create it
        try:
            meta_bucket_head = self.storage.head_bucket(Bucket=self.meta_path.bucket)
        except botocore.exceptions.ClientError as error:
            if error.response["Error"]["Code"] != "404":
                raise error
            meta_bucket_head = None

        if not meta_bucket_head:
            logger.info("Creating meta bucket %s", self.meta_path.bucket)
            try:
                self.storage.create_bucket(Bucket=self.meta_path.bucket)
            except botocore.exceptions.ClientError as error:
                logger.error(
                    "Metadata bucket %s not found -- Also failed to create it",
                    self.meta_path.bucket,
                )
                raise error

        preproc_signature = inspect.signature(
            self._format_cls.preprocessing_function
        ).parameters
        # Check if parameter cloud_object is in the signature
        if "cloud_object" not in preproc_signature:
            raise Exception(
                "Preprocessing function must have cloud_object as a parameter"
            )

        jobs = []
        if chunk_size is None:
            # Process the entire object as one batch job
            preproc_args = {"cloud_object": self}
            if "chunk_data" in preproc_signature:
                # Placeholder, we will get the data inside the handler function, in case a remote joblib is used,
                # since a StreamingBody is not picklable
                preproc_args["chunk_data"] = None
                # get_res = self._s3.get_object(Bucket=self._obj_path.bucket, Key=self._obj_path.key)
                # assert get_res["ResponseMetadata"]["HTTPStatusCode"] in (200, 206)
                # preproc_args["chunk_data"] = get_res["Body"]
            if "chunk_id" in preproc_signature:
                preproc_args["chunk_id"] = 0
            if "chunk_size" in preproc_signature:
                preproc_args["chunk_size"] = self.size
            if "num_chunks" in preproc_signature:
                preproc_args["num_chunks"] = 1

            # Add extra args if there are any other arguments in the signature
            for arg in preproc_signature.keys():
                if arg not in preproc_args and arg in extra_args:
                    preproc_args[arg] = extra_args[arg]

            jobs.append(preproc_args)
            # preprocessing_metadata = self._format_cls.preprocessing_function(**preproc_args)
        else:
            assert chunk_size != 0 and chunk_size <= self.size, (
                "Chunk size must be greater than 0 and less than or equal to the object size"
            )
            # Partition the object in chunks and preprocess it in parallel
            if not {"chunk_data", "chunk_id", "chunk_size", "num_chunks"}.issubset(
                preproc_signature.keys()
            ):
                raise Exception(
                    "Preprocessing function must have "
                    "(chunk_data, chunk_id, chunk_size, num_chunks) as parameters"
                )
            num_chunks = self.size // chunk_size
            for chunk_id in range(num_chunks):
                preproc_args = {
                    "cloud_object": self,
                    "chunk_id": chunk_id,
                    "chunk_size": chunk_size,
                    "num_chunks": num_chunks,
                }
                # Add extra args if there are any other arguments in the signature
                for arg in preproc_signature.keys():
                    if arg not in preproc_args and arg in extra_args:
                        preproc_args[arg] = extra_args[arg]
                jobs.append(preproc_args)

        if debug:
            # Run in the main thread for debugging
            for job in jobs:
                joblib_handler((self._format_cls.preprocessing_function, job))
            return

        with joblib.parallel_config(**parallel_config):
            jl = joblib.Parallel()
            f = jl(
                [
                    joblib.delayed(joblib_handler)(
                        (self._format_cls.preprocessing_function, job)
                    )
                    for job in jobs
                ]
            )
            for res in f:
                print(res)

    def get_attribute(self, key: str) -> Any:
        return getattr(self._attrs, key)

    def partition(self, strategy, *args, **kwargs) -> List[CloudObjectSlice]:
        assert self.is_preprocessed(), "Object must be preprocessed before partitioning"

        slices = strategy(self, *args, **kwargs)
        # Store a reference to this CloudObject instance in the slice
        for s in slices:
            s.cloud_object = self
        return slices

    def __getitem__(self, item):
        return self._attrs.__getattribute__(item)

    def __repr__(self):
        return f"{self.__class__.__name__}<{self._format_cls.co_class.__name__}>({self.path.as_uri()})"
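
For review context, a minimal usage sketch of the API this file exposes (not part of the diff): `my_format` stands in for a registered `CloudDataFormat` and `my_strategy` for a partitioning strategy, both placeholder names.

# Hypothetical usage of the filesystem-backed CloudObject; my_format and
# my_strategy are placeholder names, not identifiers from this PR.
co = CloudObject.from_bucket_key(my_format, bucket="mybucket", key="myobject")

# Run the format's preprocessing function; with chunk_size set, the object is
# split into fixed-size chunks handled in parallel through joblib (the
# preprocessing function must then accept chunk_data, chunk_id, chunk_size
# and num_chunks)
co.preprocess(chunk_size=64 * 1024 * 1024)

# Partition the preprocessed object into slices; each slice keeps a
# back-reference to its CloudObject
slices = co.partition(my_strategy)
for s in slices:
    print(s.cloud_object)

With `chunk_size` left as `None`, the whole object is preprocessed as a single job instead.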
My idea here was to differentiate object storage and the file system by the path prefix, so that we can keep all the logic of `CloudObject`: for instance, `s3://mybucket/myobject` for S3 and `fs://mybucket/myobject` for the file system. Then, in the class setup, we can choose the correct backend. To resolve the root path for the FS backend, we can use the `storage_config` parameter; for example, with a root of `/tmp` it would read `myfile` from `/tmp/mybucket/myfile`, i.e. from the file system mounted on `/tmp` or `/...`
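
A rough sketch of how that prefix-based dispatch could look, assuming a hypothetical `select_backend` helper and a `root_path` key in `storage_config` (both names are illustrative, not from this PR):

# Illustrative sketch only: pick the storage backend from the URI scheme and
# resolve fs:// paths under a root directory taken from storage_config.
# select_backend and the "root_path" key are assumed names, not from this PR.
from typing import Optional
from urllib.parse import urlparse

import boto3

from .storage.filesystem import FileSystemS3API


def select_backend(storage_uri: str, storage_config: Optional[dict] = None):
    storage_config = storage_config or {}
    parsed = urlparse(storage_uri)

    if parsed.scheme == "s3":
        # Object storage backend: a regular S3 client, bucket/key stay as-is
        return boto3.client("s3"), f"{parsed.netloc}{parsed.path}"
    if parsed.scheme == "fs":
        # File system backend: same bucket/key layout, rooted at a local directory
        root = storage_config.get("root_path", "/")
        return FileSystemS3API(), f"{root.rstrip('/')}/{parsed.netloc}{parsed.path}"
    raise ValueError(f"Unsupported storage scheme: {parsed.scheme!r}")

With `storage_config={"root_path": "/tmp"}`, `fs://mybucket/myfile` would resolve to `/tmp/mybucket/myfile`, as in the example above.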