-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
[edit] prepare_pypi_release
- Loading branch information
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
0.1 (2021-01-07) | ||
---------------- | ||
|
||
- Test release. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
## path_finder | ||
|
||
### Description | ||
An interface for finding directories and files by combining the best of both worlds: glob/rglob (speed) and regex (flexibility). | ||
|
||
### Features | ||
path_finder officially supports Python 3.5–3.8. \ | ||
The two main features are: path_finder.DirFinder and path_finder.FileFinder (see Usage) | ||
|
||
### License | ||
[MIT][mit] | ||
|
||
### Contributions | ||
All contributions, bug reports, bug fixes, documentation improvements, enhancements and ideas are welcome. | ||
Issues are posted on: https://github.com/hdsr-mid/path_finder/issues | ||
|
||
|
||
[mit]: https://github.com/hdsr-mid/path_finder/blob/main/LICENSE.txt | ||
|
||
|
||
### Usage | ||
#### Test path_finder | ||
``` | ||
> cd path_finder | ||
> pytest | ||
``` | ||
|
||
#### Example FileFinder: | ||
``` | ||
from pathlib import Path | ||
import path_finder | ||
|
||
start_dir1 = Path('start_search_from_this_dir') | ||
start_dir2 = Path('and_start_search_from_this_dir') | ||
limit_depth = True | ||
depth = 2 # 2, so search in start_dir1, subdir and subsubdirs (same for start_dir2) | ||
filename_regex = '^[0-9]{8}_blabla' | ||
extension = '.csv' # choose from ('.jpg', '.png', '.txt', '.xml', '.csv', '.xlsx', '.pdf', '.h5', '.nc', '.zip') | ||
|
||
file_finder = path_finder.FileFinder( | ||
multi_start_dir=[start_dir1, start_dir2], | ||
extension=extension, | ||
limit_depth=True, | ||
depth=depth, | ||
filename_regex=filename_regex | ||
) | ||
|
||
paths = file_finder.paths # returns a List[Path] | ||
paths_empty_files = file_finder.paths_empty_file # returns a List[Path] | ||
``` | ||
|
||
|
||
#### Example DirFinder: | ||
``` | ||
from pathlib import Path | ||
import path_finder | ||
|
||
dir_finder = path_finder.DirFinder( | ||
single_start_dir=Path('start_search_from_this_dir'), | ||
exclude_empty_dirs=True, | ||
limit_depth=True, | ||
depth=0, # so only search in single_start_dir | ||
) | ||
|
||
paths = dir_finder.paths # returns a List[Path] | ||
paths_empty_dirs = dir_finder.paths_empty_dir # returns a List[Path] | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .finder.dir_finder import DirFinder | ||
from .finder.file_finder import FileFinder |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from pathlib import Path | ||
from typing import List | ||
|
||
import logging | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Finder:
    """Base class that stores and validates where and how deep a search runs.

    A search starts either from one directory (``single_start_dir``) or from
    several (``multi_start_dir``), and is optionally limited to a maximum
    number of directory levels below each start directory.
    """

    # Maps a depth to the glob pattern that reaches exactly that many levels
    # below a start directory. The largest key is the maximum allowed depth.
    DEPTH_MAPPER = {
        0: "*",  # search only start_dir
        1: "*/*",  # search start_dir + its subdirs
        2: "*/*/*",  # etc..
        3: "*/*/*/*",
        4: "*/*/*/*/*",
        5: "*/*/*/*/*/*",
        6: "*/*/*/*/*/*/*",
    }

    def __init__(
        self,
        single_start_dir: Path = None,
        multi_start_dir: List[Path] = None,
        limit_depth: bool = True,
        depth: int = 0,
    ):
        """
        :param single_start_dir: Path: One single path (=directory) from where the search starts.
        :param multi_start_dir: List[Path]: Multiple directories from where the search starts.
        :param limit_depth: bool: if False, then search all subdirs. If True then limit search to 'depth'.
        :param depth: int: nr of directory levels to descend (recursively). E.g. depth=1 searches
            the start dir and all of its direct subdirs.
        :raises AssertionError: if the combination of arguments is invalid.

        Choose either single_start_dir or multi_start_dir, never both.
        """
        self.single_start_dir = single_start_dir
        self.multi_start_dir = multi_start_dir
        self.limit_depth = limit_depth
        self.depth = depth
        self.validate_finder_constructor()

    def validate_finder_constructor(self):
        """Validate the constructor arguments.

        Explicit ``raise`` statements are used instead of ``assert`` so that
        the checks are still executed when Python runs with ``-O``.

        :raises AssertionError: if not exactly one of single_start_dir /
            multi_start_dir is given, if a start dir is not an existing
            ``pathlib.Path`` directory, or if limit_depth / depth are invalid.
        """
        # validate single_start_dir + multi_start_dir (exactly one must be set)
        if bool(self.single_start_dir) == bool(self.multi_start_dir):
            raise AssertionError("use either single_start_dir or multi_start_dir")

        if self.single_start_dir:
            if not isinstance(self.single_start_dir, Path):
                raise AssertionError("single_start_dir must be a pathlib.Path")
            if not self.single_start_dir.is_dir():
                raise AssertionError(
                    f"single_start_dir {self.single_start_dir} does not exist"
                )
        else:
            if not isinstance(self.multi_start_dir, list):
                raise AssertionError(
                    "multi_start_dir must be a list (with pathlib.Path)"
                )
            none_path_objects = [
                x for x in self.multi_start_dir if not isinstance(x, Path)
            ]
            if none_path_objects:
                msg = "not all elements in multi_start_dir are of type pathlib.Path"
                # 4 is a bit random (just do not return too many paths..)
                if len(none_path_objects) < 4:
                    raise AssertionError(f"{msg} : {none_path_objects}")
                raise AssertionError(msg)
            none_existing_dirs = [x for x in self.multi_start_dir if not x.is_dir()]
            if none_existing_dirs:
                msg = "not all elements in multi_start_dir are existing directories"
                # 4 is a bit random (just do not return too many paths..)
                if len(none_existing_dirs) < 4:
                    raise AssertionError(f"{msg}: {none_existing_dirs}")
                raise AssertionError(msg)

        # validate depth + limit_depth
        if not isinstance(self.limit_depth, bool):
            raise AssertionError("limit_depth must be a bool")
        if not self.limit_depth:
            if self.depth:
                raise AssertionError(
                    f"depth={self.depth} is only possible with limit_depth=True"
                )
            # unlimited search: depth is not used, nothing more to check
            return
        max_allowed_depth = max(self.DEPTH_MAPPER)
        if not isinstance(self.depth, int) or not 0 <= self.depth <= max_allowed_depth:
            raise AssertionError(
                f"depth {self.depth} must be a int and in range: 0 <= depth <= {max_allowed_depth}"
            )
        logger.debug(
            "search recursively with limit_depth=True with depth=%s", self.depth
        )

    def _depth_to_startdir(self, path: Path, start_dir: Path) -> int:
        """Return how many directory levels lie between start_dir and path.

        The result is the number of parts of ``path`` relative to ``start_dir``
        minus one, so:
            Returns -1 if path == start_dir,
            Returns 0 if path is a direct child of start_dir,
            Returns 1 if path is a child of a subdir of start_dir,
            etc..

        :raises AssertionError: if path is not located below start_dir.
        """
        try:
            parts = path.relative_to(start_dir).parts
        except (TypeError, ValueError) as err:
            # pathlib raises ValueError when path is not below start_dir;
            # TypeError is still caught for arguments of an unsupported type.
            raise AssertionError(
                f"path {path} could not related to start_dir {start_dir}, err={err}"
            ) from err
        return len(parts) - 1
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from ..finder.base import Finder | ||
from itertools import chain | ||
from pathlib import Path | ||
from typing import List | ||
|
||
import logging | ||
import re | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DirFinder(Finder):
    """Find directories below one or more start directories.

    Directories are collected with glob/rglob and can optionally be filtered
    by a regex on the directory name and by whether they are empty.
    """

    def __init__(
        self,
        dirname_regex: str = None,
        exclude_empty_dirs: bool = False,
        *args,
        **kwargs,
    ):
        """
        :param dirname_regex: str: optional regex; only directories whose name matches
            (``re.match``, so anchored at the start) are returned.
        :param exclude_empty_dirs: bool: if True, directories without any entry are left out.
        :param args: positional arguments passed on to Finder.
        :param kwargs: keyword arguments passed on to Finder.
        :raises AssertionError: if dirname_regex is given but is not a str, or if the
            Finder arguments are invalid.
        """
        self.dirname_regex = dirname_regex
        self.exclude_empty_dirs = exclude_empty_dirs
        # caches for the lazily computed properties; None means 'not computed yet'
        self._paths = None
        self._paths_empty_dir = None
        self.validate_dirfinder_constructor()
        super().__init__(*args, **kwargs)

    def validate_dirfinder_constructor(self):
        """Validate the DirFinder specific constructor arguments.

        :raises AssertionError: if dirname_regex is set but is not a str.
        """
        # dirname_regex is optional!!
        if self.dirname_regex and not isinstance(self.dirname_regex, str):
            raise AssertionError("dirname_regex must be a str")

    def _is_dir_path_regex_match(self, _path: Path) -> bool:
        """Return True if _path is a directory that passes the optional dirname_regex."""
        if not _path.is_dir():
            return False
        if not self.dirname_regex:
            # no regex configured: every directory qualifies
            return True
        # NOTE(review): the regex is matched against Path.stem, which drops everything
        # after the last dot of the directory name; confirm whether Path.name is intended.
        return re.match(pattern=self.dirname_regex, string=_path.stem) is not None

    def _get_paths_from_single_dir(self, single_dir: Path) -> List[Path]:
        """Return all matching directories below single_dir.

        With limit_depth=True the search is done depth-by-depth (depth 0 up to and
        including self.depth) using the glob patterns in DEPTH_MAPPER. This avoids
        walking the whole tree with rglob('*') and evaluating a lot of unnecessary paths.
        With limit_depth=False all subdirs are searched recursively with rglob.
        """
        if self.limit_depth:
            # one glob generator per depth, chained into a single lazy stream
            candidates = chain.from_iterable(
                single_dir.glob(self.DEPTH_MAPPER[depth_n])
                for depth_n in range(self.depth + 1)
            )
        else:
            # note we use rglob (recursive search all subdirs)
            candidates = single_dir.rglob("*")
        logger.debug("convert generator to list, this may take a while")
        return [
            _path for _path in candidates if self._is_dir_path_regex_match(_path)
        ]

    def _get_paths_from_multi_dir(self) -> List[Path]:
        """Return the matching directories of all start dirs, without duplicates.

        Start dirs may overlap, so the same directory can be found more than once.
        Duplicates are dropped while the order of first occurrence is kept, which
        makes the result deterministic.
        """
        seen = set()
        unique_paths = []
        for start_dir in self.multi_start_dir:
            for _path in self._get_paths_from_single_dir(single_dir=start_dir):
                if _path not in seen:
                    seen.add(_path)
                    unique_paths.append(_path)
        return unique_paths

    @property
    def paths(self) -> List[Path]:
        """All found directories (computed on first access, then cached)."""
        if self._paths is not None:
            return self._paths

        if self.single_start_dir:
            found = self._get_paths_from_single_dir(single_dir=self.single_start_dir)
        else:
            found = self._get_paths_from_multi_dir()

        if self.exclude_empty_dirs:
            # a directory is non-empty as soon as iterdir() yields one entry
            found = [_path for _path in found if any(_path.iterdir())]
        self._paths = found
        return self._paths

    @property
    def paths_empty_dir(self) -> List[Path]:
        """A selection of self.paths: the dirs that hold no entries (no files, no subdirs).

        Always [] when the search was done with exclude_empty_dirs=True.
        The result is computed on first access and then cached.
        """
        if self._paths_empty_dir is not None:
            return self._paths_empty_dir
        if self.exclude_empty_dirs:
            logger.info(
                "paths_empty_dir is [] as search was done with exclude_empty_dirs=True"
            )
            self._paths_empty_dir = []
        else:
            self._paths_empty_dir = [
                _path for _path in self.paths if not any(_path.iterdir())
            ]
        return self._paths_empty_dir