From 8de5cf66faa1417dc1f17d089186ac305b5430e1 Mon Sep 17 00:00:00 2001 From: content-bot <55035720+content-bot@users.noreply.github.com> Date: Thu, 26 Sep 2024 14:46:50 +0300 Subject: [PATCH] Darktrace update for model breaches (#36451) (#36515) * updated with demisto-sdk and fixed comment command * updating release notes * fixed post comment test and added error check for login page * removing validate.json Co-authored-by: seanmacdonald8 <86425481+seanmacdonald8@users.noreply.github.com> --- .../Integrations/DarktraceAIA/DarktraceAIA.py | 54 ++++++------ .../DarktraceAIA/DarktraceAIA_test.py | 3 +- .../DarktraceAdmin/DarktraceAdmin.py | 55 ++++++------ .../DarktraceAdmin/DarktraceAdmin_test.py | 3 +- .../Integrations/DarktraceMBs/DarktraceMBs.py | 86 ++++++++++--------- .../DarktraceMBs/DarktraceMBs_test.py | 3 +- .../DarktraceMBs/test_data/comment_post.json | 12 ++- .../test_data/formatted_comment_post.json | 3 +- Packs/Darktrace/ReleaseNotes/3_0_11.md | 14 +++ Packs/Darktrace/pack_metadata.json | 2 +- 10 files changed, 129 insertions(+), 106 deletions(-) create mode 100644 Packs/Darktrace/ReleaseNotes/3_0_11.md diff --git a/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA.py b/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA.py index d566d3ee5a3f..e901792fb889 100644 --- a/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA.py +++ b/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA.py @@ -5,7 +5,8 @@ import json import traceback from datetime import datetime, timezone -from typing import Any, Dict, List, Mapping, Optional, Tuple, cast +from typing import Any, cast +from collections.abc import Mapping import dateparser import urllib3 @@ -46,7 +47,7 @@ class Client(BaseClient): Most calls use _http_request() that handles proxy, SSL verification, etc. """ - def get(self, query_uri: str, params: Dict[str, str] = None): + def get(self, query_uri: str, params: dict[str, str] = None): """Handles Darktrace GET API calls""" return self._darktrace_api_call(query_uri, method="GET", params=params) @@ -61,7 +62,7 @@ def _darktrace_api_call( params: dict = None, data: dict = None, json: dict = None, - headers: Dict[str, str] = None, + headers: dict[str, str] = None, ): """Handles Darktrace API calls""" headers = { @@ -111,14 +112,14 @@ def error_handler(self, res: requests.Response): elif res.status_code >= 300: raise Exception(DARKTRACE_API_ERRORS['UNDETERMINED_ERROR']) - def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> Dict[str, str]: + def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> dict[str, str]: """Create headers required for successful authentication""" public_token, _ = self._auth date = (datetime.now(timezone.utc)).isoformat(timespec="auto") signature = _create_signature(self._auth, query_uri, date, query_data, is_json=is_json) return {"DTAPI-Token": public_token, "DTAPI-Date": date, "DTAPI-Signature": signature} - def get_ai_analyst_incident_event(self, event_id: str) -> List[Dict[str, Any]]: + def get_ai_analyst_incident_event(self, event_id: str) -> list[dict[str, Any]]: """Searches for a single AI Analyst Incident alerts using '/incidentevents?uuid=' :type event_id: ``str`` :param event_id: unique event identifier @@ -127,7 +128,7 @@ def get_ai_analyst_incident_event(self, event_id: str) -> List[Dict[str, Any]]: """ return self.get(AI_ANALYST_ENDPOINT, params={"uuid": event_id}) - def search_ai_analyst_incident_events(self, min_score: int, start_time: Optional[int]) -> List[Dict[str, Any]]: + def search_ai_analyst_incident_events(self, min_score: int, start_time: int | None) -> list[dict[str, Any]]: """Searches all AI Analyst Incident alerts from a certain date and score' :type min_score: ``str`` :param min_score: minimum score for data to be pulled @@ -143,7 +144,7 @@ def search_ai_analyst_incident_events(self, min_score: int, start_time: Optional } return self.get(query_uri, params) - def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]: + def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]: """ Returns all comments for a specified incident event id :type event_id: ``str`` :param event_id: unique event identifier @@ -156,7 +157,7 @@ def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> Dict[str, } return self.get(query_uri, params) - def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str) -> Dict[str, Any]: + def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str) -> dict[str, Any]: """ Posts a message to an incident event id :type event_id: ``str`` :param event_id: unique event identifier @@ -172,7 +173,7 @@ def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str) } return self.post(query_uri, json=body) - def acknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]: + def acknowledge_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]: """ acknowledges an incident event :type event_id: ``str`` :param event_id: unique event identifier @@ -182,7 +183,7 @@ def acknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any] query_uri = AI_ANALYST_ACKNOWLEDGE_ENDPOINT return self.post(query_uri, data={'uuid': str(event_id)}) - def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]: + def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]: """ unacknowledges an incident event :type event_id: ``str`` :param event_id: unique event identifier @@ -192,7 +193,7 @@ def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, An query_uri = AI_ANALYST_UNACKNOWLEDGE_ENDPOINT return self.post(query_uri, data={'uuid': str(event_id)}) - def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> List[Dict[str, Any]]: + def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> list[dict[str, Any]]: """Searches for a single AI Analyst Group alerts using '/groups?uuid=' :type event_id: ``str`` :param event_id: unique event identifier @@ -205,7 +206,7 @@ def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> List[Dict """*****HELPER FUNCTIONS****""" -def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optional[int]: +def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> int | None: """Converts an XSOAR argument to a timestamp (seconds from epoch) This function is used to quickly validate an argument provided to XSOAR via ``demisto.args()`` into an ``int`` containing a timestamp (seconds @@ -244,7 +245,7 @@ def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optiona raise ValueError(f'Invalid date: {arg_name}') return int(date.timestamp()) - if isinstance(arg, (int, float)): + if isinstance(arg, int | float): # Convert to int if the input is a float return int(arg) raise ValueError(f'Invalid date: "{arg_name}"') @@ -270,7 +271,7 @@ def stringify_data(data: Mapping) -> str: return "&".join([f"{k}={v}" for k, v in data.items()]) -def format_JSON_for_ai_analyst_incident(aia_incident: Dict[str, Any], details: bool = False) -> Dict[str, Any]: +def format_JSON_for_ai_analyst_incident(aia_incident: dict[str, Any], details: bool = False) -> dict[str, Any]: """Formats JSON for get-ai-incident-event command :type aia_incident: ``Dict[str, Any]`` :param aia_incident: JSON incident event as returned by API for fetch incident @@ -319,7 +320,7 @@ def check_required_fields(args, *fields): raise ValueError(f'Argument error could not find {field} in {args}') -def test_module(client: Client, first_fetch_time: Optional[int]) -> str: +def test_module(client: Client, first_fetch_time: int | None) -> str: """ Returning 'ok' indicates that the integration works like it is supposed to. Connection to the service is successful. @@ -344,8 +345,8 @@ def test_module(client: Client, first_fetch_time: Optional[int]) -> str: return 'ok' -def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], - first_fetch_time: Optional[int], min_score: int) -> Tuple[Dict[str, int], List[dict]]: +def fetch_incidents(client: Client, max_alerts: int, last_run: dict[str, int], + first_fetch_time: int | None, min_score: int) -> tuple[dict[str, int], list[dict]]: """This function retrieves new ai analyst incident event every minute. It will use last_run to save the timestamp of the last incident it processed. If last_run is not provided, it should use the integration parameter first_fetch to determine when to start fetching @@ -386,7 +387,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], latest_created_time = cast(int, last_fetch) # Each incident is a dict with a string as a key - incidents: List[Dict[str, Any]] = [] + incidents: list[dict[str, Any]] = [] ai_analyst_alerts = client.search_ai_analyst_incident_events( min_score=min_score, # Scale the min score from [0,100] to [0 to 1] for API calls @@ -396,9 +397,8 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], for alert in ai_analyst_alerts: incident_created_time = int(alert.get('createdAt', 0)) alert['time'] = timestamp_to_datestring(incident_created_time) - if last_fetch: - if incident_created_time <= last_fetch: - continue + if last_fetch and incident_created_time <= last_fetch: + continue id = str(alert['id']) title = str(alert['title']) incident_name = f'DT eventId #{id}: {title}' @@ -426,7 +426,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], return next_run, incidents -def get_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults: """get-ai-analyst-incident-event-command: Returns a Darktrace incident event details :type client: ``Client`` @@ -464,7 +464,7 @@ def get_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) ) -def get_comments_for_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_comments_for_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults: """darktrace-get-comments-for-ai-analyst-incident-event-command: Returns all comments associated with an incident event. @@ -502,7 +502,7 @@ def get_comments_for_ai_analyst_incident_event_command(client: Client, args: Dic ) -def post_comment_to_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def post_comment_to_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults: """darktrace-post-comment-to-ai-analyst-incident-event-command: Posts a comment to an ai analyst event :type client: ``Client`` @@ -541,7 +541,7 @@ def post_comment_to_ai_analyst_incident_event_command(client: Client, args: Dict ) -def acknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def acknowledge_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults: """acknowledge-ai-analyst-incident-event-command: Acknowledges an ai analyst event :type client: ``Client`` @@ -578,7 +578,7 @@ def acknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str ) -def unacknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def unacknowledge_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults: """unacknowledge-ai-analyst-incident-event-command: Unacknowledges an ai analyst event :type client: ``Client`` @@ -615,7 +615,7 @@ def unacknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[s ) -def get__ai_analyst_incident_group_from_eventId_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get__ai_analyst_incident_group_from_eventId_command(client: Client, args: dict[str, Any]) -> CommandResults: """darktrace-get-incident-group-from-event: Pulls all events belonging to the same investigation group. :type client: ``Client`` diff --git a/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA_test.py b/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA_test.py index b30c851c79ad..99820e8164e6 100644 --- a/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA_test.py +++ b/Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA_test.py @@ -1,9 +1,8 @@ -import io import json def util_load_json(path): - with io.open(path, mode='r', encoding='utf-8') as f: + with open(path, encoding='utf-8') as f: return json.loads(f.read()) diff --git a/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin.py b/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin.py index 39b6a0ec7e44..8a4751dfd056 100644 --- a/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin.py +++ b/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin.py @@ -6,7 +6,8 @@ import traceback from base64 import b64encode from datetime import datetime, timezone -from typing import Any, Dict, List, Mapping, Optional +from typing import Any +from collections.abc import Mapping import dateparser import urllib3 @@ -50,7 +51,7 @@ class Client(BaseClient): Most calls use _http_request() that handles proxy, SSL verification, etc. """ - def get(self, query_uri: str, params: Dict[str, str] = None): + def get(self, query_uri: str, params: dict[str, str] = None): """Handles Darktrace GET API calls""" return self._darktrace_api_call(query_uri, method="GET", params=params) @@ -65,7 +66,7 @@ def _darktrace_api_call( params: dict = None, data: dict = None, json: dict = None, - headers: Dict[str, str] = None, + headers: dict[str, str] = None, ): """Handles Darktrace API calls""" headers = { @@ -115,14 +116,14 @@ def error_handler(self, res: requests.Response): elif res.status_code >= 300: raise Exception(DARKTRACE_API_ERRORS['UNDETERMINED_ERROR']) - def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> Dict[str, str]: + def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> dict[str, str]: """Create headers required for successful authentication""" public_token, _ = self._auth date = (datetime.now(timezone.utc)).isoformat(timespec="auto") signature = _create_signature(self._auth, query_uri, date, query_data, is_json=is_json) return {"DTAPI-Token": public_token, "DTAPI-Date": date, "DTAPI-Signature": signature} - def run_advanced_search_analysis(self, query, metric, operation) -> Dict[str, Any]: + def run_advanced_search_analysis(self, query, metric, operation) -> dict[str, Any]: """Returns information from Darktrace advanced search analysis' :type query: ``str`` :param query: Darktrace query string @@ -138,7 +139,7 @@ def run_advanced_search_analysis(self, query, metric, operation) -> Dict[str, An query_format = f"{ADVANCED_SEARCH_ENDPOINT}/{metric}/{operation}/{query_url}" return self.get(query_format) - def get_device_connection_info(self, **query_params) -> Dict[str, Any]: + def get_device_connection_info(self, **query_params) -> dict[str, Any]: """Returns information from Darktrace about graphical connection data for devices using '/deviceinfo' :type did: ``str`` :param did: Darktrace Device ID @@ -161,7 +162,7 @@ def get_device_connection_info(self, **query_params) -> Dict[str, Any]: query_uri = DEVICE_INFO_ENDPOINT return self.get(query_uri, query_params) - def get_external_endpoint_details(self, endpoint_type, endpoint_value, additional_info, devices, score) -> Dict[str, Any]: + def get_external_endpoint_details(self, endpoint_type, endpoint_value, additional_info, devices, score) -> dict[str, Any]: """Returns information from Darktrace about external endpoints using '/endpointdetails' :type endpoint_type: ``str`` :param endpoint_type: Type of endpoint, IP or hostname @@ -185,7 +186,7 @@ def get_external_endpoint_details(self, endpoint_type, endpoint_value, additiona } return self.get(query_uri, params) - def get_similar_devices(self, did, max_results) -> List[Dict[str, Any]]: + def get_similar_devices(self, did, max_results) -> list[dict[str, Any]]: """Returns a list of similar devices using '/similardevices' :type did: ``str`` :param did: Device ID of device @@ -201,7 +202,7 @@ def get_similar_devices(self, did, max_results) -> List[Dict[str, Any]]: } return self.get(query_uri, params) - def post_to_watched_list(self, addlist, description) -> Dict[str, Any]: + def post_to_watched_list(self, addlist, description) -> dict[str, Any]: """Returns information from POST endpoints to watched list advanced search analysis :type addlist: ``List[str]`` :param addlist: Darktrace query string @@ -215,7 +216,7 @@ def post_to_watched_list(self, addlist, description) -> Dict[str, Any]: json['description'] = description return self.post(INTEL_FEED_ENDPOINT, json=json) - def get_tagged_devices(self, tag_name) -> Dict[str, Any]: + def get_tagged_devices(self, tag_name) -> dict[str, Any]: """Returns information from devices given a tag as a filter :type tag_name: ``str`` :param tag_name: tag name to query from @@ -225,7 +226,7 @@ def get_tagged_devices(self, tag_name) -> Dict[str, Any]: params = {"tag": tag_name, "fulldevicedetails": "true"} return self.get(TAG_ENTITIES_ENDPOINT, params=params) - def get_tags_for_device(self, did) -> List[Dict[str, Any]]: + def get_tags_for_device(self, did) -> list[dict[str, Any]]: """Returns tags for a certain device :type did: ``str`` :param did: device id @@ -234,7 +235,7 @@ def get_tags_for_device(self, did) -> List[Dict[str, Any]]: """ return self.get(TAG_ENTITIES_ENDPOINT, params={"did": did}) - def post_tag_to_device(self, did, tag_name) -> Dict[str, Any]: + def post_tag_to_device(self, did, tag_name) -> dict[str, Any]: """Returns response from post command :type did: ``str`` :param did: device id @@ -257,7 +258,7 @@ def check_status(self) -> dict: """*****HELPER FUNCTIONS****""" -def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optional[int]: +def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> int | None: """Converts an XSOAR argument to a timestamp (seconds from epoch) This function is used to quickly validate an argument provided to XSOAR via ``demisto.args()`` into an ``int`` containing a timestamp (seconds @@ -296,7 +297,7 @@ def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optiona raise ValueError(f'Invalid date: {arg_name}') return int(date.timestamp()) - if isinstance(arg, (int, float)): + if isinstance(arg, int | float): # Convert to int if the input is a float return int(arg) raise ValueError(f'Invalid date: "{arg_name}"') @@ -365,7 +366,7 @@ def test_module(client: Client) -> str: return 'ok' -def run_advanced_search_analysis_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def run_advanced_search_analysis_command(client: Client, args: dict[str, Any]) -> CommandResults: """run_advanced_search_analysis_command: Runs an advanced search analysis on a specific query and metric and applies an operation to returned a list of results. @@ -412,7 +413,7 @@ def run_advanced_search_analysis_command(client: Client, args: Dict[str, Any]) - ) -def get_device_connection_info_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_device_connection_info_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_device_connection_info_command: Returns graphing connection information about a specified device :type client: ``Client`` @@ -456,7 +457,7 @@ def get_device_connection_info_command(client: Client, args: Dict[str, Any]) -> ) -def get_external_endpoint_details_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_external_endpoint_details_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_external_endpoint_details_command: Returns information about a specified external endpoint :type client: ``Client`` @@ -496,7 +497,7 @@ def get_external_endpoint_details_command(client: Client, args: Dict[str, Any]) ) -def get_similar_devices_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_similar_devices_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_similar_devices_command: Returns a list of similar devices to a device specified by Darktrace DID @@ -536,7 +537,7 @@ def get_similar_devices_command(client: Client, args: Dict[str, Any]) -> Command ) -def post_to_watched_list_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def post_to_watched_list_command(client: Client, args: dict[str, Any]) -> CommandResults: """post_to_watched_list_command: Returns a response from posting domains or ips to the watched list domain :type client: ``Client`` @@ -564,7 +565,7 @@ def post_to_watched_list_command(client: Client, args: Dict[str, Any]) -> Comman ) -def get_tagged_devices_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_tagged_devices_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_tagged_devices_command: Gets a list of device information based on a common tag. :type client: ``Client`` @@ -585,10 +586,10 @@ def get_tagged_devices_command(client: Client, args: Dict[str, Any]) -> CommandR tag_name = str(args['tagName']) device_info_response = client.get_tagged_devices(tag_name=tag_name) devices = device_info_response.get('devices') - output: List = [] + output: list = [] if devices and len(devices): for device in devices: - info: Dict[str, Any] = {} + info: dict[str, Any] = {} info['deviceId'] = device['did'] info['hostname'] = device.get('hostname', 'N/A') info['label'] = device.get('devicelabel', 'N/A') @@ -606,7 +607,7 @@ def get_tagged_devices_command(client: Client, args: Dict[str, Any]) -> CommandR ) -def get_tags_for_device_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_tags_for_device_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_tags_for_device_command: Gets a list of tags for a device :type client: ``Client`` @@ -626,9 +627,9 @@ def get_tags_for_device_command(client: Client, args: Dict[str, Any]) -> Command check_required_fields(args, 'deviceId') did = str(args['deviceId']) device_tags_response = client.get_tags_for_device(did=did) - output: List = [] + output: list = [] for tag in device_tags_response: - info: Dict[str, Any] = {} + info: dict[str, Any] = {} info['tagId'] = tag['tid'] info['tagName'] = tag['name'] info['tagDescription'] = tag['data']['description'] @@ -643,7 +644,7 @@ def get_tags_for_device_command(client: Client, args: Dict[str, Any]) -> Command ) -def post_tag_to_device_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def post_tag_to_device_command(client: Client, args: dict[str, Any]) -> CommandResults: """post_tag_to_device_command: Posts a tag to a device and returns an action response :type client: ``Client`` @@ -664,7 +665,7 @@ def post_tag_to_device_command(client: Client, args: Dict[str, Any]) -> CommandR tag_name = str(args['tagName']) did = str(args['deviceId']) post_tag_to_device_response = client.post_tag_to_device(did=did, tag_name=tag_name) - output: Dict[str, Any] = {} + output: dict[str, Any] = {} output['tagId'] = post_tag_to_device_response['tid'] output['tagName'] = tag_name output['deviceId'] = did diff --git a/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin_test.py b/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin_test.py index 9be0979f86ab..e09e6125d772 100644 --- a/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin_test.py +++ b/Packs/Darktrace/Integrations/DarktraceAdmin/DarktraceAdmin_test.py @@ -1,9 +1,8 @@ -import io import json def util_load_json(path): - with io.open(path, mode='r', encoding='utf-8') as f: + with open(path, encoding='utf-8') as f: return json.loads(f.read()) diff --git a/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs.py b/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs.py index 6a5a211094ee..340110818e87 100644 --- a/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs.py +++ b/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs.py @@ -6,7 +6,8 @@ import time import traceback from datetime import datetime, timezone -from typing import Any, Dict, List, Mapping, Optional, Tuple, cast +from typing import Any, cast +from collections.abc import Mapping import dateparser import urllib3 @@ -39,6 +40,8 @@ 'FAILED_TO_PARSE': 'N/A' } +DARKTRACE_LOGIN = 'Log In | Darktrace' + """*****CLIENT CLASS***** Wraps all the code that interacts with the Darktrace API.""" @@ -52,7 +55,7 @@ class Client(BaseClient): Most calls use _http_request() that handles proxy, SSL verification, etc. """ - def get(self, query_uri: str, params: Dict[str, str] = None): + def get(self, query_uri: str, params: dict[str, str] = None): """Handles Darktrace GET API calls""" return self._darktrace_api_call(query_uri, method='GET', params=params) @@ -70,7 +73,7 @@ def _darktrace_api_call( params: dict = None, data: dict = None, json: dict = None, - headers: Dict[str, str] = None, + headers: dict[str, str] = None, ): """Handles Darktrace API calls""" headers = { @@ -95,6 +98,11 @@ def _darktrace_api_call( + '. Response Status code: ' + str(res.status_code)) except Exception as e: raise Exception(e) + + res_str = res.content.decode('utf-8') + if DARKTRACE_LOGIN in res_str: + raise Exception(DARKTRACE_API_ERRORS['PRIVILEGE_ERROR']) + try: return res.json() except Exception as e: @@ -120,14 +128,14 @@ def error_handler(self, res: requests.Response): elif res.status_code >= 300: raise Exception(DARKTRACE_API_ERRORS['UNDETERMINED_ERROR']) - def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> Dict[str, str]: + def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> dict[str, str]: """Create headers required for successful authentication""" public_token, _ = self._auth date = (datetime.now(timezone.utc)).isoformat(timespec="auto") signature = _create_signature(self._auth, query_uri, date, query_data, is_json=is_json) return {'DTAPI-Token': public_token, 'DTAPI-Date': date, 'DTAPI-Signature': signature} - def get_model_breach(self, pbid: str) -> Dict[str, Any]: + def get_model_breach(self, pbid: str) -> dict[str, Any]: """Searches for a single Darktrace model breach alerts using '/modelbreaches?pbid=' :type pbid: ``str`` :param pbid: Model breach ID of the model breach to get @@ -138,7 +146,7 @@ def get_model_breach(self, pbid: str) -> Dict[str, Any]: params = {'pbid': pbid, 'deviceattop': 'true'} return self.get(query_uri, params) - def get_model_breach_connections(self, pbid: str, endtime: str, count: str, offset: str) -> List[Dict[str, Any]]: + def get_model_breach_connections(self, pbid: str, endtime: str, count: str, offset: str) -> list[dict[str, Any]]: """Searches for a single Darktrace model breach connections using '/details' endpoint :type pbid: ``str`` :param pbid: Model breach ID of the model breach to get @@ -162,7 +170,7 @@ def get_model_breach_connections(self, pbid: str, endtime: str, count: str, offs } return self.get(query_uri, params) - def get_model(self, uuid: str) -> Dict[str, Any]: + def get_model(self, uuid: str) -> dict[str, Any]: """Pulls a model configuration from /models endpoint :type uuid: ``str`` :param uuid: Model ID @@ -173,7 +181,7 @@ def get_model(self, uuid: str) -> Dict[str, Any]: params = {'uuid': uuid} return self.get(query_uri, params) - def get_model_component(self, cid: str) -> Dict[str, Any]: + def get_model_component(self, cid: str) -> dict[str, Any]: """Pulls a model component from /components endpoint :type cid: ``str`` :param cid: component ID @@ -184,7 +192,7 @@ def get_model_component(self, cid: str) -> Dict[str, Any]: params = {'cid': cid} return self.get(query_uri, params) - def get_model_breach_comments(self, pbid: str) -> List[Dict[str, Any]]: + def get_model_breach_comments(self, pbid: str) -> list[dict[str, Any]]: """Searches for comments on a modelbreach using '/modelbreaches//comments' :type pbid: ``str`` :param pbid: Model breach ID @@ -195,7 +203,7 @@ def get_model_breach_comments(self, pbid: str) -> List[Dict[str, Any]]: params = {'pbid': pbid} return self.get(query_uri, params) - def acknowledge_model_breach(self, pbid: str) -> Dict[str, Any]: + def acknowledge_model_breach(self, pbid: str) -> dict[str, Any]: """Acknowledges a modelbreach using '/modelbreaches//acknowledge?acknowledge=true' :type pbid: ``str`` :param pbid: Model breach ID of the model breach to get @@ -205,7 +213,7 @@ def acknowledge_model_breach(self, pbid: str) -> Dict[str, Any]: query_uri = f'{MODEL_BREACH_ENDPOINT}/{pbid}{ACK_BREACH}' return self.post(query_uri, data={'acknowledge': 'true'}) - def unacknowledge_model_breach(self, pbid: str) -> Dict[str, Any]: + def unacknowledge_model_breach(self, pbid: str) -> dict[str, Any]: """Unacknowledges a modelbreach using '/modelbreaches//unacknowledge?unacknowledge=true' :type pbid: ``str`` :param pbid: Model breach ID of the model breach to get @@ -215,7 +223,7 @@ def unacknowledge_model_breach(self, pbid: str) -> Dict[str, Any]: query_uri = f"{MODEL_BREACH_ENDPOINT}/{pbid}{UNACK_BREACH}" return self.post(query_uri, data={"unacknowledge": "true"}) - def post_comment_to_model_breach(self, pbid: str, comment: str) -> Dict[str, Any]: + def post_comment_to_model_breach(self, pbid: str, comment: str) -> dict[str, Any]: """Posts a comment to a model breach' :type pbid: ``str`` :param pbid: Model breach ID @@ -227,7 +235,7 @@ def post_comment_to_model_breach(self, pbid: str, comment: str) -> Dict[str, Any query_uri = f'{MODEL_BREACH_ENDPOINT}/{pbid}{COMMENT_BREACH}' return self.post(query_uri, json={'message': comment}) - def search_model_breaches(self, min_score: float, start_time: Optional[int]) -> List[Dict[str, Any]]: + def search_model_breaches(self, min_score: float, start_time: int | None) -> list[dict[str, Any]]: """Searches for Darktrace alerts using the '/modelbreaches' API endpoint :type min_score: ``float`` :param min_score: min score of the alert to search for. Range [0, 1]. @@ -249,7 +257,7 @@ def search_model_breaches(self, min_score: float, start_time: Optional[int]) -> """*****HELPER FUNCTIONS****""" -def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optional[int]: +def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> int | None: """Converts an XSOAR argument to a timestamp (seconds from epoch) This function is used to quickly validate an argument provided to XSOAR via ``demisto.args()`` into an ``int`` containing a timestamp (seconds @@ -288,7 +296,7 @@ def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optiona raise ValueError(f'Invalid date: {arg_name}') return int(date.timestamp()) - if isinstance(arg, (int, float)): + if isinstance(arg, int | float): # Convert to int if the input is a float return int(arg) raise ValueError(f'Invalid date: \'{arg_name}\'') @@ -321,7 +329,7 @@ def _create_signature(tokens: tuple, query_uri: str, date: str, query_data: dict ).hexdigest() -def format_JSON_for_model_breach(modelbreach: Dict[str, Any], details: bool = False) -> Dict[str, Any]: +def format_JSON_for_model_breach(modelbreach: dict[str, Any], details: bool = False) -> dict[str, Any]: """Formats JSON for get-model-breach command :type modelbreach: ``Dict[str, Any]`` :param modelbreach: JSON model breach as returned by API for fetch incident @@ -381,7 +389,7 @@ def _compute_xsoar_severity(dt_categry: str) -> int: """*****COMMAND FUNCTIONS****""" -def test_module(client: Client, first_fetch_time: Optional[int]) -> str: +def test_module(client: Client, first_fetch_time: int | None) -> str: """ Returning 'ok' indicates that the integration works like it is supposed to. Connection to the service is successful. @@ -406,8 +414,8 @@ def test_module(client: Client, first_fetch_time: Optional[int]) -> str: return 'ok' -def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], - first_fetch_time: Optional[int], min_score: int) -> Tuple[Dict[str, int], List[dict]]: +def fetch_incidents(client: Client, max_alerts: int, last_run: dict[str, int], + first_fetch_time: int | None, min_score: int) -> tuple[dict[str, int], list[dict]]: """This function retrieves new model breaches every minute. It will use last_run to save the timestamp of the last incident it processed. If last_run is not provided, it should use the integration parameter first_fetch to determine when to start fetching @@ -448,7 +456,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], latest_created_time = cast(int, last_fetch) # Each incident is a dict with a string as a key - incidents: List[Dict[str, Any]] = [] + incidents: list[dict[str, Any]] = [] model_breach_alerts = client.search_model_breaches( min_score=min_score / 100, # Scale the min score from [0,100] to [0 to 1] for API calls @@ -462,9 +470,8 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], alert['time'] = timestamp_to_datestring(incident_created_time) # to prevent duplicates, we are only adding incidents with creation_time > last fetched incident - if last_fetch: - if incident_created_time <= last_fetch: - continue + if last_fetch and incident_created_time <= last_fetch: + continue pbid = str(alert['pbid']) title = alert['model']['then']['name'] incident_name = f'DT modelId #{pbid}: {title}' @@ -493,7 +500,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int], return next_run, incidents -def get_model_breach_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_model_breach_command(client: Client, args: dict[str, Any]) -> CommandResults: """darktrace-get-breach command: Returns a Darktrace model breach :type client: ``Client`` @@ -531,7 +538,7 @@ def get_model_breach_command(client: Client, args: Dict[str, Any]) -> CommandRes ) -def get_model_breach_connections_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_model_breach_connections_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_model_breach_connections_command command: Returns a Darktrace model breach connections :type client: ``Client`` @@ -572,7 +579,7 @@ def get_model_breach_connections_command(client: Client, args: Dict[str, Any]) - ) -def get_model_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_model_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_model_command command: Returns a Darktrace model information :type client: ``Client`` @@ -602,7 +609,7 @@ def get_model_command(client: Client, args: Dict[str, Any]) -> CommandResults: ) -def get_model_component_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_model_component_command(client: Client, args: dict[str, Any]) -> CommandResults: """get_model_component_command command: Returns a Darktrace model component information :type client: ``Client`` @@ -632,7 +639,7 @@ def get_model_component_command(client: Client, args: Dict[str, Any]) -> Command ) -def get_model_breach_comments_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def get_model_breach_comments_command(client: Client, args: dict[str, Any]) -> CommandResults: """darktrace-get-comments command: Returns the comments on the model breach :type client: ``Client`` @@ -674,7 +681,7 @@ def get_model_breach_comments_command(client: Client, args: Dict[str, Any]) -> C ) -def acknowledge_model_breach_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def acknowledge_model_breach_command(client: Client, args: dict[str, Any]) -> CommandResults: """acknowledge_model_breach_command: Acknowledges the model breach based on pbid :type client: ``Client`` @@ -711,7 +718,7 @@ def acknowledge_model_breach_command(client: Client, args: Dict[str, Any]) -> Co ) -def unacknowledge_model_breach_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def unacknowledge_model_breach_command(client: Client, args: dict[str, Any]) -> CommandResults: """acknowledge_model_breach_command: Unacknowledges the model breach based on pbid :type client: ``Client`` @@ -748,7 +755,7 @@ def unacknowledge_model_breach_command(client: Client, args: Dict[str, Any]) -> ) -def post_comment_to_model_breach_command(client: Client, args: Dict[str, Any]) -> CommandResults: +def post_comment_to_model_breach_command(client: Client, args: dict[str, Any]) -> CommandResults: """post_comment_to_model_breach_command: posts a comment to a model breach :type client: ``Client`` @@ -771,21 +778,20 @@ def post_comment_to_model_breach_command(client: Client, args: Dict[str, Any]) - post_comment_response = client.post_comment_to_model_breach(pbid=pbid, comment=comment) - if post_comment_response['response'] != 'SUCCESS': - post_comment_response['response'] = 'Failed to comment Model Breach.' + output_response: dict[str, str | int] = {} + if post_comment_response.get("message", False): + output_response['response'] = 'Successfully posted comment.' + output_response['pbid'] = int(pbid) + output_response['message'] = str(comment) else: - post_comment_response['response'] = 'Successfully Uploaded Comment.' - post_comment_response['pbid'] = int(pbid) - post_comment_response['message'] = str(comment) - post_comment_response['commented'] = 'True' - - readable_output = tableToMarkdown(f'Model Breach {pbid} Acknowledged', post_comment_response) + output_response['response'] = 'Failed to post comment.' + readable_output = tableToMarkdown(f'Model Breach {pbid}', output_response) return CommandResults( readable_output=readable_output, outputs_prefix='Darktrace.ModelBreach', outputs_key_field='pbid', - outputs=post_comment_response + outputs=output_response ) diff --git a/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs_test.py b/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs_test.py index 6bc5c309569c..2ced602b024a 100644 --- a/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs_test.py +++ b/Packs/Darktrace/Integrations/DarktraceMBs/DarktraceMBs_test.py @@ -1,9 +1,8 @@ -import io import json def util_load_json(path): - with io.open(path, mode='r', encoding='utf-8') as f: + with open(path, encoding='utf-8') as f: return json.loads(f.read()) diff --git a/Packs/Darktrace/Integrations/DarktraceMBs/test_data/comment_post.json b/Packs/Darktrace/Integrations/DarktraceMBs/test_data/comment_post.json index 101d122ea26b..f3db9b3c44ba 100644 --- a/Packs/Darktrace/Integrations/DarktraceMBs/test_data/comment_post.json +++ b/Packs/Darktrace/Integrations/DarktraceMBs/test_data/comment_post.json @@ -1,3 +1,9 @@ -{ - "response": "SUCCESS" -} \ No newline at end of file +[ + { + "message": "Test comment post", + "username": "exampleUser", + "time": 1727202568000, + "pid": 2509, + "commentid": 570 + } +] \ No newline at end of file diff --git a/Packs/Darktrace/Integrations/DarktraceMBs/test_data/formatted_comment_post.json b/Packs/Darktrace/Integrations/DarktraceMBs/test_data/formatted_comment_post.json index 016b97ec572e..2ff8a7437a06 100644 --- a/Packs/Darktrace/Integrations/DarktraceMBs/test_data/formatted_comment_post.json +++ b/Packs/Darktrace/Integrations/DarktraceMBs/test_data/formatted_comment_post.json @@ -1,6 +1,5 @@ { - "commented": "True", "message": "Test comment post", "pbid": 2509, - "response": "Successfully Uploaded Comment." + "response": "Successfully posted comment." } \ No newline at end of file diff --git a/Packs/Darktrace/ReleaseNotes/3_0_11.md b/Packs/Darktrace/ReleaseNotes/3_0_11.md new file mode 100644 index 000000000000..d240c61bf0a0 --- /dev/null +++ b/Packs/Darktrace/ReleaseNotes/3_0_11.md @@ -0,0 +1,14 @@ + +#### Integrations + +##### Darktrace Admin + +- code linting + +##### Darktrace Model Breaches + +- code linting +- fix post comment command +##### Darktrace AI Analyst + +- code linting diff --git a/Packs/Darktrace/pack_metadata.json b/Packs/Darktrace/pack_metadata.json index 5b34c9b87b9c..800bd52dedb0 100644 --- a/Packs/Darktrace/pack_metadata.json +++ b/Packs/Darktrace/pack_metadata.json @@ -2,7 +2,7 @@ "name": "Darktrace", "description": "Populates Darktrace Model Breaches and AI Analyst Events in Cortex XSOAR, allowing for cross-platform automated investigation and response.", "support": "partner", - "currentVersion": "3.0.10", + "currentVersion": "3.0.11", "fromVersion": "5.0.0", "author": "Darktrace", "githubUser": "",