diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 57915b5..38bdb5c 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -1,4 +1,4 @@ -name: Upload PROJECT_NAME to Pypi +name: Upload collegamento to Pypi on: release: diff --git a/.gitignore b/.gitignore index 3d690d7..f1de24a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ dist/ build/ .ruff_cache/ .pytest_cache/ -PROJECT_NAME.egg-info/ +collegamento.egg-info/ # Pycharm .idea diff --git a/PROJECT_NAME/__init__.py b/PROJECT_NAME/__init__.py deleted file mode 100644 index 9df0ecf..0000000 --- a/PROJECT_NAME/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from beartype.claw import beartype_this_package - -beartype_this_package() - -# xyz module level imports go here diff --git a/README.md b/README.md index 4d0d12e..762c257 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,14 @@ -

PROJECT_NAME v0.1.0

+

collegamento v0.1.0

-A tool that makes it much easier to do xyz. +A tool that makes it much easier to make offload work when asyncio isn't an option. # Installation -In the Command Line, paste the following: `pip install PROJECT_NAME` +In the Command Line, paste the following: `pip install collegamento` ## Description -PROJECT_NAME is a library that can be used for xyz. Docs are listed on this [ReadTheDocs page](https://PROJECT_NAME.readthedocs.io/en/master/) +Collegamento is a library that can be used for Client/Server IPC's with the goal of offloading major workloads to a second process. Docs are listed on this [ReadTheDocs page](https://collegamento.readthedocs.io/en/master/) ## Contributing diff --git a/collegamento/__init__.py b/collegamento/__init__.py new file mode 100644 index 0000000..a05f1d3 --- /dev/null +++ b/collegamento/__init__.py @@ -0,0 +1,16 @@ +from beartype.claw import beartype_this_package + +beartype_this_package() + +from .files_variant import ( # noqa: F401, E402 + FileClient, + FileNotification, + FileRequest, + FileServer, +) + +# xyz module level imports go here +from .ipc import USER_FUNCTION # noqa: F401, E402 +from .ipc import Client as SimpleClient # noqa: F401, E402 +from .ipc import Notification, Request, Response # noqa: F401, E402 +from .ipc import Server as SimpleServer # noqa: F401, E402 diff --git a/collegamento/files_variant.py b/collegamento/files_variant.py new file mode 100644 index 0000000..336aaf3 --- /dev/null +++ b/collegamento/files_variant.py @@ -0,0 +1,121 @@ +from logging import Logger +from multiprocessing.queues import Queue as GenericQueueClass +from typing import NotRequired + +from .ipc import USER_FUNCTION, Client, Notification, Request, Server + + +class FileRequest(Request): + # There may be commands that don't require a file but some might + file: NotRequired[str] + + +class FileNotification(Notification): + file: str + remove: bool + contents: NotRequired[str] + + +class FileClient(Client): + def __init__( + self, commands: dict[str, USER_FUNCTION], id_max: int = 15_000 + ) -> None: + self.files: dict[str, str] = {} + + super().__init__(commands, id_max, FileServer) + + def create_server(self) -> None: + """Creates the main_server through a subprocess - internal API""" + + super().create_server() + + self.logger.info("Copying files to server") + files_copy = self.files.copy() + self.files = {} + for file, data in files_copy.items(): + self.update_file(file, data) + self.logger.debug("Finished copying files to server") + + def update_file(self, file: str, current_state: str) -> None: + """Updates files in the system - external API""" + + self.logger.info(f"Updating file: {file}") + self.files[file] = current_state + + self.logger.debug("Creating notification dict") + notification: FileNotification = { + "id": super().create_message_id(), + "type": "notification", + "file": file, + "remove": False, + "contents": self.files[file], + } + + self.logger.debug("Notifying server of file update") + super().notify_server(notification) + + def remove_file(self, file: str) -> None: + """Removes a file from the main_server - external API""" + if file not in list(self.files.keys()): + self.logger.exception( + f"Cannot remove file {file} as file is not in file database!" + ) + raise Exception( + f"Cannot remove file {file} as file is not in file database!" + ) + + self.logger.info("Notifying server of file deletion") + # self.create_message("notification", remove=True, file=file) + notification: FileNotification = { + "id": super().create_message_id(), + "type": "notification", + "file": file, + "remove": True, + } + self.logger.debug("Notifying server of file removal") + super().notify_server(notification) + + +class FileServer(Server): + def __init__( + self, + commands: dict[str, USER_FUNCTION], + response_queue: GenericQueueClass, + requests_queue: GenericQueueClass, + logger: Logger, + ) -> None: + self.files: dict[str, str] = {} + + super().__init__(commands, response_queue, requests_queue, logger) + + def parse_line(self, message: Request | Notification) -> None: + self.logger.debug("Parsing Message from user - pre-super") + id: int = message["id"] + + if message["type"] == "notification": + self.logger.debug("Mesage is of type notification") + + file: str = message["file"] # type: ignore + + if message["remove"]: # type: ignore + self.logger.info(f"File {file} was requested for removal") + self.files.pop(file) + self.logger.info(f"File {file} has been removed") + else: + contents: str = message["contents"] # type: ignore + self.files[file] = contents + self.logger.info( + f"File {file} has been updated with new contents" + ) + + self.simple_id_response(id, False) + return + + super().parse_line(message) + + def handle_request(self, request: Request) -> None: + if "file" in request: + file = request["file"] + request["file"] = self.files[file] + + super().handle_request(request) diff --git a/collegamento/ipc/__init__.py b/collegamento/ipc/__init__.py new file mode 100644 index 0000000..416b173 --- /dev/null +++ b/collegamento/ipc/__init__.py @@ -0,0 +1,10 @@ +from .client import Client # noqa: F401, E402 +from .misc import ( # noqa: F401, E402 + USER_FUNCTION, + Notification, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) +from .server import Server # noqa: F401, E402 diff --git a/collegamento/ipc/client.py b/collegamento/ipc/client.py new file mode 100644 index 0000000..7951b9e --- /dev/null +++ b/collegamento/ipc/client.py @@ -0,0 +1,220 @@ +from logging import Logger, getLogger +from multiprocessing import Process, Queue, freeze_support +from random import randint + +from .misc import ( + USER_FUNCTION, + Notification, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) +from .server import Server + + +class Client: + """The IPC class is used to talk to the server and run commands. The public API includes the following methods: + - Client.notify_server() + - Client.request() + - Client.add_command() + - Client.cancel_request() + - Client.kill_IPC() + """ + + def __init__( + self, + commands: dict[str, USER_FUNCTION], + id_max: int = 15_000, + server_type: type = Server, + ) -> None: + self.all_ids: list[int] = [] + self.id_max = id_max + self.current_ids: dict[str, int] = {} + self.newest_responses: dict[str, Response | None] = {} + self.server_type: type[Server] = server_type + + self.commands = commands + for command in self.commands: + self.current_ids[command] = 0 + self.newest_responses[command] = None + + self.logger: Logger = getLogger("IPC") + self.logger.info("Creating server") + self.response_queue: ResponseQueueType = Queue() + self.requests_queue: RequestQueueType = Queue() + self.main_server: Process + self.create_server() + self.logger.info("Initialization is complete") + + def create_server(self) -> None: + """Creates the main_server through a subprocess - internal API""" + freeze_support() + server_logger = getLogger("Server") + self.main_server = Process( + target=self.server_type, + args=( + self.commands, + self.response_queue, + self.requests_queue, + server_logger, + ), + daemon=True, + ) + self.main_server.start() + self.logger.info("Server created") + + def create_message_id(self) -> int: + """Creates a Message based on the args and kwawrgs provided. Highly flexible. - internal API""" + self.logger.info("Creating message for server") + id = randint(1, self.id_max) # 0 is reserved for the empty case + while id in self.all_ids: + id = randint(1, self.id_max) + self.all_ids.append(id) + + self.logger.debug("ID for message created") + + if not self.main_server.is_alive(): + # No point in an id if the server's dead + self.logger.critical( + "Server was killed at some point, creating server" + ) + self.create_server() + + return id + + def notify_server( + self, + notification_dict: Notification, + ) -> None: + self.logger.info("Creating notification for server") + + id: int = self.create_message_id() + final_notification: Notification = { + "id": id, + "type": "notification", + } + final_notification.update(notification_dict) + self.logger.debug(f"Notification created: {final_notification}") + + self.requests_queue.put(final_notification) + self.logger.info("Message sent") + + def request( + self, + request_details: dict, + ) -> None: + """Sends the main_server a request of type command with given kwargs - external API""" + self.logger.debug("Beginning request") + + # TODO: Should this just be a walrus operator? "(command := request_dict["command"])" + command: str = request_details["command"] + if command not in self.commands: + self.logger.exception( + f"Command {command} not in builtin commands. Those are {self.commands}!" + ) + raise Exception( + f"Command {command} not in builtin commands. Those are {self.commands}!" + ) + + self.logger.info("Creating request for server") + + id: int = self.create_message_id() + + self.current_ids[command] = id + final_request: Request = { + "id": id, + "type": "request", + "command": command, + } + final_request.update(request_details) # type: ignore + self.logger.debug(f"Request created: {final_request}") + + self.requests_queue.put(final_request) + self.logger.info("Message sent") + + def cancel_request(self, command: str) -> None: + """Cancels a request of type command - external API""" + if command not in self.commands: + self.logger.exception( + f"Cannot cancel command {command}, valid commands are {self.commands}" + ) + raise Exception( + f"Cannot cancel command {command}, valid commands are {self.commands}" + ) + + self.logger.info(f"Cancelled command: {command}") + self.current_ids[command] = 0 + + def parse_response(self, res: Response) -> None: + """Parses main_server output line and discards useless responses - internal API""" + self.logger.debug("Parsing server response") + id = res["id"] + self.all_ids.remove(id) + + if "command" not in res: + self.logger.info("Response was notification response") + return + + command = res["command"] + + if command == "add-command": + return + + if id != self.current_ids[command]: + self.logger.info("Response is from old request") + return + + self.logger.info(f"Response is useful for command type: {command}") + self.current_ids[command] = 0 + self.newest_responses[command] = res + + def check_responses(self) -> None: + """Checks all main_server output by calling IPC.parse_line() on each response - internal API""" + self.logger.debug("Checking responses") + while not self.response_queue.empty(): + self.parse_response(self.response_queue.get()) + + def get_response(self, command: str) -> Response | None: + """Checks responses and returns the current response of type command if it has been returned - external API""" + self.logger.info(f"Getting response for type: {command}") + if command not in self.commands: + self.logger.exception( + f"Cannot get response of command {command}, valid commands are {self.commands}" + ) + raise Exception( + f"Cannot get response of command {command}, valid commands are {self.commands}" + ) + + self.check_responses() + response: Response | None = self.newest_responses[command] + self.newest_responses[command] = None + self.logger.info("Response retrieved") + return response + + def add_commmand(self, name: str, command: USER_FUNCTION) -> None: + if name == "add-command": + self.logger.exception( + "Cannot add command add-command as it is a special builtin" + ) + raise Exception( + "Cannot add command add-command as it is a special builtin" + ) + + id: int = self.create_message_id() + final_request: Request = { + "id": id, + "type": "request", + "command": "add-command", + } + final_request.update({"name": name, "function": command}) # type: ignore + self.logger.debug(f"Add Command Request created: {final_request}") + + self.requests_queue.put(final_request) + self.logger.info("Message sent") + self.commands[name] = command + + def kill_IPC(self) -> None: + """Kills the main_server when salve_ipc's services are no longer required - external API""" + self.logger.info("Killing server") + self.main_server.kill() diff --git a/collegamento/ipc/misc.py b/collegamento/ipc/misc.py new file mode 100644 index 0000000..b734b13 --- /dev/null +++ b/collegamento/ipc/misc.py @@ -0,0 +1,43 @@ +from multiprocessing.queues import Queue as GenericQueueClass +from typing import TYPE_CHECKING, Any, NotRequired, TypedDict + +from beartype.typing import Callable + + +class Message(TypedDict): + """Base class for messages in and out of the server""" + + id: int + type: str # Can be "request", "response", "notification" + + +class Request(Message): + """Request from the IPC class to the server with command specific input""" + + command: str + + +class Notification(Message): + """Notifies the server to store or update its storage of something""" + + contents: NotRequired[Any] + + +class Response(Message): + """Server responses to requests and notifications""" + + cancelled: bool + command: NotRequired[str] + result: NotRequired[Any] + + +USER_FUNCTION = Callable[[Request], Any] + +if TYPE_CHECKING: + ResponseQueueType = GenericQueueClass[Response] + RequestQueueType = GenericQueueClass[Request | Notification] +# Else, this is CPython < 3.12. We are now in the No Man's Land +# of Typing. In this case, avoid subscripting "GenericQueue". Ugh. +else: + ResponseQueueType = GenericQueueClass + RequestQueueType = GenericQueueClass diff --git a/collegamento/ipc/server.py b/collegamento/ipc/server.py new file mode 100644 index 0000000..386c2f7 --- /dev/null +++ b/collegamento/ipc/server.py @@ -0,0 +1,165 @@ +from logging import Logger +from multiprocessing.queues import Queue as GenericQueueClass +from time import sleep +from typing import Any + +from .misc import ( + USER_FUNCTION, + Notification, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) + + +class Server: + """Handles input from the user and returns output from special functions. Not an external API.""" + + def __init__( + self, + commands: dict[str, USER_FUNCTION], + response_queue: GenericQueueClass, + requests_queue: GenericQueueClass, + logger: Logger, + ) -> None: + self.logger: Logger = logger + self.logger.info("Starting server setup") + + self.response_queue: ResponseQueueType = response_queue + self.requests_queue: RequestQueueType = requests_queue + self.all_ids: list[int] = [] + self.newest_ids: dict[str, int] = {} + self.newest_requests: dict[str, Request | None] = {} + + self.commands: dict[str, USER_FUNCTION] = commands + for command in self.commands: + self.newest_ids[command] = 0 + self.newest_requests[command] = None + + self.logger.info("Server setup complete") + + while True: + self.run_tasks() + sleep(0.0025) + + def simple_id_response(self, id: int, cancelled: bool = True) -> None: + self.logger.debug(f"Creating simple response for id {id}") + response: Response = { + "id": id, + "type": "response", + "cancelled": cancelled, + } + self.logger.debug(f"Sending simple response for id {id}") + self.response_queue.put(response) + self.logger.info(f"Simple response for id {id} sent") + + def parse_line(self, message: Request | Notification) -> None: + self.logger.debug("Parsing Message from user") + id: int = message["id"] + + if message["type"] not in {"notification", "request"}: + self.logger.warning( + f"Unknown type {type}. Sending simple response" + ) + self.simple_id_response(id) + self.logger.debug(f"Simple response for id {id} sent") + return + + if message["type"] == "notification": + self.logger.debug("Mesage is of type notification") + self.simple_id_response(id, False) + self.logger.debug( + f"Notification response for id {id} has been sent" + ) + return + + self.logger.info(f"Mesage with id {id} is of type request") + self.all_ids.append(id) + command: str = message["command"] # type: ignore + self.newest_ids[command] = id + self.newest_requests[command] = message # type: ignore + self.logger.debug("Request stored for parsing") + + def cancel_all_ids_except_newest(self) -> None: + self.logger.info("Cancelling all old id's") + + # NOTE: Used to be list comprehension but thats ugly + ids = [] + for request in list(self.newest_requests.values()): + if request is not None: + ids.append(request["id"]) + + for request in self.all_ids: + if request in ids: + self.logger.debug(f"Id {request} is newest of its command") + continue + + self.logger.debug( + f"Id {request} is an old request, sending simple respone" + ) + self.simple_id_response(request) + + self.all_ids = [] + self.logger.debug("All ids list reset") + + def handle_request(self, request: Request) -> None: + command: str = request["command"] + id: int = self.newest_ids[command] + result: Any # noqa: F842 + + command = request["command"] + response: Response = { + "id": id, + "type": "response", + "cancelled": False, + "command": command, + } + + if command == "add-command": + self.commands[request["name"]] = request["function"] # type: ignore + response["result"] = None + response["cancelled"] = True + self.logger.debug("Response created") + self.response_queue.put(response) + self.newest_ids[command] = 0 + self.logger.info(f"Response sent for request of command {command}") + return + + if command not in self.commands: + self.logger.warning(f"Command {command} not recognized") + response["result"] = None + response["cancelled"] = True + else: + self.logger.debug(f"Running user function for command {command}") + response["result"] = self.commands[command](request) + + self.logger.debug("Response created") + self.response_queue.put(response) + self.newest_ids[command] = 0 + self.logger.info(f"Response sent for request of command {command}") + + def run_tasks(self) -> None: + if self.requests_queue.empty(): + return + + self.logger.debug("New request in queue") + while not self.requests_queue.empty(): + self.logger.debug("Parsing request") + self.parse_line(self.requests_queue.get()) + + if not self.all_ids: + self.logger.debug("All requests were notifications") + + self.logger.debug("Cancelling all old id's") + self.cancel_all_ids_except_newest() + + # Actual work + for request in list(self.newest_requests.values()): + if request is None: + continue + command: str = request["command"] + self.logger.info(f"Handling request of command {command}") + self.handle_request(request) + self.newest_requests[command] = None + self.logger.debug("Request completed") diff --git a/docs/source/conf.py b/docs/source/conf.py index f5e34ae..b4cc849 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -11,7 +11,7 @@ # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information -project = "PROJECT_NAME" +project = "collegamento" copyright = "2024, Moosems" author = "Moosems" release = "v0.1.0" diff --git a/docs/source/example-usage.rst b/docs/source/example-usage.rst index 531b9a0..5e07c6a 100644 --- a/docs/source/example-usage.rst +++ b/docs/source/example-usage.rst @@ -2,7 +2,7 @@ Example Usage ============= -Now that you have ``PROJECT_NAME`` installed, let's try running a simple example that does xyz: +Now that you have ``collegamento`` installed, let's try running a simple example that does xyz: .. code-block:: python diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 77558e8..3b5ce4a 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -1,8 +1,8 @@ ========================= -``PROJECT_NAME`` Examples +``Collegamento`` Examples ========================= -Below are links to all the examples on using ``PROJECT_NAME``. +Below are links to all the examples on using ``Collegamento``. .. toctree:: :maxdepth: 1 diff --git a/docs/source/index.rst b/docs/source/index.rst index 9080b8f..a385bdc 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,8 +1,8 @@ ============================== -``PROJECT_NAME`` Documentation +``Collegamento`` Documentation ============================== -Welcome to ``PROJECT_NAME``'s Documentation! ``PROJECT_NAME`` is a library that can be to do xyz. To get started with ``PROJECT_NAME``, visit the :doc:`installation` page! +Welcome to ``Collegamento``'s Documentation! ``Collegamento`` is a library that can be to do xyz. To get started with ``Collegamento``, visit the :doc:`installation` page! .. note:: diff --git a/docs/source/installation.rst b/docs/source/installation.rst index 5a2265d..d53ee05 100644 --- a/docs/source/installation.rst +++ b/docs/source/installation.rst @@ -2,12 +2,12 @@ Installation ============ -To start using ``PROJECT_NAME``, first install it using pip: +To start using ``Collegamento``, first install it using pip: .. code-block:: console - $ pip install PROJECT_NAME + $ pip install collegamento And it's installed! Congratulations on xyz! -Let's move on to the :doc:`example-usage` page to give ``PROJECT_NAME`` a try! +Let's move on to the :doc:`example-usage` page to give ``Collegamento`` a try! diff --git a/setup.py b/setup.py index 6c8a91a..013565e 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# pip install -U -r requirements-dev.txt --break-system-packages; pip uninstall albero -y --break-system-packages; pip install . --break-system-packages --no-build-isolation; python3 -m pytest . +# pip install -U -r requirements-dev.txt --break-system-packages; pip uninstall collegamento -y --break-system-packages; pip install . --break-system-packages --no-build-isolation; python3 -m pytest . from setuptools import setup with open("README.md", "r") as file: @@ -6,12 +6,12 @@ setup( - name="PROJECT_NAME", + name="collegamento", version="0.1.0", - description="PROJECT_NAME does xyz", + description="Collegamento provides an easy to use Client/Server IPC backend", author="Moosems", author_email="moosems.j@gmail.com", - url="https://github.com/salve-org/PROJECT_NAME", + url="https://github.com/salve-org/collegamento", long_description=long_description, long_description_content_type="text/markdown", install_requires=open("requirements.txt", "r+") @@ -27,5 +27,5 @@ "License :: OSI Approved :: MIT License", "Typing :: Typed", ], - packages=["PROJECT_NAME"], # , "PROJECT_NAME.subpackages"], + packages=["collegamento", "collegamento.ipc"], ) diff --git a/tests/test_file_variant.py b/tests/test_file_variant.py new file mode 100644 index 0000000..0f14bca --- /dev/null +++ b/tests/test_file_variant.py @@ -0,0 +1,41 @@ +from time import sleep + +from collegamento import USER_FUNCTION, FileClient, Request, Response + + +def func(test_arg: Request) -> bool: + return True + + +def split_str(arg: Request) -> list[str]: + file = arg["file"] # type: ignore + return file.split(" ") + + +def test_file_variants(): + commands: dict[str, USER_FUNCTION] = {"test": func} + context = FileClient(commands) + + context.update_file("test", "test contents") + context.request({"command": "test"}) + + sleep(1) + + output: Response | None = context.get_response("test") + assert output is not None # noqa: E711 + assert output["result"] is True # noqa: E712 # type: ignore + + context.add_commmand("test1", split_str) + context.request({"command": "test1", "file": "test"}) + + sleep(1) + + output: Response | None = context.get_response("test1") + assert output is not None # noqa: E711 + assert output["result"] == ["test", "contents"] # noqa: E712 # type: ignore + + context.kill_IPC() + + +if __name__ == "__main__": + test_file_variants() diff --git a/tests/test_simple.py b/tests/test_simple.py new file mode 100644 index 0000000..70c4629 --- /dev/null +++ b/tests/test_simple.py @@ -0,0 +1,37 @@ +from time import sleep + +from collegamento import USER_FUNCTION, Request, Response, SimpleClient + + +def foo(bar: Request) -> bool: + if bar["command"] == "test": + return True + return False + + +def test_Client_Server(): + commands: dict[str, USER_FUNCTION] = {"test": foo} + context = SimpleClient(commands) + + context.request({"command": "test"}) + + sleep(1) + + context.add_commmand("test1", foo) + + sleep(1) + + output: Response | None = context.get_response("test") + + assert output is not None # noqa: E711 + assert output["result"] == True # noqa: E712 # type: ignore + + context.request({"command": "test1"}) + + sleep(1) + + output: Response | None = context.get_response("test1") + assert output is not None # noqa: E711 + assert output["result"] == False # noqa: E712 # type: ignore + + context.kill_IPC() diff --git a/tests/test_xyz.py b/tests/test_xyz.py deleted file mode 100644 index 93f5228..0000000 --- a/tests/test_xyz.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_xyz(): - pass