Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tchaton committed Oct 11, 2024
1 parent b9aa903 commit 3a832ae
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/litdata/streaming/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ def __getitem__(self, index: ChunkedIndex) -> Tuple[str, int, int]:

begin = self._intervals[index.chunk_index][0]

return local_chunkpath, begin, chunk["chunk_bytes"]
filesize_bytes = (1 + chunk["chunk_size"]) * 4 + chunk["chunk_bytes"]
return local_chunkpath, begin, filesize_bytes

def _get_chunk_index_from_filename(self, chunk_filename: str) -> int:
"""Retrieves the associated chunk_index for a given chunk filename."""
Expand Down
14 changes: 7 additions & 7 deletions src/litdata/streaming/item_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def load_item_from_chunk(
chunk_index: int,
chunk_filepath: str,
begin: int,
chunk_bytes: int,
filesize_bytes: int,
) -> Any:
"""Returns an item loaded from a chunk."""

Expand Down Expand Up @@ -132,7 +132,7 @@ def load_item_from_chunk(
chunk_index: int,
chunk_filepath: str,
begin: int,
chunk_bytes: int,
filesize_bytes: int,
encryption: Optional[Encryption] = None,
) -> bytes:
offset = (1 + (index - begin) if index >= begin else index + 1) * 4
Expand All @@ -141,11 +141,11 @@ def load_item_from_chunk(
del self._chunk_filepaths[chunk_filepath]

if chunk_filepath not in self._chunk_filepaths:
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= chunk_bytes
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= filesize_bytes

while not exists:
sleep(0.1)
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= chunk_bytes
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= filesize_bytes

self._chunk_filepaths[chunk_filepath] = True

Expand Down Expand Up @@ -329,19 +329,19 @@ def load_item_from_chunk(
chunk_index: int,
chunk_filepath: str,
begin: int,
chunk_bytes: int,
filesize_bytes: int,
) -> torch.Tensor:
assert self._block_size

if chunk_filepath in self._chunk_filepaths and not os.path.isfile(chunk_filepath):
del self._chunk_filepaths[chunk_filepath]

if chunk_filepath not in self._chunk_filepaths:
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size > 0
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size > filesize_bytes

while not exists:
sleep(0.1)
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size > 0
exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size > filesize_bytes

self._chunk_filepaths[chunk_filepath] = True

Expand Down
6 changes: 3 additions & 3 deletions src/litdata/streaming/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ def read(self, index: ChunkedIndex) -> Any:
self._last_chunk_index = index.chunk_index

# Fetch the element
chunk_filepath, begin, chunk_bytes = self.config[index]
chunk_filepath, begin, filesize_bytes = self.config[index]

if isinstance(self._item_loader, PyTreeLoader):
item = self._item_loader.load_item_from_chunk(
index.index, index.chunk_index, chunk_filepath, begin, chunk_bytes, self._encryption
index.index, index.chunk_index, chunk_filepath, begin, filesize_bytes, self._encryption
)
else:
item = self._item_loader.load_item_from_chunk(
index.index, index.chunk_index, chunk_filepath, begin, chunk_bytes
index.index, index.chunk_index, chunk_filepath, begin, filesize_bytes
)
# We need to request deletion after the latest element has been loaded.
# Otherwise, this could trigger segmentation fault error depending on the item loader used.
Expand Down

0 comments on commit 3a832ae

Please sign in to comment.