Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the capnp server classes available #61

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/labone/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,12 @@
from labone.core import ListNodesFlags
from labone.dataserver import DataServer
from labone.instrument import Instrument
from labone.server.session import SessionFunctionality

__all__ = ["__version__", "Instrument", "DataServer", "ListNodesFlags"]
__all__ = [
"__version__",
"Instrument",
"DataServer",
"ListNodesFlags",
"SessionFunctionality",
]
2 changes: 0 additions & 2 deletions src/labone/mock/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

from labone.mock.automatic_session_functionality import AutomaticSessionFunctionality
from labone.mock.entry_point import spawn_hpk_mock
from labone.mock.session_mock_template import SessionMockFunctionality

__all__ = [
"spawn_hpk_mock",
"AutomaticSessionFunctionality",
"SessionMockFunctionality",
]
4 changes: 2 additions & 2 deletions src/labone/mock/automatic_session_functionality.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
Value,
)
from labone.mock.errors import LabOneMockError
from labone.mock.session_mock_template import SessionMockFunctionality, Subscription
from labone.node_info import NodeInfo
from labone.server.session import SessionFunctionality, Subscription

if t.TYPE_CHECKING:
from labone.core.helper import LabOneNodePath
Expand All @@ -59,7 +59,7 @@ class PathData:
streaming_handles: list[Subscription]


class AutomaticSessionFunctionality(SessionMockFunctionality):
class AutomaticSessionFunctionality(SessionFunctionality):
"""Predefined behaviour for HPK mock.

Args:
Expand Down
12 changes: 6 additions & 6 deletions src/labone/mock/entry_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from labone.core.reflection.server import ReflectionServer
from labone.core.session import Session
from labone.mock.hpk_schema import get_schema
from labone.mock.mock_server import start_local_mock
from labone.mock.session_mock_template import SessionMockTemplate
from labone.server.server import start_local_server
from labone.server.session import SessionInterface

if TYPE_CHECKING:
import capnp

from labone.core.helper import CapnpCapability
from labone.mock.session_mock_template import SessionMockFunctionality
from labone.server.session import SessionFunctionality


class MockSession(Session):
Expand Down Expand Up @@ -44,7 +44,7 @@ def __init__(


async def spawn_hpk_mock(
functionality: SessionMockFunctionality,
functionality: SessionFunctionality,
) -> MockSession:
"""Shortcut for creating a mock server.

Expand All @@ -66,9 +66,9 @@ async def spawn_hpk_mock(
capnp.lib.capnp.KjException: If the schema is invalid. Or the id
of the concrete server is not in the schema.
"""
server, client = await start_local_mock(
server, client = await start_local_server(
schema=get_schema(),
mock=SessionMockTemplate(functionality),
server=SessionInterface(functionality),
)
reflection = await ReflectionServer.create_from_connection(client)
return MockSession(
Expand Down
19 changes: 19 additions & 0 deletions src/labone/server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Subpackage for the server functionality of the LabOne API.

This subpackage allows to create a data server through capnp.
"""

from labone.server.server import CapnpServer, start_server
from labone.server.session import (
SessionFunctionality,
SessionInterface,
Subscription,
)

__all__ = [
"CapnpServer",
"start_server",
"SessionFunctionality",
"SessionInterface",
"Subscription",
]
83 changes: 65 additions & 18 deletions src/labone/mock/mock_server.py → src/labone/server/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Abstract reflection as base for mock servers.
"""Abstract reflection as base for servers.

A capnp reflection server is dynamically created from a binary schema file.
This server can provide the schema (getTheSchema) and takes
Expand All @@ -8,8 +8,11 @@

from __future__ import annotations

import argparse
import asyncio
import socket
from abc import ABC
from contextlib import suppress
from typing import TYPE_CHECKING

import capnp
Expand All @@ -21,10 +24,10 @@
from capnp.lib.capnp import _CallContext, _DynamicStructBuilder


class ServerTemplate(ABC):
class CapnpServer(ABC):
"""Common interface for concrete server implementations.

Both Hpk and Simplon servers will implement this interface.
Servers will implement this interface.
It stands for the actual functionality of the server, which
will be defined in the subclasses.

Expand All @@ -39,7 +42,7 @@ class ServerTemplate(ABC):
def capnp_server_factory(
stream: capnp.AsyncIoStream,
schema: CapnpStructReader,
mock: ServerTemplate,
server: CapnpServer,
) -> capnp.TwoPartyServer:
"""Dynamically create a capnp server.

Expand All @@ -50,26 +53,26 @@ def capnp_server_factory(
Args:
stream: Stream for the server.
schema: Parsed capnp schema (`reflection_capnp.CapSchema`).
mock: The concrete server implementation.
server: The concrete server implementation.

Returns:
Dynamically created capnp server.
"""
schema_parsed_dict = schema.to_dict()
parsed_schema = ParsedWireSchema(schema.theSchema)
capnp_interface = capnp.lib.capnp._InterfaceModule( # noqa: SLF001
parsed_schema.full_schema[mock.server_id].schema.as_interface(),
parsed_schema.full_schema[mock.server_id].name,
parsed_schema.full_schema[server.server_id].schema.as_interface(),
parsed_schema.full_schema[server.server_id].name,
)

class MockServerImpl(capnp_interface.Server): # type: ignore[name-defined]
class ServerImpl(capnp_interface.Server): # type: ignore[name-defined]
"""Dynamically created capnp server.

Redirects all calls (except getTheSchema) to the concrete server implementation.
"""

def __init__(self) -> None:
self._mock = mock
self._server = server
# parsed schema needs to stay alive as long as the server is.
self._parsed_schema = parsed_schema

Expand All @@ -78,8 +81,8 @@ def __getattr__(
name: str,
) -> _DynamicStructBuilder | list[_DynamicStructBuilder] | str | list[str]:
"""Redirecting all calls to the concrete server implementation."""
if hasattr(self._mock, name):
return getattr(self._mock, name)
if hasattr(self._server, name):
return getattr(self._server, name)
return getattr(super(), name)

async def getTheSchema( # noqa: N802
Expand All @@ -103,23 +106,23 @@ async def getTheSchema( # noqa: N802
# Otherwise the underlying capnp object need to be copied manually to avoid
# segfaults
_context.results.theSchema.from_dict(schema_parsed_dict)
_context.results.theSchema.typeId = mock.type_id
_context.results.theSchema.typeId = server.type_id

return capnp.TwoPartyServer(stream, bootstrap=MockServerImpl())
return capnp.TwoPartyServer(stream, bootstrap=ServerImpl())


async def start_local_mock(
async def start_local_server(
schema: CapnpStructReader,
mock: ServerTemplate,
server: CapnpServer,
) -> tuple[capnp.TwoPartyServer, capnp.AsyncIoStream]:
"""Starting a local mock server.
"""Starting a local server.

This is equivalent to the `capnp_server_factory` but with the addition that
a local socket pair is created for the server.

Args:
schema: Parsed capnp schema (`reflection_capnp.CapSchema`).
mock: The concrete server implementation.
server: The concrete server implementation.

Returns:
The server and the client connection.
Expand All @@ -131,4 +134,48 @@ async def start_local_mock(
reader = await capnp.AsyncIoStream.create_connection(sock=read)
writer = await capnp.AsyncIoStream.create_connection(sock=write)
# create server for the local socket pair
return capnp_server_factory(writer, schema, mock), reader
return capnp_server_factory(writer, schema, server), reader


def start_server(
schema: CapnpStructReader,
server: CapnpServer,
) -> None:
"""Start the server.

Start the capnp server with the given schema and server instance.
The server address and port can be specified via command line arguments.
The server keeps running until it is interrupted.

Args:
schema: Parsed capnp schema (`reflection_capnp.CapSchema`).
server: The concrete server implementation.
"""

def _host_port() -> tuple[str, int]:
parser = argparse.ArgumentParser(
usage="""Runs the server bound to the given address/port ADDRESS. """,
)

parser.add_argument("address", help="ADDRESS:PORT")

return parser.parse_args().address.split(":")

async def _new_connection(
stream: capnp.AsyncIoStream,
) -> None:
await capnp_server_factory(
stream=stream,
schema=schema,
server=server,
).on_disconnect()

async def _run_server() -> None:
await ensure_capnp_event_loop()
host, port = _host_port()
server = await capnp.AsyncIoStream.create_server(_new_connection, host, port)
async with server:
await server.serve_forever()

with suppress(KeyboardInterrupt):
asyncio.run(_run_server())
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Hpk Mock Server method definitions.
"""Capnp Server method definitions.

This module contains the method definitions for the Hpk Mock Server,
This module contains the method definitions for the Capnp Server,
including setting and getting values, listing nodes, and subscribing to
nodes. This specific capnp server methods define the specific
Hpk behavior.
The logic of the capnp methods is deligated to the HpkMockFunctionality class,
The logic of the capnp methods is deligated to the SessionFunctionality class,
which offers a blueprint meant to be overriden by the user.
"""

Expand All @@ -21,7 +21,7 @@
_capnp_value_to_python_value,
value_from_python_types_dict,
)
from labone.mock.mock_server import ServerTemplate
from labone.server.server import CapnpServer

if TYPE_CHECKING:
from capnp.lib.capnp import (
Expand Down Expand Up @@ -86,22 +86,22 @@ def path(self) -> LabOneNodePath:
return self._path


class SessionMockFunctionality(ABC):
"""Hpk blueprint for defining mock server behavior.
class SessionFunctionality(ABC):
"""Blueprint for defining the session behavior.

The HpKMockFunctionality class offers a interface between
The SessionFunctionality class offers a interface between
capnp server logic and the user. The user can override the methods
to define an individual mock server. The signature of the methods
to define an individual server. The signature of the methods
is mostly identical to the session-interface on the caller side.
Thereby it feels as if the session-interface is overritten directly,
hiding the capnp server logic from the user.

Two possible ways to use this class arise:
* Call methods indirectly (via capnp server), by having a session
to a mock server.
to a server.
* Call methods directly. This can be used to manipulate the state
internally. Limitations of what can be set to a server are
bypassed. E.g. can be useful when setting shf vector nodes.
bypassed. E.g. can be useful when setting SHF vector nodes.

Both approaches can be combined.
"""
Expand Down Expand Up @@ -234,26 +234,26 @@ def build_capnp_error(error: Exception) -> _DynamicStructBuilder:
}


class SessionMockTemplate(ServerTemplate):
"""Hpk Mock Server.
class SessionInterface(CapnpServer):
"""Capnp session interface.

The logic for answering capnp requests is outsourced and taken as an argument.
This allows for custom mock server definition while keeping this classes
This allows for custom server definition while keeping this classes
code static.

Note:
Methods within serve for capnp to answer requests. They should not be
called directly. They should not be overritten in order to define
custom behavior. Instead, override the methods of HpkMockFunctionality.
custom behavior. Instead, override the methods of SessionFunctionality.

Args:
functionality: The implementation of the mock server behavior.
functionality: The implementation of the server behavior.
"""

server_id = HPK_SCHEMA_ID
type_id = SESSION_SCHEMA_ID

def __init__(self, functionality: SessionMockFunctionality) -> None:
def __init__(self, functionality: SessionFunctionality) -> None:
self._functionality = functionality

async def getSessionVersion( # noqa: N802
Expand Down Expand Up @@ -431,7 +431,7 @@ async def subscribe(
"""Capnp server method to subscribe to nodes.

Do not override this method. Instead, override 'subscribe_logic'
of HpkMockFunctionality (or subclass).
of SessionFunctionality (or subclass).

Args:
subscription: Capnp object containing information on
Expand Down
10 changes: 5 additions & 5 deletions tests/core/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from labone.mock import AutomaticSessionFunctionality, spawn_hpk_mock
from labone.mock.entry_point import MockSession
from labone.mock.hpk_schema import get_schema
from labone.mock.mock_server import start_local_mock
from labone.mock.session_mock_template import SessionMockTemplate
from labone.server.server import start_local_server
from labone.server.session import SessionInterface

from .resources import session_protocol_capnp, testfile_capnp, value_capnp

Expand Down Expand Up @@ -954,7 +954,7 @@ async def test_set_transaction_mix_multiple_devices():
assert (await session_b.get("a")).value == 2


class DummyServerVersionTest(SessionMockTemplate):
class DummyServerVersionTest(SessionInterface):
def __init__(self, version: str):
super().__init__(None)
self._version = version
Expand All @@ -975,9 +975,9 @@ async def getSessionVersion(self, _context): # noqa: N802
)
@pytest.mark.asyncio()
async def test_ensure_compatibility_mismatch(version, should_fail):
mock_server, client_connection = await start_local_mock(
mock_server, client_connection = await start_local_server(
schema=get_schema(),
mock=DummyServerVersionTest(version),
server=DummyServerVersionTest(version),
)
reflection = await ReflectionServer.create_from_connection(client_connection)
session = MockSession(
Expand Down
Loading
Loading