+Engine
+Engine is the central component of the AIMM system. Its purpose is handling
+manageable calls to plugins, using the backend when persistence is required and
+serving its state to all controllers.
+Engine’s part of the configuration needs to satisfy the following schema:
+---
+$schema: 'http://json-schema.org/schema#'
+id: 'aimm://server/engine.yaml#'
+type: object
+required:
+ - sigterm_timeout
+ - max_children
+ - check_children_period
+properties:
+    sigterm_timeout:
+        type: number
+    max_children:
+        type: number
+    check_children_period:
+        type: number
+...
+
+
+When the engine is started, it queries its backend to access all existing model
+instances. The instances are then stored in the engine’s state. Any component
+holding a reference to the engine may use it to perform different actions, such
+as creating model instances, fitting them or using them for predictions. When a
+new model instance is created, or an old one is updated, the state change is
+reported to any components that have subscribed to state changes. Additionally,
+calls to the plugins themselves are manageable and the engine keeps their states
+in its state as well.
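The notification scheme described above can be sketched as follows. This is a minimal illustration and not the AIMM implementation: `StateHolder` and `set_model` are hypothetical names, and the subscription returns a plain unsubscribe function instead of a `RegisterCallbackHandle`. The point it shows is that the state dictionary is replaced rather than mutated, and subscribers are then called.

```python
from typing import Any, Callable, Dict, List


class StateHolder:
    """Hypothetical stand-in for the part of the engine that owns state."""

    def __init__(self) -> None:
        self._state: Dict[str, Dict] = {"models": {}, "actions": {}}
        self._callbacks: List[Callable[[], None]] = []

    @property
    def state(self) -> Dict[str, Dict]:
        return self._state

    def subscribe_to_state_change(self, cb: Callable[[], None]) -> Callable[[], None]:
        """Register cb and return a function that removes it again."""
        self._callbacks.append(cb)
        return lambda: self._callbacks.remove(cb)

    def set_model(self, instance_id: int, model: Any) -> None:
        # Build a new dictionary instead of modifying the old one in place,
        # so references held by other components stay consistent.
        models = {**self._state["models"], instance_id: model}
        self._state = {**self._state, "models": models}
        for cb in list(self._callbacks):
            cb()
```

A subscriber is expected to read the `state` property again inside its callback, since the callback itself receives no arguments.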
+Workflow actions such as fitting or using models are run asynchronously in
+separate asyncio tasks. Additionally, any plugin they call is run in a separate
+process, wrapped in a handler that allows subscriptions to the call’s state (if
+the plugin call supports state notification). The calls can also be cancelled,
+which is done using signals: initially SIGTERM and later SIGKILL if a
+configured timeout expires. Also, to avoid fork bombs, a separate pseudo-pool
+is implemented to serve as an interface for process creation, disallowing
+creation of new processes if a certain number of subprocesses is already
+running.
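The SIGTERM-then-SIGKILL escalation can be illustrated with the standard library alone. The sketch below is an assumption-laden example, not the engine's own code: it uses `subprocess` instead of the engine's process handler, `stop_process` and `demo` are hypothetical names, and it only behaves as described on POSIX systems. The child deliberately ignores SIGTERM so that the timeout path is exercised.

```python
import signal
import subprocess
import sys

# Child program that ignores SIGTERM, so only SIGKILL can stop it.
STUBBORN_CHILD = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)


def stop_process(proc: subprocess.Popen, sigterm_timeout: float) -> int:
    """Send SIGTERM, wait up to sigterm_timeout seconds, then send SIGKILL."""
    proc.terminate()  # SIGTERM on POSIX
    try:
        return proc.wait(timeout=sigterm_timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX
        return proc.wait()


def demo() -> int:
    proc = subprocess.Popen(
        [sys.executable, "-c", STUBBORN_CHILD], stdout=subprocess.PIPE
    )
    proc.stdout.readline()  # wait until the child has installed its handler
    return_code = stop_process(proc, sigterm_timeout=0.5)
    proc.stdout.close()
    return return_code
```

On POSIX, a negative return code is the negated number of the signal that ended the process, so the caller can tell whether escalation to SIGKILL was needed.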
+Engine configuration reflects the multiprocessing nature of the implementation,
+since all of the options refer to different timeouts and limitations:
+
+
+sigterm_timeout is the number of seconds to wait after SIGTERM is sent
+to a process before SIGKILL is sent if it doesn’t terminate
+max_children is the maximum number of children, i.e. concurrent subprocesses
+that can run at the same time
+check_children_period is how often, in seconds, the number of children is
+checked, since this check is done periodically
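As an illustration, an engine configuration with all three required options could look like the dictionary below. The values are arbitrary examples, not recommended defaults, and `check_engine_conf` is a hypothetical helper written for this sketch. It is not part of AIMM and only mirrors the required keys and the `number` type from the schema.

```python
from numbers import Number

# Illustrative values only; tune them for the actual deployment.
engine_conf = {
    "sigterm_timeout": 5,          # seconds between SIGTERM and SIGKILL
    "max_children": 4,             # upper bound on concurrent subprocesses
    "check_children_period": 0.5,  # seconds between child-count checks
}

REQUIRED = ("sigterm_timeout", "max_children", "check_children_period")


def check_engine_conf(conf: dict) -> list:
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    for key in REQUIRED:
        if key not in conf:
            problems.append(f"missing: {key}")
        elif isinstance(conf[key], bool) or not isinstance(conf[key], Number):
            # JSON Schema's "number" does not include booleans or strings.
            problems.append(f"not a number: {key}")
    return problems
```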
+
+
+Engine module provides the following interface:
+
+-
+class aimm.server.common.Engine
+Engine interface
+
+-
+abstract property state: Dict
+Engine state, contains references to all models and actions. It’s
+never modified in place; to observe changes, subscribe_to_state_change()
+should be used
+
+
+
+-
+abstract subscribe_to_state_change(cb: Callable[[], None]) → RegisterCallbackHandle
+Subscribes to any changes to the engine state
+
+
+
+-
+abstract create_instance(model_type: str, *args: Any, **kwargs: Any) → Action
+Starts an action that creates a model instance and stores it in
+state.
+
+- Parameters:
+-
+
+
+
+
+
+-
+abstract async add_instance(model_type: str, instance: Any) → Model
+Adds existing instance to the state
+
+
+
+-
+abstract async update_instance(model: Model)
+Update existing instance in the state
+
+
+
+-
+abstract fit(instance_id: int, *args: Any, **kwargs: Any) → Action
+Starts an action that fits an existing model instance. The used
+fitting function is the one assigned to the model type. The instance,
+while it is being fitted, is not accessible by any of the other
+functions that would use it (other calls to fit, predictions, etc.).
+
+- Parameters:
+
+instance_id – id of model instance that will be fitted
+*args – arguments to pass to the fitting function - if of type
+aimm.server.common.DataAccess, the value passed to the
+fitting function is the result of the call to that plugin,
+other arguments are passed directly
+**kwargs – keyword arguments, work the same as the positional
+arguments
+
+
+
+
+
+
+-
+abstract predict(instance_id: int, *args: Any, **kwargs: Any) → Action
+Starts an action that uses an existing model instance to perform a
+prediction. The used prediction function is the one assigned to model’s
+type. The instance, while prediction is called, is not accessible by
+any of the other functions that would use it (other calls to predict,
+fittings, etc.). If instance has changed while predicting, it is
+updated in the state and database.
+
+- Parameters:
+
+instance_id – id of the model instance used for prediction
+*args – arguments to pass to the predict function - if of type
+aimm.server.common.DataAccess, the value passed to the
+predict function is the result of the call to that plugin,
+other arguments are passed directly
+**kwargs – keyword arguments, work the same as the positional
+arguments
+
+
+- Returns:
+Reference to task of the manageable predict call, result of it is
+the model’s prediction
+
+
+
+
+
+
+
+-
+class aimm.server.common.Action
+Represents a manageable call. It is an aio.Resource, so the call can be
+cancelled using async_close.
+
+-
+abstract async wait_result() → Any
+Wait until call returns a result. May raise
+asyncio.CancelledError in case the call was cancelled.
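The result and cancellation semantics of an action can be sketched with a plain asyncio task. `TaskAction` below is a hypothetical stand-in, not the real `Action` class and not an `aio.Resource`. It only mimics the two methods named above, so that the behaviour of `wait_result` after cancellation is visible.

```python
import asyncio
from typing import Any, Coroutine


class TaskAction:
    """Hypothetical stand-in for an action, backed by an asyncio task."""

    def __init__(self, coro: Coroutine) -> None:
        self._task = asyncio.ensure_future(coro)

    async def wait_result(self) -> Any:
        # Raises asyncio.CancelledError if the underlying call was cancelled.
        return await self._task

    async def async_close(self) -> None:
        self._task.cancel()
        # Wait for the cancellation to take effect without re-raising it.
        await asyncio.gather(self._task, return_exceptions=True)


async def demo():
    async def quick_predict():
        return 42

    async def slow_fit():
        await asyncio.sleep(60)

    finished = TaskAction(quick_predict())
    result = await finished.wait_result()

    cancelled = TaskAction(slow_fit())
    await cancelled.async_close()
    try:
        await cancelled.wait_result()
        was_cancelled = False
    except asyncio.CancelledError:
        was_cancelled = True
    return result, was_cancelled
```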
+
+
+
+
+Since there are no strict limitations on the arguments that may be passed to
+plugins, i.e., positional and keyword arguments are mostly passed as-is,
+callers of the actions have the option of passing special kinds of
+objects as arguments. These objects are interpreted by the engine as subactions
+that need to be executed before the main action. E.g., a fitting function may
+expect a dataset as an input, and while it is possible to pass the dataset
+directly to the engine’s Engine.fit() call, the caller could create an
+aimm.server.common.DataAccess object and pass it instead. This would
+indicate to the engine that it needs to use the data access plugin to access
+the required data before fitting. All subactions are also run in separate
+subprocesses and notify their progress through state.
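The argument handling described above can be sketched as a resolution pass that runs before the main call. Everything here is an assumption made for illustration: `DataAccessStub` stands in for `aimm.server.common.DataAccess` (whose actual fields are not shown in this document), `plugins` is a plain dictionary of callables, and the subactions run in-process rather than in separate subprocesses.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass
class DataAccessStub:
    """Hypothetical marker object: 'call this data access plugin first'."""
    name: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)


def resolve_arguments(
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    plugins: Dict[str, Callable[..., Any]],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Replace every marker with the result of its plugin call."""

    def resolve(value: Any) -> Any:
        if isinstance(value, DataAccessStub):
            return plugins[value.name](*value.args, **value.kwargs)
        return value  # ordinary arguments are passed through unchanged

    resolved_args = tuple(resolve(a) for a in args)
    resolved_kwargs = {k: resolve(v) for k, v in kwargs.items()}
    return resolved_args, resolved_kwargs
```

The fitting function would then be called with the resolved arguments, so it never sees the marker objects themselves.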
+
+State
+State is a dictionary consisting of two properties, models and actions.
+Models are a dictionary with instance IDs as keys and
+aimm.server.common.Model instances as values. Actions are also a
+dictionary, with the following structure:
+---
+description: keys are action IDs
+patternProperties:
+    '(.)+':
+        oneOf:
+          - type: 'null'
+            description: prior to start of the action call
+          - type: object
+            required:
+                - meta
+                - progress
+            properties:
+                meta:
+                    type: object
+                    required:
+                        - call
+                    properties:
+                        call:
+                            type: string
+                            description: call that the action is making
+                        model_type:
+                            type: string
+                        model:
+                            type: integer
+                        args:
+                            type: array
+                        kwargs:
+                            type: object
+                progress:
+                    enum:
+                        - accessing_data
+                        - executing
+                        - complete
+                data_access:
+                    type: object
+                    description: |
+                        keys represent argument IDs (numbers for
+                        positional, strings for named), values are set by
+                        plugin's state callbacks
+                action:
+                    description: set by plugin state callback
+...
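To make the structure concrete, the following sketch shows what a state value of this shape could look like as a Python dictionary. The action IDs, the `call` strings and the `data_access` value are invented for illustration, and `unfinished_actions` is a hypothetical helper, not an AIMM function.

```python
# Example state; all concrete values are made up for this sketch.
state = {
    "models": {},  # instance ID -> Model instance (left empty here)
    "actions": {
        "1": None,  # action registered, call not started yet
        "2": {
            "meta": {"call": "fit", "model": 7, "args": [], "kwargs": {}},
            "progress": "accessing_data",
            # argument ID -> value set by the data access plugin's callbacks
            "data_access": {0: "loading"},
        },
        "3": {
            "meta": {"call": "predict", "model": 7},
            "progress": "complete",
        },
    },
}


def unfinished_actions(state: dict) -> list:
    """Return IDs of actions that have not reached the 'complete' stage."""
    return [
        action_id
        for action_id, action in state["actions"].items()
        if action is None or action["progress"] != "complete"
    ]
```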
+
+
+
+
+Multiprocessing
+The details of the multiprocessing implementation are placed in a separate
+module, aimm.server.mprocess. This module is in charge of providing an
+interface for managed process calls. The central class of the module is
+aimm.server.mprocess.ProcessManager. Its purpose is similar to that of
+a standard multiprocessing.Pool, the main difference being that it does
+not keep a fixed number of process workers alive at all times and instead
+holds an asyncio.Condition that prevents creation of new processes
+until the number of children is under the max_children configuration
+parameter.
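The gating idea can be sketched with an `asyncio.Condition` and a counter. This is a simplified illustration under stated assumptions, not the `ProcessManager` code: `ChildLimiter` is a hypothetical class, it tracks an explicit counter instead of periodically counting real child processes, and the "children" are coroutines that merely sleep.

```python
import asyncio


class ChildLimiter:
    """Hypothetical limiter: blocks callers while max_children are active."""

    def __init__(self, max_children: int) -> None:
        self._max_children = max_children
        self._active = 0
        self._condition = asyncio.Condition()

    async def acquire(self) -> None:
        async with self._condition:
            # Sleep until the number of active children drops below the limit.
            await self._condition.wait_for(
                lambda: self._active < self._max_children
            )
            self._active += 1

    async def release(self) -> None:
        async with self._condition:
            self._active -= 1
            self._condition.notify_all()


async def demo() -> int:
    limiter = ChildLimiter(max_children=2)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        await limiter.acquire()
        try:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the child process
        finally:
            running -= 1
            await limiter.release()

    await asyncio.gather(*(job() for _ in range(5)))
    return peak
```

Unlike a pool, nothing is kept alive between calls here; each caller simply waits for permission to start its own process.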
+The manager is implemented in the following class:
+
+-
+class aimm.server.mprocess.ProcessManager(max_children: int, async_group: Group, check_children_period: float, sigterm_timeout: float)
+Class used to create ProcessHandler objects and limit the
+number of concurrently active child processes.
+
+- Parameters:
+
+max_children – maximum number of child processes that may be created
+async_group – async group
+check_children_period – number of seconds waited before checking if a
+child process may be created and notifying pending handlers
+sigterm_timeout – number of seconds waited before sending SIGKILL if a
+child process does not terminate after SIGTERM
+
+
+
+
+-
+property async_group: Group
+Group controlling resource’s lifetime.
+
+
+
+-
+create_handler(state_cb: Callable[[Any], None]) → ProcessHandler
+Creates a ProcessHandler
+
+- Parameters:
+state_cb (Callable[[Any], None]) – state callback for changes
+in the process call
+
+- Returns:
+ProcessHandler
+
+
+
+
+
+
+The process calls are wrapped in an aimm.server.mprocess.ProcessHandler
+instance, whose interface allows callers to terminate the process call. It also
+allows callers to pass their state change callback functions, which are called
+whenever the process’ state changes.
+After calling aimm.server.mprocess.ProcessManager.create_handler() and
+receiving a process handler, the call can be made using the
+aimm.server.mprocess.ProcessHandler.run() function, which
+first spawns an asyncio task that blocks until the process manager allows
+creation of a new process and only then actually creates a new process.
+The state notification is done using callbacks and multiprocessing pipes.
+The process handler receives a state_cb argument in its constructor and this is
+the function used to notify states to the rest of the system. It also provides
+a method proc_notify_state_change, which is a callback passed to the
+function running in the separate process. This function uses a
+multiprocessing.Pipe object to send the function’s state values (which need to
+be pickleable). Handlers also have internal state listening loops, running in
+the main asyncio event loop, that react to receiving these state changes and
+notify the rest of the system using the state_cb passed in the constructor.
+The result of the separate process call is also passed through a separate pipe
+and set as the result of the aimm.server.mprocess.ProcessHandler.result
+property.
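The pipe-based notification can be illustrated with a small standalone sketch. It is not the `ProcessHandler` implementation and makes several simplifying assumptions: `worker` and `run_and_listen` are hypothetical names, a single pipe carries both state updates and the final result (the text above describes separate pipes), the `fork` start method is requested explicitly so the example is POSIX-only, and blocking reads are moved to the default executor.

```python
import asyncio
import multiprocessing
from typing import Any, Callable


def worker(conn, steps: int) -> None:
    """Runs in the child process and reports its progress over the pipe."""
    for i in range(steps):
        conn.send(("state", {"step": i}))  # values must be pickleable
    conn.send(("result", steps * 2))
    conn.close()


async def run_and_listen(steps: int, state_cb: Callable[[Any], None]) -> Any:
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX system
    recv_conn, send_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=worker, args=(send_conn, steps))
    proc.start()
    send_conn.close()  # the parent only reads

    loop = asyncio.get_running_loop()
    try:
        while True:
            # recv() blocks, so it is run in a thread to keep the loop free.
            kind, value = await loop.run_in_executor(None, recv_conn.recv)
            if kind == "result":
                return value
            state_cb(value)
    finally:
        await loop.run_in_executor(None, proc.join)
        recv_conn.close()
```

The listening loop plays the role of the handler's internal state loop: each received state is forwarded to `state_cb`, and the last message becomes the call's result.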
+The complete class docstring:
+
+-
+class aimm.server.mprocess.ProcessHandler(async_group: Group, sigterm_timeout: float, state_cb: Callable[[Any], None], condition: Condition)
+Handler for calls in separate processes. Created through
+ProcessManager.create_handler().
+
+- Parameters:
+
+async_group (hat.aio.Group) – async group
+sigterm_timeout (float) – time waited until process handles SIGTERM
+before sending SIGKILL during forced shutdown
+state_cb (Optional[Callable[Any]]) – state change cb
+condition (asyncio.Condition) – condition that notifies when a new
+process may be created
+
+
+
+
+-
+property async_group: Group
+Group controlling resource’s lifetime.
+
+
+
+-
+proc_notify_state_change(state: Any)
+To be passed to and run in the separate process call. Notifies the
+handler of state change, new state is passed to state_cb received
+in the constructor.
+
+- Parameters:
+state – call state, needs to be pickleable
+
+
+
+
+
+-
+async run(fn: Callable, *args: Any, **kwargs: Any)
+Requests the start of function execution in the separate process.
+
+- Parameters:
+
+fn – function that will be called
+*args – positional arguments, need to be pickleable
+**kwargs – keyword arguments, need to be pickleable
+
+
+
+
+
+
+
+
+