Skip to content

Commit

Permalink
Merge pull request #25 from Ostorlab/add_task_scan_config_args
Browse files Browse the repository at this point in the history
Add OpenVas scan config as argument and switch to GVMD_FULL_DEEP_ULTI…
  • Loading branch information
amine3 authored Sep 25, 2022
2 parents aeddb8b + d2398f2 commit d33e41c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
6 changes: 4 additions & 2 deletions agent/openvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@

ALL_IANA_ASSIGNED_TCP_UDP = '4a4717fe-57d2-11e1-9a26-406186ea4fc5'
GVMD_FULL_FAST_CONFIG = 'daba56c8-73ec-11df-a475-002264764cea'
GVMD_FULL_DEEP_ULTIMATE_CONFIG = '74db13d6-7489-11df-91b9-002264764cea'
OPENVAS_SCANNER_ID = '08b69003-5fc2-4037-a479-93b440211c73'
GMP_USERNAME = 'admin'
GMP_PASSWORD = 'admin'
WAIT_TIME = 30

class OpenVas:
"""OpenVas wrapper to enable using openvas scanner from ostorlab agent class."""
def start_scan(self, target: str) -> str:
def start_scan(self, target: str, scan_config_id: str) -> str:
"""Start OpenVas scan on the ip provided.
Args:
target: Target ip to scan.
scan_config_id: scan configuration used by the task.
Returns:
OpenVas task identifier.
"""
Expand All @@ -35,7 +37,7 @@ def start_scan(self, target: str) -> str:
logger.debug('Creating target')
target_id = self._create_target(gmp, target, ALL_IANA_ASSIGNED_TCP_UDP)
logger.debug('Creating task for target %s', target_id)
task_id = self._create_task(gmp, target, target_id, GVMD_FULL_FAST_CONFIG, OPENVAS_SCANNER_ID, )
task_id = self._create_task(gmp, target, target_id, scan_config_id, OPENVAS_SCANNER_ID, )
logger.debug('Creating report for task %s', task_id)
report_id = self._start_task(gmp, task_id)
logger.info('Started scan of host %s. Corresponding report ID is %s', str(target), str(report_id))
Expand Down
6 changes: 4 additions & 2 deletions agent/openvas_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self,
) -> None:
super().__init__(agent_definition, agent_settings)
persist_mixin.AgentPersistMixin.__init__(self, agent_settings)
self._scope_regex: Optional[str] = self.args.get('_scope_regex')
self._scope_regex: Optional[str] = self.args.get('scope_regex')

def start(self) -> None:
"""Calls the start.sh script to bootstrap the scanner."""
Expand Down Expand Up @@ -100,7 +100,9 @@ def process(self, message: m.Message) -> None:
logger.info('scanning target %s', target)
if not self._should_process_target(self._scope_regex, target):
return
task_id = openvas_wrapper.start_scan(target)
task_id = openvas_wrapper.start_scan(target,
self.args.get('scan_config_id', openvas.GVMD_FULL_DEEP_ULTIMATE_CONFIG)
)
openvas_wrapper.wait_task(task_id)
result = openvas_wrapper.get_results()
if result is not None:
Expand Down
3 changes: 3 additions & 0 deletions ostorlab.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ args:
- name: "scope_regex"
type: "string"
description: "to define scanning scope."
- name: "scan_config_id"
type: "string"
description: "OpenVas scan configuration used by the task."
12 changes: 7 additions & 5 deletions tests/openvas_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
from ostorlab.agent.mixins import agent_report_vulnerability_mixin


def testAgentOpenVas_whenBinaryAvailable_RunScan(openvas_agent, scan_message, mocker):
def testAgentOpenVas_whenBinaryAvailable_RunScan(openvas_agent_no_scope, scan_message, mocker):
"""Tests running the agent and parsing the json output."""
star_scan_mocker = mocker.patch('agent.openvas.OpenVas.start_scan', return_value='hduzehfuhehfuhef')
mocker.patch('agent.openvas.OpenVas.wait_task', return_value=None)
with open('tests/openvas_result.csv', 'r', encoding='UTF-8') as f:
mocker.patch('agent.openvas.OpenVas.get_results', return_value=f.read())
mock_report_vulnerability = mocker.patch('agent.openvas_agent.OpenVasAgent.report_vulnerability',
return_value=None)
openvas_agent.process(scan_message)
openvas_agent_no_scope.process(scan_message)

output = {'IP': '128.0.0.1', 'Hostname': 'test', 'Port': '', 'Port Protocol': '', 'CVSS': '',
'Severity': 'HIGH', 'Solution Type': '', 'NVT Name': '', 'Summary': '', 'Specific Result': '',
Expand All @@ -22,7 +22,7 @@ def testAgentOpenVas_whenBinaryAvailable_RunScan(openvas_agent, scan_message, mo
'Affected Software/OS': '', 'Vulnerability Insight': '', 'Vulnerability Detection Method': '',
'Product Detection Result': '', 'BIDs': '', 'CERTs': '', 'Other References': ''}

star_scan_mocker.assert_called_with(scan_message.data.get('host'))
star_scan_mocker.assert_called_with(scan_message.data.get('host'), None)
mock_report_vulnerability.assert_called_with(entry=kb.Entry(title='', risk_rating='INFO',
references={}, short_description='',
description='', recommendation='',
Expand Down Expand Up @@ -179,7 +179,8 @@ def testAgentOpenVas_whenBinaryAvailableAndRangeOfIPsIsInput_RunScan(

assert mock_report_vulnerability.call_args_list[0].kwargs == args1
assert mock_report_vulnerability.call_args_list[1].kwargs == args2
star_scan_mocker.assert_called_with(f'{ip_range_message.data.get("host")}/{ip_range_message.data.get("mask")}')
star_scan_mocker.assert_called_with(
f'{ip_range_message.data.get("host")}/{ip_range_message.data.get("mask")}', None)


def testAgentOpenVas_whenBinaryAvailableAndRangeOfIPsIsInput_NotScan(openvas_agent_no_scope,
Expand Down Expand Up @@ -237,4 +238,5 @@ def testAgentOpenVas_whenBinaryAvailableAndRangeOfIPsIsInput_NotScan(openvas_age

assert mock_report_vulnerability.call_args_list[0].kwargs == args1
assert mock_report_vulnerability.call_args_list[1].kwargs == args2
star_scan_mocker.assert_called_with(f'{ip_range_message.data.get("host")}/{ip_range_message.data.get("mask")}')
star_scan_mocker.assert_called_with(
f'{ip_range_message.data.get("host")}/{ip_range_message.data.get("mask")}', None)

0 comments on commit d33e41c

Please sign in to comment.