Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add docstrings and typehints with github co-pilot #317

Merged
merged 6 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pysqa/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@

def command_line(
arguments_lst: Optional[list] = None, execute_command: callable = execute_command
):
) -> None:
"""
Parse the command line arguments.

Args:
argv: Command line arguments
execute_command: function to comunicate with shell process
arguments_lst (Optional[list]): Command line arguments
execute_command (callable): Function to communicate with shell process

Returns:
None
"""
directory = "~/.queues"
queue = None
Expand Down
34 changes: 32 additions & 2 deletions pysqa/executor/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
def execute_files_from_list(
tasks_in_progress_dict: dict, cache_directory: str, executor
):
"""
Execute files from the given list.

Args:
tasks_in_progress_dict (dict): Dictionary of tasks in progress.
cache_directory (str): Directory where the files are stored.
executor: Executor object for executing tasks.

Returns:
None
"""
file_lst = os.listdir(cache_directory)
for file_name_in in file_lst:
key = file_name_in.split(".in.pl")[0]
Expand Down Expand Up @@ -42,6 +53,16 @@ def execute_files_from_list(


def execute_tasks(cores: int, cache_directory: str):
"""
Execute tasks from the given cache directory using the specified number of cores.

Args:
cores (int): Number of cores to use for execution.
cache_directory (str): Directory where the files are stored.

Returns:
None
"""
tasks_in_progress_dict = {}
with Executor(
max_cores=cores,
Expand All @@ -61,9 +82,18 @@ def execute_tasks(cores: int, cache_directory: str):
)


def command_line(arguments_lst: Optional[list] = None):
def command_line(arguments_lst: Optional[list] = None) -> None:
"""
Execute tasks from the command line.

Args:
arguments_lst (Optional[list]): List of command line arguments.

Returns:
None
"""
if arguments_lst is None:
arguments_lst = sys.argv[1:]
cores_arg = arguments_lst[arguments_lst.index("--cores") + 1]
path_arg = arguments_lst[arguments_lst.index("--path") + 1]
execute_tasks(cores=cores_arg, cache_directory=path_arg)
execute_tasks(cores=int(cores_arg), cache_directory=path_arg)
28 changes: 27 additions & 1 deletion pysqa/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def __init__(
queue_adapter=None,
queue_adapter_kwargs: Optional[dict] = None,
):
"""
Initialize the Executor.

Args:
cwd (Optional[str]): The current working directory. Defaults to None.
queue_adapter: The queue adapter. Defaults to None.
queue_adapter_kwargs (Optional[dict]): Additional keyword arguments for the queue adapter. Defaults to None.
"""
self._task_queue = queue.Queue()
self._memory_dict = {}
self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
Expand Down Expand Up @@ -50,7 +58,18 @@ def __init__(
)
self._process.start()

def submit(self, fn: callable, *args, **kwargs):
def submit(self, fn: callable, *args, **kwargs) -> Future:
"""
Submit a function for execution.

Args:
fn (callable): The function to be executed.
*args: Positional arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.

Returns:
Future: A Future object representing the execution of the function.
"""
funct_dict = serialize_funct(fn, *args, **kwargs)
key = list(funct_dict.keys())[0]
if key not in self._memory_dict.keys():
Expand All @@ -62,6 +81,13 @@ def submit(self, fn: callable, *args, **kwargs):
return self._memory_dict[key]

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""
Shutdown the Executor.

Args:
wait (bool): Whether to wait for all tasks to complete before shutting down. Defaults to True.
cancel_futures (bool): Whether to cancel pending futures. Defaults to False.
"""
if cancel_futures:
cancel_items_in_queue(que=self._task_queue)
self._task_queue.put({"shutdown": True, "wait": wait})
Expand Down
141 changes: 132 additions & 9 deletions pysqa/executor/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,34 @@


def deserialize(funct_dict: dict) -> dict:
"""
Deserialize a dictionary of serialized functions.

Args:
funct_dict (dict): A dictionary containing serialized functions.

Returns:
dict: A dictionary with deserialized functions.

"""
try:
return {k: cloudpickle.loads(v) for k, v in funct_dict.items()}
except EOFError:
return {}


def find_executed_tasks(future_queue: queue.Queue, cache_directory: str):
def find_executed_tasks(future_queue: queue.Queue, cache_directory: str) -> None:
"""
Find executed tasks from the future queue and update the task memory dictionary.

Args:
future_queue (queue.Queue): The queue containing the futures of executed tasks.
cache_directory (str): The directory where the task cache is stored.

Returns:
None

"""
task_memory_dict = {}
while True:
task_dict = {}
Expand All @@ -33,14 +54,36 @@ def find_executed_tasks(future_queue: queue.Queue, cache_directory: str):


def read_from_file(file_name: str) -> dict:
"""
Read the contents of a file and return it as a dictionary.

Args:
file_name (str): The name of the file to read.

Returns:
dict: A dictionary containing the contents of the file, with the file name as the key.

"""
name = file_name.split("/")[-1].split(".")[0]
with open(file_name, "rb") as f:
return {name: f.read()}


def reload_previous_futures(
future_queue: queue.Queue, future_dict: dict, cache_directory: str
):
) -> None:
"""
Reload previous futures from the cache directory and update the future dictionary.

Args:
future_queue (queue.Queue): The queue containing the futures of executed tasks.
future_dict (dict): A dictionary containing the current futures.
cache_directory (str): The directory where the task cache is stored.

Returns:
None

"""
file_lst = os.listdir(cache_directory)
for f in file_lst:
if f.endswith(".in.pl"):
Expand All @@ -56,16 +99,50 @@ def reload_previous_futures(
future_queue.put({key: future_dict[key]})


def serialize_result(result_dict: dict):
def serialize_result(result_dict: dict) -> dict:
"""
Serialize the values in a dictionary using cloudpickle.

Args:
result_dict (dict): A dictionary containing the values to be serialized.

Returns:
dict: A dictionary with serialized values.

"""
return {k: cloudpickle.dumps(v) for k, v in result_dict.items()}


def serialize_funct(fn: callable, *args, **kwargs):
def serialize_funct(fn: callable, *args, **kwargs) -> dict:
"""
Serialize a function along with its arguments and keyword arguments.

Args:
fn (callable): The function to be serialized.
*args: The arguments to be passed to the function.
**kwargs: The keyword arguments to be passed to the function.

Returns:
dict: A dictionary containing the serialized function.

"""
binary = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
return {fn.__name__ + _get_hash(binary=binary): binary}


def write_to_file(funct_dict: dict, state, cache_directory: str):
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix undefined name List.

The List type hint is used but not imported. Import List from typing to fix this issue.

+ from typing import List
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
+ from typing import List
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
Tools
Ruff

133-133: Undefined name List

(F821)

"""
Write the contents of a dictionary to files in the cache directory.

Args:
funct_dict (dict): A dictionary containing the contents to be written.
state (str): The state of the files to be written.
cache_directory (str): The directory where the files will be written.

Returns:
List[str]: A list of file names that were written.

"""
Comment on lines +134 to +146
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix undefined name List.

The List type hint is used but not imported. Import List from typing to fix this issue.

+ from typing import List
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
"""
Write the contents of a dictionary to files in the cache directory.
Args:
funct_dict (dict): A dictionary containing the contents to be written.
state (str): The state of the files to be written.
cache_directory (str): The directory where the files will be written.
Returns:
List[str]: A list of file names that were written.
"""
from typing import List
def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]:
"""
Write the contents of a dictionary to files in the cache directory.
Args:
funct_dict (dict): A dictionary containing the contents to be written.
state (str): The state of the files to be written.
cache_directory (str): The directory where the files will be written.
Returns:
List[str]: A list of file names that were written.
"""

file_name_lst = []
for k, v in funct_dict.items():
file_name = _get_file_name(name=k, state=state)
Expand All @@ -75,23 +152,69 @@ def write_to_file(funct_dict: dict, state, cache_directory: str):
return file_name_lst


def _get_file_name(name, state):
def _get_file_name(name: str, state: str) -> str:
"""
Generate a file name based on the given name and state.

Args:
name (str): The name to be included in the file name.
state (str): The state of the file.

Returns:
str: The generated file name.

"""
return name + "." + state + ".pl"


def _get_hash(binary):
def _get_hash(binary: bytes) -> str:
"""
Get the hash of a binary using MD5 algorithm.

Args:
binary (bytes): The binary data to be hashed.

Returns:
str: The hexadecimal representation of the hash.

"""
# Remove specification of jupyter kernel from hash to be deterministic
binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
return str(hashlib.md5(binary_no_ipykernel).hexdigest())


def _set_future(file_name, future):
def _set_future(file_name: str, future: Future) -> None:
"""
Set the result of a future based on the contents of a file.

Args:
file_name (str): The name of the file containing the result.
future (Future): The future to set the result for.

Returns:
None

"""
values = deserialize(funct_dict=read_from_file(file_name=file_name)).values()
if len(values) == 1:
future.set_result(list(values)[0])


def _update_task_dict(task_dict, task_memory_dict, cache_directory):
def _update_task_dict(
task_dict: dict, task_memory_dict: dict, cache_directory: str
) -> None:
"""
Update the task memory dictionary with the futures from the task dictionary.

Args:
task_dict (dict): A dictionary containing the futures of tasks.
task_memory_dict (dict): The dictionary to store the task memory.
cache_directory (str): The directory where the task cache is stored.

Returns:
None

"""
file_lst = os.listdir(cache_directory)
for key, future in task_dict.items():
task_memory_dict[key] = future
Expand Down
Loading
Loading