From d118c2504234882374c4d3ce6d83a29c5615a6cf Mon Sep 17 00:00:00 2001 From: Matteo Redaelli Date: Thu, 19 Dec 2024 13:39:02 +0100 Subject: [PATCH] adding urlDNA.io adding urldna.io new scan and search analyzer --- analyzers/urlDNA.io/UrlDNA_New_Scan.json | 73 +++++++++++ analyzers/urlDNA.io/UrlDNA_Search.json | 23 ++++ analyzers/urlDNA.io/requirements.txt | 2 + analyzers/urlDNA.io/urldna.py | 159 +++++++++++++++++++++++ analyzers/urlDNA.io/urldna_analyzer.py | 125 ++++++++++++++++++ 5 files changed, 382 insertions(+) create mode 100644 analyzers/urlDNA.io/UrlDNA_New_Scan.json create mode 100644 analyzers/urlDNA.io/UrlDNA_Search.json create mode 100644 analyzers/urlDNA.io/requirements.txt create mode 100644 analyzers/urlDNA.io/urldna.py create mode 100644 analyzers/urlDNA.io/urldna_analyzer.py diff --git a/analyzers/urlDNA.io/UrlDNA_New_Scan.json b/analyzers/urlDNA.io/UrlDNA_New_Scan.json new file mode 100644 index 000000000..58422caeb --- /dev/null +++ b/analyzers/urlDNA.io/UrlDNA_New_Scan.json @@ -0,0 +1,73 @@ +{ + "name": "UrlDNA_New_Scan", + "author": "urlDNA.io (@urldna)", + "license": "MIT", + "url": "https://github.com/TheHive-Project/Cortex-Analyzers", + "version": "0.1.0", + "description": "Perform a new scan on urlDNA.io", + "dataTypeList": ["url"], + "command": "UrlDna/urldna_analyzer.py", + "baseConfig": "UrlDNA", + "config": { + "service":"new_scan" + }, + "configurationItems": [ + { + "name": "key", + "description": "UrlDNA API Key", + "type": "string", + "multi": false, + "required": true + }, + { + "name": "device", + "description": "The device type used for scraping, either DESKTOP or MOBILE. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "user_agent", + "description": "The browser User Agent used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "viewport_width", + "description": "The screen width of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "number", + "multi": false, + "required": false + }, + { + "name": "viewport_height", + "description": "The screen height of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "number", + "multi": false, + "required": false + }, + { + "name": "scanned_from", + "description": "Specifies the country from which the new scan is performed. This feature is available exclusively with the API Premium plan.", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "waiting_time", + "description": "The waiting time for the page to load during the scan. It can be a number between 5 and 15 seconds", + "type": "number", + "multi": false, + "required": false + }, + { + "name": "private_scan", + "description": "The visibility setting for the new scan: False will make it publicly visible and searchable", + "type": "boolean", + "multi": false, + "required": false + } + + ] +} diff --git a/analyzers/urlDNA.io/UrlDNA_Search.json b/analyzers/urlDNA.io/UrlDNA_Search.json new file mode 100644 index 000000000..a89b330ac --- /dev/null +++ b/analyzers/urlDNA.io/UrlDNA_Search.json @@ -0,0 +1,23 @@ +{ + "name": "UrlDNA_Search", + "author": "urlDNA.io (@urldna)", + "license": "MIT", + "url": "https://github.com/TheHive-Project/Cortex-Analyzers", + "version": "0.1.0", + "description": "Perform a search on urlDNA.io for IPs, domains or URLs", + "dataTypeList": ["ip", "domain", "url"], + "command": "UrlDna/urldna_analyzer.py", + "baseConfig": "UrlDNA", + "config": { + "service":"search" + }, + "configurationItems": [ + { + "name": "key", + "description": "UrlDNA API Key", + "type": "string", + "multi": false, + "required": true + } + ] +} \ No newline at end of file diff --git a/analyzers/urlDNA.io/requirements.txt b/analyzers/urlDNA.io/requirements.txt new file mode 100644 index 000000000..6aabc3cfa --- /dev/null +++ b/analyzers/urlDNA.io/requirements.txt @@ -0,0 +1,2 @@ +cortexutils +requests diff --git a/analyzers/urlDNA.io/urldna.py b/analyzers/urlDNA.io/urldna.py new file mode 100644 index 000000000..082088594 --- /dev/null +++ b/analyzers/urlDNA.io/urldna.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +import requests +import time + + +class UrlDNAException(Exception): + """Custom exception for errors related to UrlDNA operations.""" + pass + + +class UrlDNA: + """A client for interacting with the UrlDNA API.""" + + BASE_URL = "https://api.urldna.io" + + def __init__(self, query, data_type="url"): + """ + Initializes the UrlDNA instance with a query and data type. + + :param query: The query to be processed (e.g., URL, domain, or IP). + :param data_type: Type of the query ('url', 'domain', or 'ip'). + :raises ValueError: If the query is empty or the data type is unsupported. + """ + if not query: + raise ValueError("Query must be defined.") + if data_type not in ["url", "domain", "ip"]: + raise ValueError(f"Unsupported data type: {data_type}") + + self.query = query + self.data_type = data_type + self.session = None + + def search(self, api_key): + """ + Performs a search query on the UrlDNA API. + + :param api_key: API key for authentication. + :return: A dictionary containing the search results. + """ + self._init_session(api_key) + uri = "/search" + data = {"query": self._build_query()} + response = self.session.post(f"{self.BASE_URL}{uri}", json=data) + response.raise_for_status() + return response.json() + + def new_scan(self, api_key, device=None, user_agent=None, viewport_width=None, viewport_height=None, + waiting_time=None, private_scan=False, scanned_from="DEFAULT"): + """ + Initiates a new scan and polls for results until completion. + + :param api_key: API key for authentication. + :param device: The device type ('MOBILE' or 'DESKTOP'). Defaults to 'DESKTOP'. + :param user_agent: The user agent string for the scan. Defaults to a common desktop user agent. + :param viewport_width: Width of the viewport. Defaults to 1920. + :param viewport_height: Height of the viewport. Defaults to 1080. + :param waiting_time: Time to wait before starting the scan. Defaults to 5 seconds. + :param private_scan: Whether the scan is private. Defaults to False. + :param scanned_from: The origin of the scan. Defaults to 'DEFAULT'. + :return: A dictionary containing the scan results. + :raises UrlDNAException: If the scan fails or polling times out. + """ + self._init_session(api_key) + try: + scan_id = self._initiate_scan(device, user_agent, viewport_width, viewport_height, + waiting_time, private_scan, scanned_from) + return self._poll_for_result(scan_id) + except requests.RequestException as exc: + raise UrlDNAException(f"HTTP error during scan: {exc}") + except Exception as exc: + raise UrlDNAException(f"Error during scan: {exc}") + + def _build_query(self): + """ + Builds the query string based on the data type. + + :return: A formatted query string. + """ + if self.data_type == "url": + return f"submitted_url = {self.query}" + if self.data_type == "domain": + return f"domain = {self.query}" + if self.data_type == "ip": + return f"ip = {self.query}" + return self.query + + def _initiate_scan(self, device, user_agent, viewport_width, viewport_height, waiting_time, + private_scan, scanned_from): + """ + Sends a request to initiate a new scan. + + :param device: The device type for the scan. + :param user_agent: The user agent string for the scan. + :param viewport_width: The viewport width for the scan. + :param viewport_height: The viewport height for the scan. + :param waiting_time: Time to wait before starting the scan. + :param private_scan: Whether the scan is private. + :param scanned_from: The origin of the scan. + :return: The scan ID for the initiated scan. + :raises UrlDNAException: If the scan ID is not returned. + """ + data = { + "submitted_url": self.query, + "device": device or "DESKTOP", + "user_agent": user_agent or ( + "Mozilla/5.0 (Windows NT 10.0;Win64;x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"), + "width": viewport_width or 1920, + "height": viewport_height or 1080, + "scanned_from": scanned_from, + "waiting_time": waiting_time or 5, + "private_scan": private_scan, + } + + response = self.session.post(f"{self.BASE_URL}/scan", json=data) + response.raise_for_status() + scan_id = response.json().get("id") + if not scan_id: + raise UrlDNAException("Scan ID not returned.") + return scan_id + + def _poll_for_result(self, scan_id): + """ + Polls the API for the scan results until they are available. + + :param scan_id: The scan ID to poll. + :return: A dictionary containing the scan results. + :raises UrlDNAException: If the polling times out. + """ + uri = f"/scan/{scan_id}" + max_attempts = 10 + poll_interval = 10 + + for attempt in range(max_attempts): + if attempt > 0: + time.sleep(poll_interval) + response = self.session.get(f"{self.BASE_URL}{uri}") + response.raise_for_status() + result = response.json() + + status = result.get("scan", {}).get("status") + if status not in ["RUNNING", "PENDING"]: + return result + + raise UrlDNAException("Polling timed out before the scan completed.") + + def _init_session(self, api_key): + """ + Initializes an HTTP session with the API key for authentication. + + :param api_key: The API key for authentication. + """ + if not self.session: + self.session = requests.Session() + self.session.headers.update({ + "Content-Type": "application/json", + "User-Agent": "Cortex", + "Authorization": api_key + }) diff --git a/analyzers/urlDNA.io/urldna_analyzer.py b/analyzers/urlDNA.io/urldna_analyzer.py new file mode 100644 index 000000000..4ef2a8875 --- /dev/null +++ b/analyzers/urlDNA.io/urldna_analyzer.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +from cortexutils.analyzer import Analyzer +from urldna import UrlDNA, UrlDNAException + + +class UrlDNAAnalyzer(Analyzer): + """Analyzer for performing UrlDNA operations.""" + + def __init__(self): + """ + Initializes the analyzer with configuration parameters. + """ + super().__init__() + self.service = self.get_param('config.service', None, 'Service parameter is missing') + self.api_key = self.get_param('config.key', None, 'Missing UrlDNA API key') + self.device = self.get_param('config.device') + self.user_agent = self.get_param('config.user_agent') + self.viewport_width = self.get_param('config.viewport_width') + self.viewport_height = self.get_param('config.viewport_height') + self.waiting_time = self.get_param('config.waiting_time') + self.private_scan = self.get_param('config.private_scan') + self.scanned_from = self.get_param('config.scanned_from') + + def new_scan(self, indicator): + """ + Scans a website or resource for indicators. + + :param indicator: The URL to scan. + :return: A dictionary containing the scan results. + """ + try: + urldna = UrlDNA(indicator) + return urldna.new_scan(self.api_key, self.device, self.user_agent, self.viewport_width, + self.viewport_height, self.waiting_time, self.private_scan, self.scanned_from) + except UrlDNAException as exc: + self.error(f"Error during urlDNA scan: {exc}") + raise + except Exception as exc: + self.error(f"Unexpected error: {exc}") + raise + + def search(self, query): + """ + Performs a search query on the UrlDNA API. + + :param query: The query string. + :return: A dictionary containing the search results. + """ + try: + urldna = UrlDNA(query, self.data_type) + return urldna.search(self.api_key) + except UrlDNAException as exc: + self.error(f"Error during search: {exc}") + raise + except Exception as exc: + self.error(f"Unexpected error: {exc}") + raise + + def run(self): + """ + Executes the analyzer logic based on the configured service and data type. + """ + if not self.service or not self.data_type: + self.error('Service or data_type is missing.') + raise ValueError('Invalid configuration.') + + if self.service == 'new_scan' and self.data_type == 'url': + indicator = self.get_data() + try: + result = self.new_scan(indicator) + self.report({ + 'type': self.data_type, + 'query': indicator, + 'service': self.service, + 'indicator': result + }) + except Exception as exc: + self.error(f"Run failed: {exc}") + raise + elif self.service == 'search': + query = self.get_data() + try: + result = self.search(query) + self.report({ + 'type': self.data_type, + 'query': query, + 'service': self.service, + 'indicator': result + }) + except Exception as exc: + self.error(f"Run failed: {exc}") + raise + else: + self.error('Invalid service or unsupported data type.') + raise ValueError('Unsupported service or data type.') + + def summary(self, raw): + """ + Generates a summary based on the scan results. + + :param raw: The raw scan data. + :return: A dictionary containing summary taxonomies. + """ + taxonomies = [] + level = "info" + namespace = "urlDNA.io" + predicate = "Scan" if raw["service"] == 'new_scan' else "Search" + + indicator = raw.get("indicator", {}) + if predicate == "Search": + total = len(indicator) + value = f"{total} result{'s' if total != 1 else ''}" if total > 0 else "No results found" + else: + malicious = indicator.get("malicious", {}) + is_malicious = malicious.get("malicious", False) + threat_type = malicious.get("threat", "Unknown") + level = 'malicious' if is_malicious else 'info' + value = f"Malicious: {is_malicious}, Threat Type: {threat_type}" + + taxonomies.append(self.build_taxonomy(level, namespace, predicate, value)) + return {"taxonomies": taxonomies} + + +if __name__ == '__main__': + UrlDNAAnalyzer().run()