Skip to content

Commit

Permalink
general refactor (#759)
Browse files Browse the repository at this point in the history
  • Loading branch information
zigaLuksic authored Oct 17, 2023
1 parent 4b185cb commit e9eecf3
Showing 1 changed file with 4 additions and 16 deletions.
20 changes: 4 additions & 16 deletions eolearn/core/utils/parallelize.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def _decide_processing_type(workers: int | None, multiprocess: bool) -> _Process
"""
if workers == 1:
return _ProcessingType.SINGLE_PROCESS
if multiprocess:
return _ProcessingType.MULTIPROCESSING
return _ProcessingType.MULTITHREADING
return _ProcessingType.MULTIPROCESSING if multiprocess else _ProcessingType.MULTITHREADING


def parallelize(
Expand All @@ -74,10 +72,7 @@ def parallelize(
:return: A list of function results.
"""
if not params:
raise ValueError(
"At least 1 list of parameters should be given. Otherwise it is not clear how many times the"
"function has to be executed."
)
return []
processing_type = _decide_processing_type(workers=workers, multiprocess=multiprocess)

if processing_type is _ProcessingType.SINGLE_PROCESS:
Expand Down Expand Up @@ -105,7 +100,6 @@ def execute_with_mp_lock(function: Callable[..., OutputType], *args: Any, **kwar
:param function: A function
:param args: Function's positional arguments
:param kwargs: Function's keyword arguments
:return: Function's results
"""
if multiprocessing.current_process().name == "MainProcess" or MULTIPROCESSING_LOCK is None:
return function(*args, **kwargs)
Expand Down Expand Up @@ -165,10 +159,7 @@ def join_futures_iter(
"""

def _wait_function(remaining_futures: Collection[Future]) -> tuple[Collection[Future], Collection[Future]]:
done, not_done = concurrent.futures.wait(
remaining_futures, timeout=float(update_interval), return_when=FIRST_COMPLETED
)
return done, not_done
return concurrent.futures.wait(remaining_futures, timeout=float(update_interval), return_when=FIRST_COMPLETED)

def _get_result(future: Future) -> Any:
return future.result()
Expand All @@ -184,8 +175,6 @@ def _base_join_futures_iter(
) -> Generator[tuple[int, OutputType], None, None]:
"""A generalized utility function that resolves futures, monitors progress, and serves as an iterator over
results."""
if not isinstance(futures, list):
raise ValueError(f"Parameters 'futures' should be a list but {type(futures)} was given")
remaining_futures: Collection[FutureType] = _make_copy_and_empty_given(futures)

id_to_position_map = {id(future): index for index, future in enumerate(remaining_futures)}
Expand All @@ -195,9 +184,8 @@ def _base_join_futures_iter(
done, remaining_futures = wait_function(remaining_futures)
for future in done:
result = get_result_function(future)
result_position = id_to_position_map[id(future)]
pbar.update(1)
yield result_position, result
yield id_to_position_map[id(future)], result


def _make_copy_and_empty_given(items: list[T]) -> list[T]:
Expand Down

0 comments on commit e9eecf3

Please sign in to comment.