-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
182 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# | ||
# @Author: José Sánchez-Gallego ([email protected]) | ||
# @Date: 2024-12-24 | ||
# @Filename: engineering.py | ||
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) | ||
|
||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
import click | ||
|
||
from . import parser | ||
|
||
|
||
if TYPE_CHECKING: | ||
from lvmecp.actor import ECPCommand | ||
|
||
|
||
@parser.group(name="engineering-mode") | ||
def engineering_mode(): | ||
"""Enable/disable the engineering mode.""" | ||
|
||
pass | ||
|
||
|
||
@engineering_mode.command() | ||
@click.option( | ||
"--timeout", | ||
"-t", | ||
type=float, | ||
help="Timeout for the engineering mode. " | ||
"If not passed, the default timeout is used.", | ||
) | ||
async def enable(command: ECPCommand, timeout: float | None = None): | ||
"""Enables the engineering mode.""" | ||
|
||
await command.actor.engineering_mode(True, timeout=timeout) | ||
|
||
return command.finish(engineering_mode=True) | ||
|
||
|
||
@engineering_mode.command() | ||
async def disable(command: ECPCommand): | ||
"""Disables the engineering mode.""" | ||
|
||
await command.actor.engineering_mode(False) | ||
|
||
return command.finish(engineering_mode=False) | ||
|
||
|
||
@engineering_mode.command() | ||
async def status(command: ECPCommand): | ||
"""Returns the status of the engineering mode.""" | ||
|
||
status = await command.actor.engineering_mode() | ||
|
||
return command.finish(engineering_mode=status) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# | ||
# @Author: José Sánchez-Gallego ([email protected]) | ||
# @Date: 2024-12-24 | ||
# @Filename: test_command_heartbeat.py | ||
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
|
||
if TYPE_CHECKING: | ||
from pytest_mock import MockerFixture | ||
|
||
from lvmecp.actor import ECPActor | ||
|
||
|
||
async def test_command_engineering_mode_status(actor: ECPActor): | ||
cmd = await actor.invoke_mock_command("engineering-mode status") | ||
await cmd | ||
|
||
assert cmd.status.did_succeed | ||
assert cmd.replies.get("engineering_mode") is False | ||
|
||
|
||
async def test_command_engineering_mode_enable(actor: ECPActor, mocker: MockerFixture): | ||
eng_mode_mock = mocker.patch.object(actor, "engineering_mode") | ||
|
||
cmd = await actor.invoke_mock_command("engineering-mode enable --timeout 10") | ||
await cmd | ||
|
||
assert cmd.status.did_succeed | ||
eng_mode_mock.assert_called_once_with(True, timeout=10) | ||
|
||
|
||
async def test_command_engineering_mode_no_mock(actor: ECPActor): | ||
cmd = await actor.invoke_mock_command("engineering-mode enable --timeout 10") | ||
await cmd | ||
|
||
assert await actor.engineering_mode() is True | ||
|
||
cmd = await actor.invoke_mock_command("engineering-mode disable") | ||
await cmd | ||
|
||
assert await actor.engineering_mode() is False | ||
|
||
|
||
async def test_command_engineering_mode_timeouts(actor: ECPActor): | ||
actor._engineering_mode_hearbeat_interval = 0.1 # To speed up the test | ||
|
||
cmd = await actor.invoke_mock_command("engineering-mode enable --timeout 0.2") | ||
await cmd | ||
|
||
assert await actor.engineering_mode() is True | ||
|
||
await asyncio.sleep(0.3) | ||
|
||
assert await actor.engineering_mode() is False |