From 450062191d215f8cd3809ad419a900c23faf8fdc Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 10:58:47 +0000 Subject: [PATCH 1/8] add offset --- deeplake/api/tests/test_api.py | 1 - deeplake/api/tests/test_reset.py | 14 +- deeplake/api/tests/test_update_samples.py | 5 +- deeplake/api/tests/test_video.py | 4 +- deeplake/auto/structured/dataframe.py | 2 +- deeplake/cli/test_cli.py | 2 +- deeplake/constants.py | 3 +- deeplake/enterprise/convert_to_libdeeplake.py | 5 +- deeplake/enterprise/dataloader.py | 143 +++++++++++++----- deeplake/enterprise/libdeeplake_query.py | 3 +- deeplake/enterprise/test_pytorch.py | 95 +++++++----- deeplake/enterprise/test_tensorflow.py | 16 +- 12 files changed, 186 insertions(+), 107 deletions(-) diff --git a/deeplake/api/tests/test_api.py b/deeplake/api/tests/test_api.py index 95d28f0913..6e0e62b1e7 100644 --- a/deeplake/api/tests/test_api.py +++ b/deeplake/api/tests/test_api.py @@ -983,7 +983,6 @@ def test_dataset_deepcopy(path, hub_token, num_workers, progressbar): dest_path = "_".join((path, "dest1")) src_ds = deeplake.empty(src_path, overwrite=True, token=hub_token) - # dest_ds = deeplake.empty(dest_path, overwrite=True, token=hub_token) with src_ds: src_ds.info.update(key=0) diff --git a/deeplake/api/tests/test_reset.py b/deeplake/api/tests/test_reset.py index d6784c1557..cf6cdaeca3 100644 --- a/deeplake/api/tests/test_reset.py +++ b/deeplake/api/tests/test_reset.py @@ -54,10 +54,10 @@ def test_load_corrupt_dataset(path): save_head = ds.pending_commit_id with pytest.raises(DatasetCorruptError): - ds = deeplake.load(path, access_method=access_method) + deeplake.load(path, access_method=access_method) with pytest.raises(ReadOnlyModeError): - ds = deeplake.load( + deeplake.load( path, read_only=True, access_method=access_method, reset=True ) @@ -116,7 +116,7 @@ def test_load_corrupted_branch(local_path): save_head = ds.pending_commit_id with pytest.raises(DatasetCorruptError): - ds = deeplake.load(f"{local_path}@alt") + deeplake.load(f"{local_path}@alt") ds = deeplake.load(f"{local_path}@alt", reset=True) verify_reset_on_checkout(ds, "alt", main_2, save_head, {"abc": [[1], [2]]}) @@ -131,10 +131,10 @@ def test_load_corrupted_branch(local_path): save_head = ds.pending_commit_id with pytest.raises(DatasetCorruptError): - ds = deeplake.load(f"{local_path}@alt") + deeplake.load(f"{local_path}@alt") with pytest.raises(DatasetCorruptError): - ds = deeplake.load(f"{local_path}@{save_head}") + deeplake.load(f"{local_path}@{save_head}") ds = deeplake.load(f"{local_path}@alt", reset=True) verify_reset_on_checkout(ds, "alt", alt_2, save_head, {"abc": [[1], [2], [3], [4]]}) @@ -200,10 +200,10 @@ def test_load_corrupt_dataset_with_no_commits(local_path): corrupt_ds(ds, "abc", 1) with pytest.raises(DatasetCorruptError): - ds = deeplake.load(local_path) + deeplake.load(local_path) with pytest.raises(ReadOnlyModeError): - ds = deeplake.load(local_path, read_only=True, reset=True) + deeplake.load(local_path, read_only=True, reset=True) ds = deeplake.load(local_path, reset=True) diff --git a/deeplake/api/tests/test_update_samples.py b/deeplake/api/tests/test_update_samples.py index f69dd8476c..465c870fb8 100644 --- a/deeplake/api/tests/test_update_samples.py +++ b/deeplake/api/tests/test_update_samples.py @@ -51,9 +51,8 @@ def _make_update_assert_equal( # this is necessary because `expected` uses `aslist=True` to handle dynamic cases. # with `aslist=False`, this wouldn't be necessary. 
expected_value = value - if hasattr(value, "__len__"): - if len(value) == 1: - expected_value = value[0] + if hasattr(value, "__len__") and len(value) == 1: + expected_value = value[0] # make updates tensor[index] = value diff --git a/deeplake/api/tests/test_video.py b/deeplake/api/tests/test_video.py index 7658dcebe4..76415d63ef 100644 --- a/deeplake/api/tests/test_video.py +++ b/deeplake/api/tests/test_video.py @@ -111,7 +111,7 @@ def test_video_timestamps(vstream_path, hub_token): ds = deeplake.load(vstream_path, read_only=True, token=hub_token) with pytest.raises(ValueError): - stamps = ds.mp4_videos[:2].timestamps + ds.mp4_videos[:2].timestamps stamps = ds.large_video[0, 12000:1199:-100].timestamps @@ -131,7 +131,7 @@ def test_video_exception(local_ds): with local_ds as ds: ds.create_tensor("abc") with pytest.raises(Exception): - stamps = ds.abc.timestamps + ds.abc.timestamps @pytest.mark.skipif( diff --git a/deeplake/auto/structured/dataframe.py b/deeplake/auto/structured/dataframe.py index ca8dc35376..642ec87325 100644 --- a/deeplake/auto/structured/dataframe.py +++ b/deeplake/auto/structured/dataframe.py @@ -58,7 +58,7 @@ def _get_most_frequent_image_extension(self, fn_iterator): if len(fn_iterator) == 0: raise IngestionError( - f"Cannot determine the most frequent image compression because no valid image files were provided." + "Cannot determine the most frequent image compression because no valid image files were provided." ) supported_image_extensions = tuple( diff --git a/deeplake/cli/test_cli.py b/deeplake/cli/test_cli.py index cdb03e6226..1611944bd4 100644 --- a/deeplake/cli/test_cli.py +++ b/deeplake/cli/test_cli.py @@ -27,5 +27,5 @@ def test_cli_auth(hub_cloud_dev_credentials, hub_cloud_dev_token, method): def test_bad_token(): runner = CliRunner() - result = runner.invoke(login, f"-t abcd") + result = runner.invoke(login, "-t abcd") assert isinstance(result.exception, LoginException) diff --git a/deeplake/constants.py b/deeplake/constants.py index fb4a9c5085..e2dafe4490 100644 --- a/deeplake/constants.py +++ b/deeplake/constants.py @@ -159,7 +159,8 @@ "gcp://", "gs://", "az://", - "azure://" "gdrive://", + "azure://", + "gdrive://", ) _ENABLE_HUB_SUB_DATASETS = False diff --git a/deeplake/enterprise/convert_to_libdeeplake.py b/deeplake/enterprise/convert_to_libdeeplake.py index 800b7fb8b3..8ab40c0ac2 100644 --- a/deeplake/enterprise/convert_to_libdeeplake.py +++ b/deeplake/enterprise/convert_to_libdeeplake.py @@ -211,8 +211,7 @@ def dataset_to_libdeeplake(hub2_dataset): commit_id = hub2_dataset.pending_commit_id libdeeplake_dataset.checkout(commit_id) slice_ = hub2_dataset.index.values[0].value - if slice_ != slice(None): - if isinstance(slice_, tuple): - slice_ = list(slice_) + if slice_ != slice(None)and isinstance(slice_, tuple): + slice_ = list(slice_) libdeeplake_dataset = libdeeplake_dataset[slice_] return libdeeplake_dataset diff --git a/deeplake/enterprise/dataloader.py b/deeplake/enterprise/dataloader.py index 8692cb74c6..c6fe487a16 100644 --- a/deeplake/enterprise/dataloader.py +++ b/deeplake/enterprise/dataloader.py @@ -3,6 +3,7 @@ from deeplake.enterprise.convert_to_libdeeplake import dataset_to_libdeeplake from deeplake.enterprise.dummy_dataloader import DummyDataloader # type: ignore from deeplake.util.scheduling import create_fetching_schedule, find_primary_tensor +from deeplake.core.seed import DeeplakeRandom from deeplake.enterprise.util import ( handle_mode, raise_indra_installation_error, @@ -34,6 +35,7 @@ BatchSampler = None # type: ignore import numpy 
as np +import warnings import math @@ -108,6 +110,7 @@ def __init__( _dataloader=None, _world_size=1, _ignore_errors=False, + _offset=None, **kwargs, ): import_indra_loader() @@ -132,6 +135,7 @@ def __init__( self._dataloader = _dataloader self._world_size = _world_size self._ignore_errors = _ignore_errors + self._offset = _offset for k, v in kwargs.items(): setattr(self, k, v) @@ -258,6 +262,25 @@ def batch(self, batch_size: int, drop_last: bool = False): all_vars["_drop_last"] = drop_last return self.__class__(**all_vars) + def offset(self, off: int = 0): + """Returns a shifted :class:`DeepLakeDataLoader` object. + + Args: + off (int): index that the dataloadee will start to iterate. + + Returns: + DeepLakeDataLoader: A :class:`DeepLakeDataLoader` object. + + Raises: + ValueError: If .offset() has already been called. + """ + if self._offset is not None: + raise ValueError("offset is already set") + + all_vars = self.__dict__.copy() + all_vars["_offset"] = off + return self.__class__(**all_vars) + def shuffle(self, shuffle: bool = True, buffer_size: int = 2048): """Returns a shuffled :class:`DeepLakeDataLoader` object. @@ -620,15 +643,85 @@ def _get_suboptimal_thread_count(self) -> Optional[int]: return num_suboptimal_threads return self._num_threads + def __create_dummy_dataloader( + self, + dataset, + tensors: Optional[List[str]] = None, + raw_tensors: Optional[List[str]] = None, + pil_compressed_tensors: Optional[List[str]] = None, + ) -> DummyDataloader: + return DummyDataloader( + deeplake_dataset=dataset, + batch_size=self._batch_size, + shuffle=self._shuffle, + num_workers=self._num_workers, + collate_fn=self.collate_fn, + transform_fn=self._transform, + distributed=self._distributed, + prefetch_factor=self._prefetch_factor, + tensors=tensors, + drop_last=self._drop_last, + upcast=self._mode == "pytorch", # upcast to handle unsupported dtypes, + return_index=self._return_index, + raw_tensors=raw_tensors, + pil_compressed_tensors=pil_compressed_tensors, + persistent_workers=self._persistent_workers, + ) + + def __get_indra_dataloader( + self, + indra_dataset, + tensors: Optional[List[str]] = None, + raw_tensors: Optional[List[str]] = None, + pil_compressed_tensors: Optional[List[str]] = None, + json_tensors: Optional[List[str]] = None, + list_tensors: Optional[List[str]] = None, + htype_dict: Optional[dict] = None, + ndim_dict: Optional[dict] = None, + tensor_info_dict: Optional[dict] = None, + ): + num_threads = ( + self._get_suboptimal_thread_count() + if self._distributed + else self._num_threads + ) + seed = DeeplakeRandom().get_seed() + if self._offset is not None and self._shuffle and seed is None: + warnings.warn( + "To keep dataloader consistent during setting offset and shuffling params please confider seeting deeplake.random.seed" + ) + + return INDRA_LOADER( + indra_dataset, + batch_size=self._batch_size, + num_threads=num_threads, + shuffle=self._shuffle, + num_workers=self._num_workers, + collate_fn=self.collate_fn, + transform_fn=self._transform, + distributed=self._distributed, + prefetch_factor=self._prefetch_factor, + tensors=tensors, + drop_last=self._drop_last, + ignore_errors=self._ignore_errors, + upcast=self._mode == "pytorch", # upcast to handle unsupported dtypes, + return_index=self._return_index, + primary_tensor=self._primary_tensor_name, + buffer_size=self._buffer_size, + raw_tensors=raw_tensors, + pil_compressed_tensors=pil_compressed_tensors, + json_tensors=json_tensors, + list_tensors=list_tensors, + persistent_workers=self._persistent_workers, + 
htype_dict=htype_dict, + ndim_dict=ndim_dict, + tensor_info_dict=tensor_info_dict, + offset=self._offset, + ) + def __iter__(self): if self._dataloader is None: dataset = self._orig_dataset - collate_fn = self.collate_fn - upcast = self._mode == "pytorch" # upcast to handle unsupported dtypes - - primary_tensor_name = self._primary_tensor_name - buffer_size = self._buffer_size - tensors = self._tensors or map_tensor_keys(dataset, None) jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors( @@ -655,22 +748,11 @@ def __iter__(self): dataset, data_tensors, tensor_info_tensors ) if deeplake.constants.RETURN_DUMMY_DATA_FOR_DATALOADER: - self._dataloader = DummyDataloader( - deeplake_dataset=dataset, - batch_size=self._batch_size, - shuffle=self._shuffle, - num_workers=self._num_workers, - collate_fn=collate_fn, - transform_fn=self._transform, - distributed=self._distributed, - prefetch_factor=self._prefetch_factor, + self._dataloader = self.__create_dummy_dataloader( + dataset, tensors=tensors, - drop_last=self._drop_last, - upcast=upcast, - return_index=self._return_index, raw_tensors=raw_tensors, pil_compressed_tensors=pil_compressed_tensors, - persistent_workers=self._persistent_workers, ) else: if not hasattr(self, "_indra_dataset"): @@ -678,38 +760,19 @@ def __iter__(self): else: indra_dataset = self._indra_dataset - num_threads = ( - self._get_suboptimal_thread_count() - if self._distributed - else self._num_threads - ) - self._dataloader = INDRA_LOADER( + self._dataloader = self.__get_indra_dataloader( indra_dataset, - batch_size=self._batch_size, - num_threads=num_threads, - shuffle=self._shuffle, - num_workers=self._num_workers, - collate_fn=collate_fn, - transform_fn=self._transform, - distributed=self._distributed, - prefetch_factor=self._prefetch_factor, tensors=tensors, - drop_last=self._drop_last, - ignore_errors=self._ignore_errors, - upcast=upcast, - return_index=self._return_index, - primary_tensor=primary_tensor_name, - buffer_size=buffer_size, raw_tensors=raw_tensors, pil_compressed_tensors=pil_compressed_tensors, json_tensors=json_tensors, list_tensors=list_tensors, - persistent_workers=self._persistent_workers, htype_dict=htype_dict, ndim_dict=ndim_dict, tensor_info_dict=tensor_info_dict, worker_init_fn=self.worker_init_fn, ) + dataset_read(self._orig_dataset) if self._internal_iterator is not None: diff --git a/deeplake/enterprise/libdeeplake_query.py b/deeplake/enterprise/libdeeplake_query.py index 16a7acd79b..76be94205c 100644 --- a/deeplake/enterprise/libdeeplake_query.py +++ b/deeplake/enterprise/libdeeplake_query.py @@ -39,8 +39,7 @@ def query(dataset, query_string: str): elif dataset.libdeeplake_dataset is not None: ds = dataset.libdeeplake_dataset slice_ = dataset.index.values[0].value - if slice_ != slice(None): - if isinstance(slice_, tuple): + if slice_ != slice(None) and isinstance(slice_, tuple): slice_ = list(slice_) ds = ds[slice_] else: diff --git a/deeplake/enterprise/test_pytorch.py b/deeplake/enterprise/test_pytorch.py index 63598e4634..5cddf97b4c 100644 --- a/deeplake/enterprise/test_pytorch.py +++ b/deeplake/enterprise/test_pytorch.py @@ -1,4 +1,5 @@ import pickle +from indra import api import deeplake import numpy as np import pytest @@ -73,7 +74,26 @@ def test_setting_woker_init_function(local_auth_ds): assert dl.worker_init_fn == None dl.worker_init_fn = partial(dummy_init_fn, 1024) - assert dl.worker_init_fn() == f"function called with arg 1024" + assert dl.worker_init_fn() == "function called with arg 1024" + + +@requires_torch +# 
@requires_libdeeplake +def test_offset_ds_iteration(local_auth_ds): + with local_auth_ds as ds: + ds.create_tensor("abc", htype="generic", dtype="uint16") + ds.abc.extend([i for i in range(10)]) + + dl = ( + local_auth_ds.dataloader() + .offset(4) + .transform(identity) + .pytorch(collate_fn=identity) + ) + + idx_table = [4, 5, 6, 7, 8, 9, 0, 1, 2, 3] + for i, item in enumerate(dl): + assert idx_table[i] == item[0]["index"].astype(int) @requires_torch @@ -81,8 +101,11 @@ def test_setting_woker_init_function(local_auth_ds): @pytest.mark.parametrize( "ds", [ - pytest.param("hub_cloud_ds", marks=[pytest.mark.slow, pytest.mark.skip("Causing lockups")]), - "local_auth_ds" + pytest.param( + "hub_cloud_ds", + marks=[pytest.mark.slow, pytest.mark.skip("Causing lockups")], + ), + "local_auth_ds", ], indirect=True, ) @@ -267,7 +290,7 @@ def test_custom_tensor_order(local_auth_ds): ds[t].extend(np.random.random((3, 4, 5))) with pytest.raises(TensorDoesNotExistError): - dl = ds.dataloader().pytorch(tensors=["c", "d", "e"]) + ds.dataloader().pytorch(tensors=["c", "d", "e"]) dl = ds.dataloader().pytorch(tensors=["c", "d", "a"], return_index=False) @@ -322,7 +345,7 @@ def test_readonly_with_two_workers(local_auth_ds): ptds = ds.dataloader().pytorch(num_workers=2) # no need to check input, only care that readonly works for _ in ptds: - pass + continue @pytest.mark.xfail(raises=NotImplementedError, strict=True) @@ -536,40 +559,42 @@ def test_pytorch_decode(local_auth_ds, compressed_image_paths, compression): @pytest.mark.flaky @pytest.mark.slow def test_rename(local_auth_ds): + group_name = "red/green" with local_auth_ds as ds: ds.create_tensor("abc") ds.create_tensor("blue/green") ds.abc.append([1, 2, 3]) ds.rename_tensor("abc", "xyz") ds.rename_group("blue", "red") - ds["red/green"].append([1, 2, 3, 4]) + ds[group_name].append([1, 2, 3, 4]) loader = ds.dataloader().pytorch(return_index=False) for sample in loader: - assert set(sample.keys()) == {"xyz", "red/green"} + assert set(sample.keys()) == {"xyz", group_name} np.testing.assert_array_equal(np.array(sample["xyz"]), np.array([[1, 2, 3]])) np.testing.assert_array_equal( - np.array(sample["red/green"]), np.array([[1, 2, 3, 4]]) + np.array(sample[group_name]), np.array([[1, 2, 3, 4]]) ) @requires_torch @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [ - 0, - pytest.param(2, marks=pytest.mark.skip(reason="causing lockups")), -]) +@pytest.mark.parametrize( + "num_workers", + [ + 0, + pytest.param(2, marks=pytest.mark.skip(reason="causing lockups")), + ], +) @pytest.mark.slow @pytest.mark.flaky def test_indexes(local_auth_ds, num_workers): - shuffle = False with local_auth_ds as ds: ds.create_tensor("xyz") for i in range(8): ds.xyz.append(i * np.ones((2, 2))) ptds = ds.dataloader().batch(4).pytorch(num_workers=num_workers, return_index=True) - if shuffle: - ptds = ptds.shuffle() + ptds = ptds.shuffle() for batch in ptds: assert batch.keys() == {"xyz", "index"} @@ -580,13 +605,15 @@ def test_indexes(local_auth_ds, num_workers): @requires_torch @requires_libdeeplake @pytest.mark.slow -@pytest.mark.parametrize("num_workers", [ - 0, - pytest.param(2, marks=pytest.mark.skip("causing lockups")), -]) +@pytest.mark.parametrize( + "num_workers", + [ + 0, + pytest.param(2, marks=pytest.mark.skip("causing lockups")), + ], +) @pytest.mark.flaky def test_indexes_transform(local_auth_ds, num_workers): - shuffle = False with local_auth_ds as ds: ds.create_tensor("xyz") for i in range(8): @@ -600,8 +627,6 @@ def test_indexes_transform(local_auth_ds, 
num_workers): num_workers=num_workers, return_index=True, collate_fn=identity_collate ) ) - if shuffle: - ptds = ptds.shuffle() for batch in ptds: assert len(batch) == 4 @@ -611,11 +636,12 @@ def test_indexes_transform(local_auth_ds, num_workers): @requires_torch @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [0, pytest.param(2, marks=pytest.mark.skip("causing lockups"))]) +@pytest.mark.parametrize( + "num_workers", [0, pytest.param(2, marks=pytest.mark.skip("causing lockups"))] +) @pytest.mark.slow @pytest.mark.flaky def test_indexes_transform_dict(local_auth_ds, num_workers): - shuffle = False with local_auth_ds as ds: ds.create_tensor("xyz") for i in range(8): @@ -627,8 +653,6 @@ def test_indexes_transform_dict(local_auth_ds, num_workers): .transform({"xyz": double, "index": None}) .pytorch(num_workers=num_workers, return_index=True) ) - if shuffle: - ptds = ptds.shuffle() for batch in ptds: assert batch.keys() == {"xyz", "index"} @@ -641,8 +665,6 @@ def test_indexes_transform_dict(local_auth_ds, num_workers): .transform({"xyz": double}) .pytorch(num_workers=num_workers, return_index=True) ) - if shuffle: - ptds = ptds.shuffle() for batch in ptds: assert batch.keys() == {"xyz"} @@ -650,18 +672,19 @@ def test_indexes_transform_dict(local_auth_ds, num_workers): @requires_torch @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [0, pytest.param(2, marks=pytest.mark.skip("causing lockups"))]) +@pytest.mark.parametrize( + "num_workers", [0, pytest.param(2, marks=pytest.mark.skip("causing lockups"))] +) @pytest.mark.slow @pytest.mark.flaky def test_indexes_tensors(local_auth_ds, num_workers): - shuffle = False with local_auth_ds as ds: ds.create_tensor("xyz") for i in range(8): ds.xyz.append(i * np.ones((2, 2))) with pytest.raises(ValueError): - ptds = ( + ( ds.dataloader() .batch(4) .pytorch( @@ -674,8 +697,6 @@ def test_indexes_tensors(local_auth_ds, num_workers): .batch(4) .pytorch(num_workers=num_workers, return_index=True, tensors=["xyz"]) ) - if shuffle: - ptds = ptds.shuffle() for batch in ptds: assert batch.keys() == {"xyz", "index"} @@ -710,16 +731,16 @@ def test_pytorch_error_handling(local_auth_ds): ptds = ds.dataloader().pytorch() with pytest.raises(EmptyTensorError): for _ in ptds: - pass + continue ptds = ds.dataloader().pytorch(tensors=["x", "y"]) with pytest.raises(EmptyTensorError): for _ in ptds: - pass + continue ptds = ds.dataloader().pytorch(tensors=["x"]) for _ in ptds: - pass + continue @requires_libdeeplake @@ -753,7 +774,7 @@ def test_pil_decode_method(local_auth_ds): ptds = ds.dataloader().pytorch(decode_method={"x": "pil"}) with pytest.raises(CollateExceptionWrapper): for _ in ptds: - pass + continue def custom_transform(batch): batch["x"] = np.array(batch["x"]) diff --git a/deeplake/enterprise/test_tensorflow.py b/deeplake/enterprise/test_tensorflow.py index 168dac178a..e717501c2c 100644 --- a/deeplake/enterprise/test_tensorflow.py +++ b/deeplake/enterprise/test_tensorflow.py @@ -541,19 +541,20 @@ def test_tensorflow_decode(local_auth_ds, compressed_image_paths, compression): @pytest.mark.slow @pytest.mark.flaky def test_rename(local_auth_ds): + tensor_name="red/green" with local_auth_ds as ds: ds.create_tensor("abc") ds.create_tensor("blue/green") ds.abc.append([1, 2, 3]) ds.rename_tensor("abc", "xyz") ds.rename_group("blue", "red") - ds["red/green"].append([1, 2, 3, 4]) + ds[tensor_name].append([1, 2, 3, 4]) loader = ds.dataloader().tensorflow(return_index=False) for sample in loader: - assert set(sample.keys()) == {"xyz", "red/green"} 
+ assert set(sample.keys()) == {"xyz", tensor_name} np.testing.assert_array_equal(np.array(sample["xyz"]), np.array([[1, 2, 3]])) np.testing.assert_array_equal( - np.array(sample["red/green"]), np.array([[1, 2, 3, 4]]) + np.array(sample[tensor_name]), np.array([[1, 2, 3, 4]]) ) @@ -667,7 +668,6 @@ def test_indexes_transform_dict(local_auth_ds, num_workers): @pytest.mark.slow @pytest.mark.flaky def test_indexes_tensors(local_auth_ds, num_workers): - shuffle = False with local_auth_ds as ds: ds.create_tensor("xyz") for i in range(8): @@ -687,8 +687,6 @@ def test_indexes_tensors(local_auth_ds, num_workers): .batch(4) .tensorflow(num_workers=num_workers, return_index=True, tensors=["xyz"]) ) - if shuffle: - ptds = ptds.shuffle() for batch in ptds: assert batch.keys() == {"xyz", "index"} @@ -722,13 +720,13 @@ def test_tensorflow_error_handling(local_auth_ds): ptds = ds.dataloader().tensorflow() with pytest.raises(EmptyTensorError): for _ in ptds: - pass + continue ptds = ds.dataloader().tensorflow(tensors=["x", "y"]) with pytest.raises(EmptyTensorError): for _ in ptds: - pass + continue ptds = ds.dataloader().tensorflow(tensors=["x"]) for _ in ptds: - pass + continue From a554fff23451b489de613bb59ce92a3a2ef05e95 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 11:27:20 +0000 Subject: [PATCH 2/8] small fix --- deeplake/api/tests/test_info.py | 2 -- deeplake/api/tests/test_meta.py | 2 -- deeplake/enterprise/dataloader.py | 2 +- deeplake/enterprise/test_pytorch.py | 1 - 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/deeplake/api/tests/test_info.py b/deeplake/api/tests/test_info.py index 7af27c9500..486251b729 100644 --- a/deeplake/api/tests/test_info.py +++ b/deeplake/api/tests/test_info.py @@ -130,8 +130,6 @@ def test_update_reference_manually(local_ds_generator): l.append(99) ds.info.update() - ds = local_ds_generator() - assert l == [1, 2, 3, 99] diff --git a/deeplake/api/tests/test_meta.py b/deeplake/api/tests/test_meta.py index 04f54cfb4a..b2d133bb34 100644 --- a/deeplake/api/tests/test_meta.py +++ b/deeplake/api/tests/test_meta.py @@ -35,8 +35,6 @@ def test_subsequent_updates(local_ds_generator): assert len(ds) == 10 assert ds.tensor.shape == (10, 100, 100) - ds = local_ds_generator() - with local_ds_generator() as ds: for _ in range(5): ds.tensor.append(np.ones((100, 200))) diff --git a/deeplake/enterprise/dataloader.py b/deeplake/enterprise/dataloader.py index c6fe487a16..9c7a6e6cc8 100644 --- a/deeplake/enterprise/dataloader.py +++ b/deeplake/enterprise/dataloader.py @@ -717,6 +717,7 @@ def __get_indra_dataloader( ndim_dict=ndim_dict, tensor_info_dict=tensor_info_dict, offset=self._offset, + worker_init_fn=self.worker_init_fn, ) def __iter__(self): @@ -770,7 +771,6 @@ def __iter__(self): htype_dict=htype_dict, ndim_dict=ndim_dict, tensor_info_dict=tensor_info_dict, - worker_init_fn=self.worker_init_fn, ) dataset_read(self._orig_dataset) diff --git a/deeplake/enterprise/test_pytorch.py b/deeplake/enterprise/test_pytorch.py index 5cddf97b4c..95c6a0fa8f 100644 --- a/deeplake/enterprise/test_pytorch.py +++ b/deeplake/enterprise/test_pytorch.py @@ -1,5 +1,4 @@ import pickle -from indra import api import deeplake import numpy as np import pytest From d60c14548597bc464aefc50c6d9b0870d462079a Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 16:59:41 +0000 Subject: [PATCH 3/8] linter and sonar lint fixes --- deeplake/__init__.py | 5 +- deeplake/api/tests/test_api.py | 5 +- deeplake/core/compression.py | 2 - deeplake/core/dataset/dataset.py | 2 
+- deeplake/core/tensor.py | 6 +- deeplake/core/transform/test_transform.py | 2 +- deeplake/core/transform/transform.py | 2 +- deeplake/enterprise/test_tensorflow.py | 73 ++++++++------------- deeplake/integrations/tests/test_pytorch.py | 4 +- 9 files changed, 44 insertions(+), 57 deletions(-) diff --git a/deeplake/__init__.py b/deeplake/__init__.py index ce3ec2033a..9caa272cd2 100644 --- a/deeplake/__init__.py +++ b/deeplake/__init__.py @@ -69,8 +69,11 @@ "deepcopy", "like", "list", - "ingest", + "ingest_classification", + "ingest_coco", + "ingest_yolo", "ingest_kaggle", + "ingest_dataframe", "ingest_huggingface", "compressions", "htypes", diff --git a/deeplake/api/tests/test_api.py b/deeplake/api/tests/test_api.py index 6e0e62b1e7..761aab54a3 100644 --- a/deeplake/api/tests/test_api.py +++ b/deeplake/api/tests/test_api.py @@ -90,7 +90,6 @@ def test_persist(ds_generator): ds2 = ds_generator() - ds2.storage["dataset_meta.json"] == ds_new.storage["dataset_meta.json"] assert len(ds2) == 4 assert_array_equal(ds2.label.numpy(), np.array([[1], [2], [3], [4]])) @@ -1921,7 +1920,9 @@ def test_dataset_copy( [ ("local_ds_generator", "local_path", "hub_cloud_dev_token"), pytest.param( - "s3_ds_generator", "s3_path", "hub_cloud_dev_token", + "s3_ds_generator", + "s3_path", + "hub_cloud_dev_token", marks=pytest.mark.slow, ), pytest.param( diff --git a/deeplake/core/compression.py b/deeplake/core/compression.py index 6ca6eebbc8..4d6a51609c 100644 --- a/deeplake/core/compression.py +++ b/deeplake/core/compression.py @@ -157,8 +157,6 @@ def compress_bytes( if not buffer: return b"" if compression == "lz4": - if not buffer: - return b"" return numcodecs.lz4.compress(buffer) else: raise SampleCompressionError( diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py index fbb24341e7..213ac766ac 100644 --- a/deeplake/core/dataset/dataset.py +++ b/deeplake/core/dataset/dataset.py @@ -1939,7 +1939,7 @@ def _send_branch_creation_event(self, *args, **kwargs): def _send_branch_deletion_event(self, *args, **kwargs): """overridden in DeepLakeCloudDataset""" - def _first_load_init(self): + def _first_load_init(self, , verbose=True): """overridden in DeepLakeCloudDataset""" @property diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index 63f4ab5247..2e31679f09 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -1350,7 +1350,7 @@ def dict(self, fetch_chunks: bool = False): def list(self, fetch_chunks: bool = False): """Return list data. Only applicable for tensors with 'list' base htype.""" if self.base_htype != "list": - raise Exception(f"Only supported for list tensors.") + raise Exception("Only supported for list tensors.") if self.ndim == 1: return list(self.numpy(fetch_chunks=fetch_chunks)) @@ -1360,14 +1360,14 @@ def list(self, fetch_chunks: bool = False): def path(self, fetch_chunks: bool = False): """Return path data. Only applicable for linked tensors""" if not self.is_link: - raise Exception(f"Only supported for linked tensors.") + raise Exception("Only supported for linked tensors.") assert isinstance(self.chunk_engine, LinkedChunkEngine) return self.chunk_engine.path(self.index, fetch_chunks=fetch_chunks) def creds_key(self): """Return path data. 
Only applicable for linked tensors""" if not self.is_link: - raise Exception(f"Only supported for linked tensors.") + raise Exception("Only supported for linked tensors.") if self.index.values[0].subscriptable() or len(self.index.values) > 1: raise ValueError("_linked_sample can be used only on exatcly 1 sample.") assert isinstance(self.chunk_engine, LinkedChunkEngine) diff --git a/deeplake/core/transform/test_transform.py b/deeplake/core/transform/test_transform.py index 095e38bb51..a34fc8afda 100644 --- a/deeplake/core/transform/test_transform.py +++ b/deeplake/core/transform/test_transform.py @@ -138,7 +138,7 @@ def add_image(sample_in, samples_out): @deeplake.compute def add_images(i, sample_out): - for i in range(5): + for _ in range(5): image = deeplake.read(get_dummy_data_path("images/flower.png")) sample_out.append({"image": image}) diff --git a/deeplake/core/transform/transform.py b/deeplake/core/transform/transform.py index bd863077fa..81423b19d6 100644 --- a/deeplake/core/transform/transform.py +++ b/deeplake/core/transform/transform.py @@ -323,7 +323,7 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0): index=index, sample=sample, samples_processed=samples_processed, - suggest=suggest, + suggest=suggest, ) from e finally: reload_and_rechunk( diff --git a/deeplake/enterprise/test_tensorflow.py b/deeplake/enterprise/test_tensorflow.py index e717501c2c..a955ef0156 100644 --- a/deeplake/enterprise/test_tensorflow.py +++ b/deeplake/enterprise/test_tensorflow.py @@ -62,9 +62,7 @@ def test_tensorflow_small(local_auth_ds): ds.create_tensor("image2", max_chunk_size=TF_TESTS_MAX_CHUNK_SIZE) ds.image2.extend(np.array([i * np.ones((12, 12)) for i in range(16)])) - if isinstance( - get_base_storage(ds.storage), (MemoryProvider, GCSProvider) - ): + if isinstance(get_base_storage(ds.storage), (MemoryProvider, GCSProvider)): with pytest.raises(ValueError): dl = ds.dataloader() return @@ -130,9 +128,7 @@ def test_tensorflow_transform(local_auth_ds): ds.create_tensor("image2", max_chunk_size=TF_TESTS_MAX_CHUNK_SIZE) ds.image2.extend(np.array([i * np.ones((12, 12)) for i in range(16)])) - if isinstance( - get_base_storage(ds.storage), (MemoryProvider, GCSProvider) - ): + if isinstance(get_base_storage(ds.storage), (MemoryProvider, GCSProvider)): with pytest.raises(ValueError): dl = ds.dataloader() return @@ -167,18 +163,12 @@ def test_tensorflow_transform_dict(local_auth_ds): ds.create_tensor("image3", max_chunk_size=TF_TESTS_MAX_CHUNK_SIZE) ds.image3.extend(np.array([i * np.ones((12, 12)) for i in range(16)])) - if isinstance( - get_base_storage(ds.storage), (MemoryProvider, GCSProvider) - ): + if isinstance(get_base_storage(ds.storage), (MemoryProvider, GCSProvider)): with pytest.raises(ValueError): dl = ds.dataloader() return - dl = ( - ds.dataloader() - .transform({"image": double, "image2": None}) - .tensorflow() - ) + dl = ds.dataloader().transform({"image": double, "image2": None}).tensorflow() assert len(dl.dataset) == 16 @@ -222,9 +212,7 @@ def test_tensorflow_with_compression(local_auth_ds: Dataset): images.extend(np.ones((16, 12, 12, 3), dtype="uint8")) labels.extend(np.ones((16, 1), dtype="uint32")) - if isinstance( - get_base_storage(ds.storage), (MemoryProvider, GCSProvider) - ): + if isinstance(get_base_storage(ds.storage), (MemoryProvider, GCSProvider)): with pytest.raises(ValueError): dl = ds.dataloader() return @@ -250,9 +238,7 @@ def test_custom_tensor_order(local_auth_ds): ds.create_tensor(t, max_chunk_size=TF_TESTS_MAX_CHUNK_SIZE) ds[t].extend(np.random.random((3, 
4, 5))) - if isinstance( - get_base_storage(ds.storage), (MemoryProvider, GCSProvider) - ): + if isinstance(get_base_storage(ds.storage), (MemoryProvider, GCSProvider)): with pytest.raises(ValueError): dl = ds.dataloader() return @@ -260,9 +246,7 @@ def test_custom_tensor_order(local_auth_ds): with pytest.raises(TensorDoesNotExistError): dl = ds.dataloader().tensorflow(tensors=["c", "d", "e"]) - dl = ds.dataloader().tensorflow( - tensors=["c", "d", "a"], return_index=False - ) + dl = ds.dataloader().tensorflow(tensors=["c", "d", "a"], return_index=False) for i, batch in enumerate(dl): c1, d1, a1 = batch @@ -334,12 +318,8 @@ def test_groups(local_auth_ds, compressed_image_paths): img1 = deeplake.read(compressed_image_paths["jpeg"][0]) img2 = deeplake.read(compressed_image_paths["png"][0]) with local_auth_ds as ds: - ds.create_tensor( - "images/jpegs/cats", htype="image", sample_compression="jpeg" - ) - ds.create_tensor( - "images/pngs/flowers", htype="image", sample_compression="png" - ) + ds.create_tensor("images/jpegs/cats", htype="image", sample_compression="jpeg") + ds.create_tensor("images/pngs/flowers", htype="image", sample_compression="png") for _ in range(10): ds.images.jpegs.cats.append(img1) ds.images.pngs.flowers.append(img2) @@ -541,7 +521,7 @@ def test_tensorflow_decode(local_auth_ds, compressed_image_paths, compression): @pytest.mark.slow @pytest.mark.flaky def test_rename(local_auth_ds): - tensor_name="red/green" + tensor_name = "red/green" with local_auth_ds as ds: ds.create_tensor("abc") ds.create_tensor("blue/green") @@ -560,10 +540,13 @@ def test_rename(local_auth_ds): @requires_tensorflow @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [ - 0, - pytest.param(2, marks=pytest.mark.skip("causing lockups")), -]) +@pytest.mark.parametrize( + "num_workers", + [ + 0, + pytest.param(2, marks=pytest.mark.skip("causing lockups")), + ], +) @pytest.mark.slow @pytest.mark.flaky def test_indexes(local_auth_ds, num_workers): @@ -574,9 +557,7 @@ def test_indexes(local_auth_ds, num_workers): ds.xyz.append(i * np.ones((2, 2))) ptds = ( - ds.dataloader() - .batch(4) - .tensorflow(num_workers=num_workers, return_index=True) + ds.dataloader().batch(4).tensorflow(num_workers=num_workers, return_index=True) ) if shuffle: ptds = ptds.shuffle() @@ -589,10 +570,13 @@ def test_indexes(local_auth_ds, num_workers): @requires_tensorflow @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [ - 0, - pytest.param(2, marks=pytest.mark.skip("causing lockups")), -]) +@pytest.mark.parametrize( + "num_workers", + [ + 0, + pytest.param(2, marks=pytest.mark.skip("causing lockups")), + ], +) @pytest.mark.slow @pytest.mark.flaky def test_indexes_transform(local_auth_ds, num_workers): @@ -661,10 +645,9 @@ def test_indexes_transform_dict(local_auth_ds, num_workers): @requires_tensorflow @requires_libdeeplake -@pytest.mark.parametrize("num_workers", [ - 0, - pytest.param(2, marks=pytest.mark.skip("causing lockups")) -]) +@pytest.mark.parametrize( + "num_workers", [0, pytest.param(2, marks=pytest.mark.skip("causing lockups"))] +) @pytest.mark.slow @pytest.mark.flaky def test_indexes_tensors(local_auth_ds, num_workers): diff --git a/deeplake/integrations/tests/test_pytorch.py b/deeplake/integrations/tests/test_pytorch.py index 196ea511eb..bc74930f52 100644 --- a/deeplake/integrations/tests/test_pytorch.py +++ b/deeplake/integrations/tests/test_pytorch.py @@ -602,7 +602,9 @@ def test_pytorch_collate(local_ds, shuffle, buffer_size): @pytest.mark.slow @requires_torch 
-@pytest.mark.parametrize("shuffle", [True, pytest.param(False, marks=pytest.mark.skip("causing lockups"))]) +@pytest.mark.parametrize( + "shuffle", [True, pytest.param(False, marks=pytest.mark.skip("causing lockups"))] +) @pytest.mark.flaky def test_pytorch_transform_collate(local_ds, shuffle): local_ds.create_tensor("a") From f1440a9513c59e983f6d1de7a88693fd1ba20c17 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 17:00:39 +0000 Subject: [PATCH 4/8] bump libdeeplake version to 0.0.74 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 637a930600..853c695241 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ def libdeeplake_availabe(): extras_require["all"] = [req_map[r] for r in all_extras] if libdeeplake_availabe(): - libdeeplake = "libdeeplake==0.0.73" + libdeeplake = "libdeeplake==0.0.74" extras_require["enterprise"] = [libdeeplake, "pyjwt"] extras_require["all"].append(libdeeplake) From db7a2de105f3a8d75fad0ba474deb49ba960f5e3 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 17:33:02 +0000 Subject: [PATCH 5/8] fixed typeo --- deeplake/core/dataset/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py index 213ac766ac..f1fed53e88 100644 --- a/deeplake/core/dataset/dataset.py +++ b/deeplake/core/dataset/dataset.py @@ -1939,7 +1939,7 @@ def _send_branch_creation_event(self, *args, **kwargs): def _send_branch_deletion_event(self, *args, **kwargs): """overridden in DeepLakeCloudDataset""" - def _first_load_init(self, , verbose=True): + def _first_load_init(self, verbose=True): """overridden in DeepLakeCloudDataset""" @property From d7ce13753682185862db4a4894ae221dd8a22302 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Tue, 5 Sep 2023 17:50:07 +0000 Subject: [PATCH 6/8] add proper requirimet for test --- deeplake/enterprise/test_pytorch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deeplake/enterprise/test_pytorch.py b/deeplake/enterprise/test_pytorch.py index 95c6a0fa8f..dd04c71613 100644 --- a/deeplake/enterprise/test_pytorch.py +++ b/deeplake/enterprise/test_pytorch.py @@ -77,7 +77,7 @@ def test_setting_woker_init_function(local_auth_ds): @requires_torch -# @requires_libdeeplake +@requires_libdeeplake def test_offset_ds_iteration(local_auth_ds): with local_auth_ds as ds: ds.create_tensor("abc", htype="generic", dtype="uint16") From c4849caf66a7fc510437f2dc4687e1ac04717cb5 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Wed, 6 Sep 2023 00:14:41 +0400 Subject: [PATCH 7/8] corrected warning message --- deeplake/enterprise/dataloader.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/deeplake/enterprise/dataloader.py b/deeplake/enterprise/dataloader.py index 9c7a6e6cc8..2ceda66011 100644 --- a/deeplake/enterprise/dataloader.py +++ b/deeplake/enterprise/dataloader.py @@ -203,7 +203,11 @@ def sampler(self): @property def batch_sampler(self): - return BatchSampler(self.sampler, self.batch_size, self.drop_last) if BatchSampler else None + return ( + BatchSampler(self.sampler, self.batch_size, self.drop_last) + if BatchSampler + else None + ) @property def generator(self): @@ -688,7 +692,9 @@ def __get_indra_dataloader( seed = DeeplakeRandom().get_seed() if self._offset is not None and self._shuffle and seed is None: warnings.warn( - "To keep dataloader consistent during setting offset and shuffling params please confider seeting 
deeplake.random.seed" + "offset and shuffle parameters are set without a random seed. This means that the ordering of the samples are " + "not equal after each initialization and iteration through the dataloader. If you intend to shuffle data while " + "preserving the offset for resuming iteration at a predictable index and order, please set a random seed using deeplake.random()" ) return INDRA_LOADER( From 7ff62ddc9a577bdbae736ad163b2e9f03031a7b6 Mon Sep 17 00:00:00 2001 From: Levon Ghukasyan Date: Wed, 6 Sep 2023 07:46:48 +0400 Subject: [PATCH 8/8] corrected slicing --- deeplake/enterprise/convert_to_libdeeplake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deeplake/enterprise/convert_to_libdeeplake.py b/deeplake/enterprise/convert_to_libdeeplake.py index 8ab40c0ac2..c354a27dfa 100644 --- a/deeplake/enterprise/convert_to_libdeeplake.py +++ b/deeplake/enterprise/convert_to_libdeeplake.py @@ -213,5 +213,5 @@ def dataset_to_libdeeplake(hub2_dataset): slice_ = hub2_dataset.index.values[0].value if slice_ != slice(None)and isinstance(slice_, tuple): slice_ = list(slice_) - libdeeplake_dataset = libdeeplake_dataset[slice_] + libdeeplake_dataset = libdeeplake_dataset[slice_] return libdeeplake_dataset
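
A minimal usage sketch of the `.offset()` dataloader API introduced by this patch series. The dataset path and the seeding call are assumptions for illustration only; the dataloader call chain mirrors what `test_offset_ds_iteration` and the other enterprise dataloader tests in the diff exercise, not a definitive recipe.

```python
import deeplake

# Hypothetical local dataset path used purely for illustration; any Deep Lake
# dataset with at least one tensor works here.
ds = deeplake.load("./example_ds")

# Assumed seeding call: the warning added in this series recommends fixing a
# random seed when combining an offset with shuffling so that the sample order
# stays reproducible across dataloader re-initializations.
deeplake.random.seed(42)

# Start iterating at index 4 instead of 0. As exercised in
# test_offset_ds_iteration, iteration wraps back to index 0 after the last
# sample, and calling .offset() a second time raises ValueError.
dl = ds.dataloader().offset(4).batch(2).pytorch(return_index=True)

for batch in dl:
    print(batch["index"])  # starts at 4, 5, ... then wraps around to 0, 1, ...
```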