Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop multiple or all scans at once. #844

Merged
merged 8 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/ostorlab/cli/scan/stop/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
Example of usage:
- ostorlab scan list --source=source."""

from typing import Tuple

Check warning on line 6 in src/ostorlab/cli/scan/stop/stop.py

View check run for this annotation

Codecov / codecov/patch

src/ostorlab/cli/scan/stop/stop.py#L6

Added line #L6 was not covered by tests

import click
from ostorlab.cli.scan import scan
from ostorlab.cli import console as cli_console
Expand All @@ -11,14 +13,36 @@


@scan.command()
@click.argument("scan_id", required=True)
@click.argument("scan_ids", nargs=-1, type=int, required=False)
@click.option(

Check warning on line 17 in src/ostorlab/cli/scan/stop/stop.py

View check run for this annotation

Codecov / codecov/patch

src/ostorlab/cli/scan/stop/stop.py#L16-L17

Added lines #L16 - L17 were not covered by tests
"--all",
elyousfi5 marked this conversation as resolved.
Show resolved Hide resolved
"-a",
"stop_all",
is_flag=True,
help="Stop all running scans",
default=False,
)
@click.pass_context
def stop(ctx: click.core.Context, scan_id: int) -> None:
"""Stop a scan.\n
def stop(ctx: click.core.Context, scan_ids: Tuple[int, ...], stop_all: bool) -> None:

Check warning on line 26 in src/ostorlab/cli/scan/stop/stop.py

View check run for this annotation

Codecov / codecov/patch

src/ostorlab/cli/scan/stop/stop.py#L26

Added line #L26 was not covered by tests
"""Stop one or multiple scans.\n
Usage:\n
- ostorlab scan --runtime=local stop --id=id
- ostorlab scan --runtime=local stop 4
- ostorlab scan --runtime=local stop 4 5 6
- ostorlab scan --runtime=local stop --all
"""
if len(scan_ids) == 0 and stop_all is False:
raise click.UsageError("Either provide scan IDs or use --all flag")

Check warning on line 34 in src/ostorlab/cli/scan/stop/stop.py

View check run for this annotation

Codecov / codecov/patch

src/ostorlab/cli/scan/stop/stop.py#L34

Added line #L34 was not covered by tests

runtime_instance = ctx.obj["runtime"]
with console.status("Stopping scan"):
if stop_all is True:
scans_list = runtime_instance.list()
ids_to_stop = [s.id for s in scans_list]
if len(ids_to_stop) == 0:
console.warning("No running scans found.")
return
else:
ids_to_stop = list(scan_ids)

console.info(f"Stopping {len(ids_to_stop)} scan(s).")
for scan_id in ids_to_stop:
runtime_instance.stop(scan_id=scan_id)
11 changes: 8 additions & 3 deletions src/ostorlab/runtimes/local/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@
logger.info(
"comparing %s and %s", service_labels.get("ostorlab.universe"), scan_id
)
if service_labels.get("ostorlab.universe") == scan_id:
if service_labels.get("ostorlab.universe") is not None and int(
service_labels.get("ostorlab.universe")
) == int(scan_id):
stopped_services.append(service)
service.remove()

Expand All @@ -309,7 +311,7 @@
network_labels = network.attrs["Labels"]
if (
network_labels is not None
and network_labels.get("ostorlab.universe") == scan_id
and int(network_labels.get("ostorlab.universe")) == scan_id
):
logger.info("removing network %s", network_labels)
stopped_network.append(network)
Expand All @@ -318,7 +320,10 @@
configs = self._docker_client.configs.list()
for config in configs:
config_labels = config.attrs["Spec"]["Labels"]
if config_labels.get("ostorlab.universe") == scan_id:
if (

Check warning on line 323 in src/ostorlab/runtimes/local/runtime.py

View check run for this annotation

Codecov / codecov/patch

src/ostorlab/runtimes/local/runtime.py#L323

Added line #L323 was not covered by tests
config_labels.get("ostorlab.universe") is not None
and int(config_labels.get("ostorlab.universe")) == scan_id
):
logger.info("removing config %s", config_labels)
stopped_configs.append(config)
config.remove()
Expand Down
112 changes: 108 additions & 4 deletions tests/cli/scan/stop/test_scan_stop.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Tests for scan stop command."""

from unittest import mock

from click.testing import CliRunner
from ostorlab.cli import rootcli
from pytest_mock import plugin

from ostorlab.apis.runners import authenticated_runner
from ostorlab.cli import rootcli
from ostorlab.runtimes.local import runtime as local_runtime

from unittest import mock


def testOstorlabScanStopCLI_whenRuntimeIsRemoteAndScanIdIsValid_stopsScan(
httpx_mock,
Expand Down Expand Up @@ -70,4 +72,106 @@ def testOstorlabScanStopCLI_whenRuntimeIsLocal_callsStopMethodWithProvidedId(

runner.invoke(rootcli.rootcli, ["scan", "--runtime=local", "stop", "123456"])

mock_scan_stop.assert_called_once_with(scan_id="123456")
mock_scan_stop.assert_called_once_with(scan_id=123456)


@mock.patch.object(local_runtime.LocalRuntime, "stop")
def testOstorlabScanStopCLI_whenMultipleScanIdsAreProvided_stopsAllProvidedScans(
mock_scan_stop: mock.Mock, mocker: plugin.MockerFixture
) -> None:
"""Test ostorlab scan stop command with multiple scan ids.
Should call stop method for each provided scan id.
"""

mock_scan_stop.return_value = None
mocker.patch("ostorlab.runtimes.local.LocalRuntime.__init__", return_value=None)
runner = CliRunner()

result = runner.invoke(
rootcli.rootcli, ["scan", "--runtime=local", "stop", "1", "2", "3"]
)

assert result.exception is None
assert "Stopping 3 scan(s)" in result.output
assert mock_scan_stop.call_count == 3
mock_scan_stop.assert_any_call(scan_id=1)
mock_scan_stop.assert_any_call(scan_id=2)
mock_scan_stop.assert_any_call(scan_id=3)


@mock.patch.object(local_runtime.LocalRuntime, "stop")
@mock.patch.object(local_runtime.LocalRuntime, "list")
def testOstorlabScanStopCLI_whenStopAllIsUsedAndScansExist_stopsAllScans(
mock_list_scans: mock.Mock, mock_scan_stop: mock.Mock, mocker: plugin.MockerFixture
) -> None:
"""Test ostorlab scan stop command with --all flag.
Should stop all running scans.
"""

mock_list_scans.return_value = [
mock.Mock(id=101),
mock.Mock(id=102),
mock.Mock(id=103),
]
mock_scan_stop.return_value = None
mocker.patch("ostorlab.runtimes.local.LocalRuntime.__init__", return_value=None)
runner = CliRunner()

result = runner.invoke(
rootcli.rootcli, ["scan", "--runtime=local", "stop", "--all"]
)

assert result.exception is None
assert "Stopping 3 scan(s)" in result.output
assert mock_scan_stop.call_count == 3
mock_scan_stop.assert_any_call(scan_id=101)
mock_scan_stop.assert_any_call(scan_id=102)
mock_scan_stop.assert_any_call(scan_id=103)


@mock.patch.object(local_runtime.LocalRuntime, "list")
def testOstorlabScanStopCLI_whenStopAllIsUsedAndNoScansExist_showsWarning(
mock_list_scans: mock.Mock, mocker: plugin.MockerFixture
) -> None:
"""Test ostorlab scan stop command with --all flag.
Should show warning message when no scans are running.
"""

mock_list_scans.return_value = []
mocker.patch("ostorlab.runtimes.local.LocalRuntime.__init__", return_value=None)
runner = CliRunner()

result = runner.invoke(
rootcli.rootcli, ["scan", "--runtime=local", "stop", "--all"]
)

assert result.exception is None
assert "No running scans found" in result.output


@mock.patch.object(local_runtime.LocalRuntime, "stop")
@mock.patch.object(local_runtime.LocalRuntime, "list")
def testOstorlabScanStopCLI_whenStopAllWithShorthandIsUsedAndScansExist_stopsAllScans(
mock_list_scans: mock.Mock, mock_scan_stop: mock.Mock, mocker: plugin.MockerFixture
) -> None:
"""Test ostorlab scan stop command with --all flag.
Should stop all running scans.
"""

mock_list_scans.return_value = [
mock.Mock(id=101),
mock.Mock(id=102),
mock.Mock(id=103),
]
mock_scan_stop.return_value = None
mocker.patch("ostorlab.runtimes.local.LocalRuntime.__init__", return_value=None)
runner = CliRunner()

result = runner.invoke(rootcli.rootcli, ["scan", "--runtime=local", "stop", "-a"])

assert result.exception is None
assert "Stopping 3 scan(s)" in result.output
assert mock_scan_stop.call_count == 3
mock_scan_stop.assert_any_call(scan_id=101)
mock_scan_stop.assert_any_call(scan_id=102)
mock_scan_stop.assert_any_call(scan_id=103)
Loading