
Commit

Merge pull request #317 from pyiron/pilot
Add docstrings and typehints with github co-pilot
jan-janssen authored Aug 3, 2024
2 parents 639ad21 + ca668a7 commit 3183a8b
Showing 19 changed files with 931 additions and 227 deletions.
8 changes: 5 additions & 3 deletions pysqa/cmd.py
@@ -10,14 +10,16 @@

def command_line(
arguments_lst: Optional[list] = None, execute_command: callable = execute_command
):
) -> None:
"""
Parse the command line arguments.
Args:
argv: Command line arguments
execute_command: function to comunicate with shell process
arguments_lst (Optional[list]): Command line arguments
execute_command (callable): Function to communicate with shell process
Returns:
None
"""
directory = "~/.queues"
queue = None
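The updated signature documents that both the argument list and the shell callable are injectable. A minimal, self-contained sketch of that pattern, assuming a hypothetical --echo flag and run_in_shell helper (neither is pysqa's actual CLI):

import subprocess
import sys
from typing import Optional


def run_in_shell(cmd: list) -> str:
    # Hypothetical default: run the command and return its stdout.
    return subprocess.run(cmd, capture_output=True, text=True).stdout


def command_line(
    arguments_lst: Optional[list] = None, execute_command: callable = run_in_shell
) -> None:
    # Fall back to the process arguments, as in the function documented above.
    if arguments_lst is None:
        arguments_lst = sys.argv[1:]
    if "--echo" in arguments_lst:
        print(execute_command(["echo", arguments_lst[arguments_lst.index("--echo") + 1]]))


# Injecting a fake executor keeps tests free of real shell processes.
command_line(arguments_lst=["--echo", "hello"], execute_command=lambda cmd: " ".join(cmd))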
34 changes: 32 additions & 2 deletions pysqa/executor/backend.py
@@ -15,6 +15,17 @@
def execute_files_from_list(
tasks_in_progress_dict: dict, cache_directory: str, executor
):
"""
Execute files from the given list.
Args:
tasks_in_progress_dict (dict): Dictionary of tasks in progress.
cache_directory (str): Directory where the files are stored.
executor: Executor object for executing tasks.
Returns:
None
"""
file_lst = os.listdir(cache_directory)
for file_name_in in file_lst:
key = file_name_in.split(".in.pl")[0]
@@ -42,6 +53,16 @@ def execute_files_from_list(


def execute_tasks(cores: int, cache_directory: str):
"""
Execute tasks from the given cache directory using the specified number of cores.
Args:
cores (int): Number of cores to use for execution.
cache_directory (str): Directory where the files are stored.
Returns:
None
"""
tasks_in_progress_dict = {}
with Executor(
max_cores=cores,
@@ -61,9 +82,18 @@ def execute_tasks(cores: int, cache_directory: str):
)


def command_line(arguments_lst: Optional[list] = None):
def command_line(arguments_lst: Optional[list] = None) -> None:
"""
Execute tasks from the command line.
Args:
arguments_lst (Optional[list]): List of command line arguments.
Returns:
None
"""
if arguments_lst is None:
arguments_lst = sys.argv[1:]
cores_arg = arguments_lst[arguments_lst.index("--cores") + 1]
path_arg = arguments_lst[arguments_lst.index("--path") + 1]
execute_tasks(cores=cores_arg, cache_directory=path_arg)
execute_tasks(cores=int(cores_arg), cache_directory=path_arg)
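The int() cast on the last line matters because sys.argv entries are strings. A standalone sketch of the same --cores/--path handling (not importing pysqa):

import sys


def parse_cores_and_path(arguments_lst=None):
    if arguments_lst is None:
        arguments_lst = sys.argv[1:]
    # Values following the flags arrive as strings, hence the int() cast added above.
    cores = int(arguments_lst[arguments_lst.index("--cores") + 1])
    path = arguments_lst[arguments_lst.index("--path") + 1]
    return cores, path


print(parse_cores_and_path(["--cores", "2", "--path", "/tmp/cache"]))  # (2, '/tmp/cache')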
28 changes: 27 additions & 1 deletion pysqa/executor/executor.py
@@ -21,6 +21,14 @@ def __init__(
queue_adapter=None,
queue_adapter_kwargs: Optional[dict] = None,
):
"""
Initialize the Executor.
Args:
cwd (Optional[str]): The current working directory. Defaults to None.
queue_adapter: The queue adapter. Defaults to None.
queue_adapter_kwargs (Optional[dict]): Additional keyword arguments for the queue adapter. Defaults to None.
"""
self._task_queue = queue.Queue()
self._memory_dict = {}
self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
@@ -50,7 +58,18 @@ def __init__(
)
self._process.start()

def submit(self, fn: callable, *args, **kwargs):
def submit(self, fn: callable, *args, **kwargs) -> Future:
"""
Submit a function for execution.
Args:
fn (callable): The function to be executed.
*args: Positional arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Future: A Future object representing the execution of the function.
"""
funct_dict = serialize_funct(fn, *args, **kwargs)
key = list(funct_dict.keys())[0]
if key not in self._memory_dict.keys():
@@ -62,6 +81,13 @@ def submit(self, fn: callable, *args, **kwargs):
return self._memory_dict[key]

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""
Shutdown the Executor.
Args:
wait (bool): Whether to wait for all tasks to complete before shutting down. Defaults to True.
cancel_futures (bool): Whether to cancel pending futures. Defaults to False.
"""
if cancel_futures:
cancel_items_in_queue(que=self._task_queue)
self._task_queue.put({"shutdown": True, "wait": wait})
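A hedged usage sketch based only on the signatures shown above; whether a bare Executor() without a configured queue_adapter runs tasks at all is an assumption and may differ in practice:

from pysqa.executor.executor import Executor


def add(a, b):
    return a + b


exe = Executor(cwd="~/executor_cache")         # queue_adapter and its kwargs left at their None defaults (assumption)
future = exe.submit(add, 1, 2)                 # returns a concurrent.futures.Future
print(future.result())                         # blocks until the queued task has finished
exe.shutdown(wait=True, cancel_futures=False)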
142 changes: 133 additions & 9 deletions pysqa/executor/helper.py
@@ -3,18 +3,40 @@
import queue
import re
from concurrent.futures import Future
from typing import List

import cloudpickle


def deserialize(funct_dict: dict) -> dict:
"""
Deserialize a dictionary of serialized functions.
Args:
funct_dict (dict): A dictionary containing serialized functions.
Returns:
dict: A dictionary with deserialized functions.
"""
try:
return {k: cloudpickle.loads(v) for k, v in funct_dict.items()}
except EOFError:
return {}


def find_executed_tasks(future_queue: queue.Queue, cache_directory: str):
def find_executed_tasks(future_queue: queue.Queue, cache_directory: str) -> None:
"""
Find executed tasks from the future queue and update the task memory dictionary.
Args:
future_queue (queue.Queue): The queue containing the futures of executed tasks.
cache_directory (str): The directory where the task cache is stored.
Returns:
None
"""
task_memory_dict = {}
while True:
task_dict = {}
@@ -33,14 +55,36 @@ def find_executed_tasks(future_queue: queue.Queue, cache_directory: str):


def read_from_file(file_name: str) -> dict:
"""
Read the contents of a file and return it as a dictionary.
Args:
file_name (str): The name of the file to read.
Returns:
dict: A dictionary containing the contents of the file, with the file name as the key.
"""
name = file_name.split("/")[-1].split(".")[0]
with open(file_name, "rb") as f:
return {name: f.read()}


def reload_previous_futures(
future_queue: queue.Queue, future_dict: dict, cache_directory: str
):
) -> None:
"""
Reload previous futures from the cache directory and update the future dictionary.
Args:
future_queue (queue.Queue): The queue containing the futures of executed tasks.
future_dict (dict): A dictionary containing the current futures.
cache_directory (str): The directory where the task cache is stored.
Returns:
None
"""
file_lst = os.listdir(cache_directory)
for f in file_lst:
if f.endswith(".in.pl"):
@@ -56,16 +100,50 @@ def reload_previous_futures(
future_queue.put({key: future_dict[key]})


def serialize_result(result_dict: dict):
def serialize_result(result_dict: dict) -> dict:
"""
Serialize the values in a dictionary using cloudpickle.
Args:
result_dict (dict): A dictionary containing the values to be serialized.
Returns:
dict: A dictionary with serialized values.
"""
return {k: cloudpickle.dumps(v) for k, v in result_dict.items()}


def serialize_funct(fn: callable, *args, **kwargs):
def serialize_funct(fn: callable, *args, **kwargs) -> dict:
"""
Serialize a function along with its arguments and keyword arguments.
Args:
fn (callable): The function to be serialized.
*args: The arguments to be passed to the function.
**kwargs: The keyword arguments to be passed to the function.
Returns:
dict: A dictionary containing the serialized function.
"""
binary = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
return {fn.__name__ + _get_hash(binary=binary): binary}


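A hedged roundtrip sketch for serialize_funct/deserialize as documented above: the key combines the function name with an MD5 hash of the cloudpickled payload, so repeated identical calls map to the same cache entry.

from pysqa.executor.helper import deserialize, serialize_funct


def add(a, b):
    return a + b


funct_dict = serialize_funct(add, 1, b=2)            # {'add<md5-hex>': <cloudpickled bytes>}
key = list(funct_dict.keys())[0]
task = deserialize(funct_dict)[key]                  # {'fn': add, 'args': (1,), 'kwargs': {'b': 2}}
print(task["fn"](*task["args"], **task["kwargs"]))   # 3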
def write_to_file(funct_dict: dict, state, cache_directory: str):
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
"""
Write the contents of a dictionary to files in the cache directory.
Args:
funct_dict (dict): A dictionary containing the contents to be written.
state (str): The state of the files to be written.
cache_directory (str): The directory where the files will be written.
Returns:
List[str]: A list of file names that were written.
"""
file_name_lst = []
for k, v in funct_dict.items():
file_name = _get_file_name(name=k, state=state)
@@ -75,23 +153,69 @@ def write_to_file(funct_dict: dict, state, cache_directory: str):
return file_name_lst


def _get_file_name(name, state):
def _get_file_name(name: str, state: str) -> str:
"""
Generate a file name based on the given name and state.
Args:
name (str): The name to be included in the file name.
state (str): The state of the file.
Returns:
str: The generated file name.
"""
return name + "." + state + ".pl"


def _get_hash(binary):
def _get_hash(binary: bytes) -> str:
"""
Get the hash of a binary using MD5 algorithm.
Args:
binary (bytes): The binary data to be hashed.
Returns:
str: The hexadecimal representation of the hash.
"""
# Remove specification of jupyter kernel from hash to be deterministic
binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
return str(hashlib.md5(binary_no_ipykernel).hexdigest())


def _set_future(file_name, future):
def _set_future(file_name: str, future: Future) -> None:
"""
Set the result of a future based on the contents of a file.
Args:
file_name (str): The name of the file containing the result.
future (Future): The future to set the result for.
Returns:
None
"""
values = deserialize(funct_dict=read_from_file(file_name=file_name)).values()
if len(values) == 1:
future.set_result(list(values)[0])


def _update_task_dict(task_dict, task_memory_dict, cache_directory):
def _update_task_dict(
task_dict: dict, task_memory_dict: dict, cache_directory: str
) -> None:
"""
Update the task memory dictionary with the futures from the task dictionary.
Args:
task_dict (dict): A dictionary containing the futures of tasks.
task_memory_dict (dict): The dictionary to store the task memory.
cache_directory (str): The directory where the task cache is stored.
Returns:
None
"""
file_lst = os.listdir(cache_directory)
for key, future in task_dict.items():
task_memory_dict[key] = future
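The <name>.<state>.pl naming tied together by _get_file_name, write_to_file and read_from_file can be exercised directly. A small sketch, assuming (per the docstring above, since part of write_to_file is collapsed in this diff) that the returned list holds bare file names:

import os
import tempfile

from pysqa.executor.helper import read_from_file, serialize_result, write_to_file

cache = tempfile.mkdtemp()
files = write_to_file(
    funct_dict=serialize_result({"add3183": 3}), state="out", cache_directory=cache
)
print(files)                                     # e.g. ['add3183.out.pl'] (assumed format)
raw = read_from_file(file_name=os.path.join(cache, files[0]))
print(list(raw.keys()))                          # ['add3183'] -- values are still cloudpickled bytes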