Feat: Using fsspec to download files #348

Status: Merged

Commits (48; the diff below shows changes from 39 commits):
96ec15d  fsspec basic setup done and working for s3  (deependujha, Sep 1, 2024)
45b59ae  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 1, 2024)
74dae21  fix storage option in fsspec  (deependujha, Sep 2, 2024)
fcb4d95  pass down `storage_options` in dataset utilities  (deependujha, Sep 2, 2024)
3080c2c  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 2, 2024)
c31e259  tested successfully on S3 and GS for (mode= none | append | overwrite…  (deependujha, Sep 3, 2024)
0c761b1  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 3, 2024)
2377983  fixed mypy errors and lock files when uploading/downloading  (deependujha, Sep 4, 2024)
ffbf51d  update  (deependujha, Sep 4, 2024)
de8b83b  fixed test `test_try_create_cache_dir`  (deependujha, Sep 4, 2024)
e712327  fixed test: `test_reader_chunk_removal`  (deependujha, Sep 4, 2024)
e118ba9  all tests passed  (deependujha, Sep 4, 2024)
d3450dc  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 4, 2024)
ed0fff8  update  (deependujha, Sep 4, 2024)
08236e8  update  (deependujha, Sep 4, 2024)
12b049b  boto3 stop bothering me  (deependujha, Sep 4, 2024)
d560d91  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 4, 2024)
27644d3  update  (deependujha, Sep 4, 2024)
bf06cf9  update  (deependujha, Sep 4, 2024)
bdc13f4  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 4, 2024)
909b5cb  update  (deependujha, Sep 4, 2024)
f661ae1  Merge branch 'main' into feat/using-fsspec-to-download-files  (deependujha, Sep 4, 2024)
5671d11  tested on azure and made sure `storage_option` is working in all cases  (deependujha, Sep 5, 2024)
2beebc9  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 5, 2024)
dd2e742  update  (deependujha, Sep 5, 2024)
f555069  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 5, 2024)
87a9556  update  (deependujha, Sep 5, 2024)
8e9d448  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 5, 2024)
555eb19  use s5cmd to download files if available  (deependujha, Sep 5, 2024)
5ef4004  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 5, 2024)
67205ea  add default storage_options  (deependujha, Sep 6, 2024)
69fb43d  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 6, 2024)
b49a126  raise error if cloud is not supported  (deependujha, Sep 6, 2024)
dbe8b0e  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 6, 2024)
5a81f04  update  (deependujha, Sep 6, 2024)
848484a  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 6, 2024)
4d62fdd  fix windows error related to urllib parse scheme  (deependujha, Sep 6, 2024)
6b961f7  Merge branch 'main' into feat/using-fsspec-to-download-files  (deependujha, Sep 6, 2024)
e544d09  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 6, 2024)
31f6e5a  Merge branch 'main' into feat/using-fsspec-to-download-files  (bhimrazy, Sep 16, 2024)
5d1ec46  Merge branch 'main' into feat/using-fsspec-to-download-files  (bhimrazy, Sep 17, 2024)
e68076d  cleanup commented code  (deependujha, Sep 18, 2024)
79a3ad8  Merge branch 'main' into feat/using-fsspec-to-download-files  (deependujha, Sep 18, 2024)
2036e37  [pre-commit.ci] auto fixes from pre-commit.com hooks  (pre-commit-ci[bot], Sep 18, 2024)
e230ceb  update  (deependujha, Sep 18, 2024)
e60f9ae  readme updated  (deependujha, Sep 18, 2024)
feb5d48  increase test_dataset_resume_on_future_chunk timeout time to 120 seconds  (deependujha, Sep 18, 2024)
b5ec077  update  (deependujha, Sep 18, 2024)
Files changed:
requirements.txt (3 additions, 1 deletion)

@@ -2,5 +2,7 @@ torch
 lightning-utilities
 filelock
 numpy
-boto3
+# boto3
 requests
+fsspec
+fsspec[s3] # aws s3
requirements/extras.txt (2 additions)

@@ -5,3 +5,5 @@ pyarrow
 tqdm
 lightning-cloud == 0.5.70 # Must be pinned to ensure compatibility
 google-cloud-storage
+fsspec[gs] # google cloud storage
+fsspec[abfs] # azure blob
src/litdata/constants.py (1 addition)

@@ -86,3 +86,4 @@
 _TIME_FORMAT = "%Y-%m-%d_%H-%M-%S.%fZ"
 _IS_IN_STUDIO = bool(os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)) and bool(os.getenv("LIGHTNING_CLUSTER_ID", None))
 _ENABLE_STATUS = bool(int(os.getenv("ENABLE_STATUS_REPORT", "0")))
+_SUPPORTED_CLOUD_PROVIDERS = ["s3", "gs", "azure", "abfs"]
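The dependency and constant changes above replace the hard boto3 requirement with fsspec plus provider extras (the extras pull in the backends: s3fs for S3, gcsfs for GCS, adlfs for Azure), and `_SUPPORTED_CLOUD_PROVIDERS` lists the URL schemes the library now accepts. As a rough sketch of the idea, not code from this PR, an fsspec filesystem can be resolved from a remote URL and a `storage_options` dict like this (the helper name and bucket are hypothetical):

```python
# Illustrative sketch only; the PR's actual downloader lives in
# litdata/streaming/downloader.py and is not shown in this diff.
from typing import Dict, Optional
from urllib import parse

import fsspec

_SUPPORTED_CLOUD_PROVIDERS = ["s3", "gs", "azure", "abfs"]


def _get_filesystem(url: str, storage_options: Optional[Dict] = None) -> fsspec.AbstractFileSystem:
    """Map a remote URL to an fsspec filesystem, forwarding credentials via storage_options."""
    scheme = parse.urlparse(url).scheme
    if scheme not in _SUPPORTED_CLOUD_PROVIDERS:
        raise ValueError(f"URL scheme `{scheme}` is not a supported cloud provider.")
    # fsspec dispatches the protocol to the installed backend: s3 -> s3fs, gs -> gcsfs, abfs -> adlfs.
    return fsspec.filesystem(scheme, **(storage_options or {}))


# Example (hypothetical bucket): anonymous access to a public S3 object.
# fs = _get_filesystem("s3://some-public-bucket/index.json", {"anon": True})
# with fs.open("some-public-bucket/index.json", "rb") as f:
#     payload = f.read()
```

Note that passing the raw scheme straight to `fsspec.filesystem` assumes every entry in `_SUPPORTED_CLOUD_PROVIDERS` is also a registered fsspec protocol; in practice `azure` may need to be mapped to `abfs`.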
src/litdata/processing/data_processor.py (86 additions, 75 deletions)

Large diffs are not rendered by default.

src/litdata/processing/functions.py (40 additions, 30 deletions)

@@ -27,7 +27,7 @@
 
 import torch
 
-from litdata.constants import _INDEX_FILENAME, _IS_IN_STUDIO
+from litdata.constants import _INDEX_FILENAME, _IS_IN_STUDIO, _SUPPORTED_CLOUD_PROVIDERS
 from litdata.processing.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe
 from litdata.processing.readers import BaseReader
 from litdata.processing.utilities import (
@@ -36,8 +36,8 @@
     optimize_dns_context,
     read_index_file_content,
 )
-from litdata.streaming.client import S3Client
 from litdata.streaming.dataloader import StreamingDataLoader
+from litdata.streaming.downloader import copy_file_or_directory, upload_file_or_directory
 from litdata.streaming.item_loader import BaseItemLoader
 from litdata.streaming.resolver import (
     Dir,
@@ -53,7 +53,7 @@
 
 def _is_remote_file(path: str) -> bool:
     obj = parse.urlparse(path)
-    return obj.scheme in ["s3", "gcs"]
+    return obj.scheme in _SUPPORTED_CLOUD_PROVIDERS
 
 
 def _get_indexed_paths(data: Any) -> Dict[int, str]:
@@ -151,8 +151,15 @@ def __init__(
         compression: Optional[str],
         encryption: Optional[Encryption] = None,
         existing_index: Optional[Dict[str, Any]] = None,
+        storage_options: Optional[Dict] = {},
     ):
-        super().__init__(chunk_size=chunk_size, chunk_bytes=chunk_bytes, compression=compression, encryption=encryption)
+        super().__init__(
+            chunk_size=chunk_size,
+            chunk_bytes=chunk_bytes,
+            compression=compression,
+            encryption=encryption,
+            storage_options=storage_options,
+        )
         self._fn = fn
         self._inputs = inputs
         self.is_generator = False
@@ -199,6 +206,7 @@ def map(
     error_when_not_empty: bool = False,
     reader: Optional[BaseReader] = None,
     batch_size: Optional[int] = None,
+    storage_options: Optional[Dict] = {},
 ) -> None:
     """This function maps a callable over a collection of inputs, possibly in a distributed way.
 
@@ -219,6 +227,7 @@
             Set this to ``False`` if the order in which samples are processed should be preserved.
         error_when_not_empty: Whether we should error if the output folder isn't empty.
         batch_size: Group the inputs into batches of batch_size length.
+        storage_options: The storage options used by the cloud provider.
 
     """
     if isinstance(inputs, StreamingDataLoader) and batch_size is not None:
@@ -257,7 +266,7 @@ def map(
         )
 
     if error_when_not_empty:
-        _assert_dir_is_empty(_output_dir)
+        _assert_dir_is_empty(_output_dir, storage_options=storage_options)
 
     if not isinstance(inputs, StreamingDataLoader):
         input_dir = input_dir or _get_input_dir(inputs)
@@ -281,6 +290,7 @@ def map(
             reorder_files=reorder_files,
             weights=weights,
             reader=reader,
+            storage_options=storage_options,
         )
         with optimize_dns_context(True):
             return data_processor.run(LambdaDataTransformRecipe(fn, inputs))
@@ -314,6 +324,7 @@ def optimize(
     use_checkpoint: bool = False,
     item_loader: Optional[BaseItemLoader] = None,
     start_method: Optional[str] = None,
+    storage_options: Optional[Dict] = {},
 ) -> None:
     """This function converts a dataset into chunks, possibly in a distributed way.
 
@@ -347,6 +358,7 @@
             the format in which the data is stored and optimized for loading.
         start_method: The start method used by python multiprocessing package. Default to spawn unless running
             inside an interactive shell like Ipython.
+        storage_options: The storage options used by the cloud provider.
 
     """
     if mode is not None and mode not in ["append", "overwrite"]:
@@ -401,7 +413,9 @@ def optimize(
             " HINT: You can either use `/teamspace/s3_connections/...` or `/teamspace/datasets/...`."
         )
 
-    _assert_dir_has_index_file(_output_dir, mode=mode, use_checkpoint=use_checkpoint)
+    _assert_dir_has_index_file(
+        _output_dir, mode=mode, use_checkpoint=use_checkpoint, storage_options=storage_options
+    )
 
     if not isinstance(inputs, StreamingDataLoader):
         resolved_dir = _resolve_dir(input_dir or _get_input_dir(inputs))
@@ -417,7 +431,9 @@
         num_workers = num_workers or _get_default_num_workers()
         state_dict = {rank: 0 for rank in range(num_workers)}
 
-        existing_index_file_content = read_index_file_content(_output_dir) if mode == "append" else None
+        existing_index_file_content = (
+            read_index_file_content(_output_dir, storage_options=storage_options) if mode == "append" else None
+        )
 
         if existing_index_file_content is not None:
             for chunk in existing_index_file_content["chunks"]:
@@ -439,6 +455,7 @@
             use_checkpoint=use_checkpoint,
             item_loader=item_loader,
             start_method=start_method,
+            storage_options=storage_options,
         )
 
         with optimize_dns_context(True):
@@ -451,6 +468,7 @@
                     compression=compression,
                     encryption=encryption,
                     existing_index=existing_index_file_content,
+                    storage_options=storage_options,
                 )
             )
         return None
@@ -519,7 +537,7 @@ class CopyInfo:
     new_filename: str
 
 
-def merge_datasets(input_dirs: List[str], output_dir: str) -> None:
+def merge_datasets(input_dirs: List[str], output_dir: str, storage_options: Optional[Dict] = {}) -> None:
     """The merge_datasets utility enables to merge multiple existing optimized datasets into a single optimized
     dataset.
 
@@ -540,12 +558,14 @@ def merge_datasets(input_dirs: List[str], output_dir: str) -> None:
     if any(input_dir == resolved_output_dir for input_dir in resolved_input_dirs):
         raise ValueError("The provided output_dir was found within the input_dirs. This isn't supported.")
 
-    input_dirs_file_content = [read_index_file_content(input_dir) for input_dir in resolved_input_dirs]
+    input_dirs_file_content = [
+        read_index_file_content(input_dir, storage_options=storage_options) for input_dir in resolved_input_dirs
+    ]
 
     if any(file_content is None for file_content in input_dirs_file_content):
         raise ValueError("One of the provided input_dir doesn't have an index file.")
 
-    output_dir_file_content = read_index_file_content(resolved_output_dir)
+    output_dir_file_content = read_index_file_content(resolved_output_dir, storage_options=storage_options)
 
     if output_dir_file_content is not None:
         raise ValueError("The output_dir already contains an optimized dataset")
@@ -580,12 +600,12 @@ def merge_datasets(input_dirs: List[str], output_dir: str) -> None:
     _tqdm = _get_tqdm_iterator_if_available()
 
     for copy_info in _tqdm(copy_infos):
-        _apply_copy(copy_info, resolved_output_dir)
+        _apply_copy(copy_info, resolved_output_dir, storage_options=storage_options)
 
-    _save_index(index_json, resolved_output_dir)
+    _save_index(index_json, resolved_output_dir, storage_options=storage_options)
 
 
-def _apply_copy(copy_info: CopyInfo, output_dir: Dir) -> None:
+def _apply_copy(copy_info: CopyInfo, output_dir: Dir, storage_options: Optional[Dict] = {}) -> None:
     if output_dir.url is None and copy_info.input_dir.url is None:
         assert copy_info.input_dir.path
         assert output_dir.path
@@ -595,20 +615,15 @@ def _apply_copy(copy_info: CopyInfo, output_dir: Dir) -> None:
         shutil.copyfile(input_filepath, output_filepath)
 
     elif output_dir.url and copy_info.input_dir.url:
-        input_obj = parse.urlparse(os.path.join(copy_info.input_dir.url, copy_info.old_filename))
-        output_obj = parse.urlparse(os.path.join(output_dir.url, copy_info.new_filename))
-
-        s3 = S3Client()
-        s3.client.copy(
-            {"Bucket": input_obj.netloc, "Key": input_obj.path.lstrip("/")},
-            output_obj.netloc,
-            output_obj.path.lstrip("/"),
-        )
+        input_obj = os.path.join(copy_info.input_dir.url, copy_info.old_filename)
+        output_obj = os.path.join(output_dir.url, copy_info.new_filename)
+
+        copy_file_or_directory(input_obj, output_obj, storage_options=storage_options)
     else:
         raise NotImplementedError
 
 
-def _save_index(index_json: Dict, output_dir: Dir) -> None:
+def _save_index(index_json: Dict, output_dir: Dir, storage_options: Optional[Dict] = {}) -> None:
     if output_dir.url is None:
         assert output_dir.path
         with open(os.path.join(output_dir.path, _INDEX_FILENAME), "w") as f:
@@ -619,11 +634,6 @@ def _save_index(index_json: Dict, output_dir: Dir) -> None:
 
         f.flush()
 
-        obj = parse.urlparse(os.path.join(output_dir.url, _INDEX_FILENAME))
-
-        s3 = S3Client()
-        s3.client.upload_file(
-            f.name,
-            obj.netloc,
-            obj.path.lstrip("/"),
+        upload_file_or_directory(
+            f.name, os.path.join(output_dir.url, _INDEX_FILENAME), storage_options=storage_options
         )
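With these signature changes, `map`, `optimize`, and `merge_datasets` accept a `storage_options` dict that is forwarded to the fsspec-backed helpers. A hedged usage sketch follows; the bucket names, credential values, and the `compress` function are placeholders, not taken from the PR:

```python
# Hypothetical usage sketch; bucket names and credentials are placeholders.
import litdata as ld


def compress(index: int):
    # Trivial sample function for optimize().
    return index, index**2


if __name__ == "__main__":
    # s3fs-style credentials, forwarded through storage_options.
    s3_options = {"key": "MY_ACCESS_KEY_ID", "secret": "MY_SECRET_ACCESS_KEY"}

    ld.optimize(
        fn=compress,
        inputs=list(range(1_000)),
        output_dir="s3://my-bucket/optimized-data",
        chunk_bytes="64MB",
        storage_options=s3_options,
    )

    # merge_datasets now copies chunks through the same fsspec helpers instead of a
    # hard-coded S3 client, so the options flow through unchanged.
    ld.merge_datasets(
        input_dirs=["s3://my-bucket/part-1", "s3://my-bucket/part-2"],
        output_dir="s3://my-bucket/merged",
        storage_options=s3_options,
    )
```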
src/litdata/processing/utilities.py (11 additions, 32 deletions)

@@ -21,11 +21,9 @@
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 from urllib import parse
 
-import boto3
-import botocore
-
-from litdata.constants import _INDEX_FILENAME, _IS_IN_STUDIO
+from litdata.constants import _INDEX_FILENAME, _IS_IN_STUDIO, _SUPPORTED_CLOUD_PROVIDERS
 from litdata.streaming.cache import Dir
+from litdata.streaming.downloader import download_file_or_directory
 
 
 def _create_dataset(
@@ -183,7 +181,7 @@ def _get_work_dir() -> str:
     return f"s3://{bucket_name}/projects/{project_id}/lightningapps/{app_id}/artifacts/{work_id}/content/"
 
 
-def read_index_file_content(output_dir: Dir) -> Optional[Dict[str, Any]]:
+def read_index_file_content(output_dir: Dir, storage_options: Optional[Dict] = {}) -> Optional[Dict[str, Any]]:
     """Read the index file content."""
     if not isinstance(output_dir, Dir):
         raise ValueError("The provided output_dir should be a Dir object.")
@@ -201,27 +199,26 @@
     # download the index file from s3, and read it
     obj = parse.urlparse(output_dir.url)
 
-    if obj.scheme != "s3":
-        raise ValueError(f"The provided folder should start with s3://. Found {output_dir.path}.")
-
-    # TODO: Add support for all cloud providers
-    s3 = boto3.client("s3")
-
-    prefix = obj.path.lstrip("/").rstrip("/") + "/"
+    if obj.scheme not in _SUPPORTED_CLOUD_PROVIDERS:
+        raise ValueError(
+            f"The provided folder should start with {_SUPPORTED_CLOUD_PROVIDERS}. Found {output_dir.path}."
+        )
 
     # Check the index file exists
     try:
         # Create a temporary file
         with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as temp_file:
             temp_file_name = temp_file.name
-        s3.download_file(obj.netloc, os.path.join(prefix, _INDEX_FILENAME), temp_file_name)
+        download_file_or_directory(
+            os.path.join(output_dir.url, _INDEX_FILENAME), temp_file_name, storage_options=storage_options
+        )
         # Read data from the temporary file
         with open(temp_file_name) as temp_file:
            data = json.load(temp_file)
         # Delete the temporary file
         os.remove(temp_file_name)
         return data
-    except botocore.exceptions.ClientError:
+    except Exception as _e:
         return None
 
 
@@ -257,21 +254,3 @@ def remove_uuid_from_filename(filepath: str) -> str:
 
     # uuid is of 32 characters, '.json' is 5 characters and '-' is 1 character
     return filepath[:-38] + ".json"
-
-
-def download_directory_from_S3(bucket_name: str, remote_directory_name: str, local_directory_name: str) -> str:
-    s3_resource = boto3.resource("s3")
-    bucket = s3_resource.Bucket(bucket_name)
-
-    saved_file_dir = "."
-
-    for obj in bucket.objects.filter(Prefix=remote_directory_name):
-        local_filename = os.path.join(local_directory_name, obj.key)
-
-        if not os.path.exists(os.path.dirname(local_filename)):
-            os.makedirs(os.path.dirname(local_filename))
-        with open(local_filename, "wb") as f:
-            s3_resource.meta.client.download_fileobj(bucket_name, obj.key, f)
-        saved_file_dir = os.path.dirname(local_filename)
-
-    return saved_file_dir
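The boto3-specific paths above (including `download_directory_from_S3`) are removed in favour of `download_file_or_directory`, `upload_file_or_directory`, and `copy_file_or_directory` from `litdata.streaming.downloader`, whose implementation is not part of this diff. A minimal fsspec-based sketch of what such helpers could look like, purely as an assumption for illustration:

```python
# Assumed shape of the new transfer helpers; the real implementations live in
# litdata/streaming/downloader.py and may differ (e.g. the s5cmd fast path for S3).
from typing import Any, Dict, Optional

import fsspec


def download_file_or_directory(remote_path: str, local_path: str, storage_options: Optional[Dict[str, Any]] = None) -> None:
    """Copy a remote file (or directory, recursively) to the local filesystem."""
    fs, _ = fsspec.core.url_to_fs(remote_path, **(storage_options or {}))
    fs.get(remote_path, local_path, recursive=True)


def upload_file_or_directory(local_path: str, remote_path: str, storage_options: Optional[Dict[str, Any]] = None) -> None:
    """Copy a local file (or directory, recursively) to remote storage."""
    fs, _ = fsspec.core.url_to_fs(remote_path, **(storage_options or {}))
    fs.put(local_path, remote_path, recursive=True)


def copy_file_or_directory(remote_source: str, remote_destination: str, storage_options: Optional[Dict[str, Any]] = None) -> None:
    """Copy between two locations on the same remote filesystem (e.g. s3 to s3)."""
    fs, _ = fsspec.core.url_to_fs(remote_source, **(storage_options or {}))
    fs.copy(remote_source, remote_destination, recursive=True)
```

Because every helper resolves the filesystem from the URL scheme, the same `storage_options` dict can carry s3fs, gcsfs, or adlfs credentials without any provider-specific branching in the calling code.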