Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add urlDNA.io analyzers #1304

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions analyzers/urlDNA.io/UrlDNA_New_Scan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"name": "UrlDNA_New_Scan",
"author": "urlDNA.io (@urldna)",
"license": "MIT",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"version": "0.1.0",
"description": "Perform a new scan on urlDNA.io",
"dataTypeList": ["url"],
"command": "UrlDna/urldna_analyzer.py",
"baseConfig": "UrlDNA",
"config": {
"service":"new_scan"
},
"configurationItems": [
{
"name": "key",
"description": "UrlDNA API Key",
"type": "string",
"multi": false,
"required": true
},
{
"name": "device",
"description": "The device type used for scraping, either DESKTOP or MOBILE. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence",
"type": "string",
"multi": false,
"required": false
},
{
"name": "user_agent",
"description": "The browser User Agent used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence",
"type": "string",
"multi": false,
"required": false
},
{
"name": "viewport_width",
"description": "The screen width of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence",
"type": "number",
"multi": false,
"required": false
},
{
"name": "viewport_height",
"description": "The screen height of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence",
"type": "number",
"multi": false,
"required": false
},
{
"name": "scanned_from",
"description": "Specifies the country from which the new scan is performed. This feature is available exclusively with the API Premium plan.",
"type": "string",
"multi": false,
"required": false
},
{
"name": "waiting_time",
"description": "The waiting time for the page to load during the scan. It can be a number between 5 and 15 seconds",
"type": "number",
"multi": false,
"required": false
},
{
"name": "private_scan",
"description": "The visibility setting for the new scan: False will make it publicly visible and searchable",
"type": "boolean",
"multi": false,
"required": false
}

]
}
23 changes: 23 additions & 0 deletions analyzers/urlDNA.io/UrlDNA_Search.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "UrlDNA_Search",
"author": "urlDNA.io (@urldna)",
"license": "MIT",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"version": "0.1.0",
"description": "Perform a search on urlDNA.io for IPs, domains or URLs",
"dataTypeList": ["ip", "domain", "url"],
"command": "UrlDna/urldna_analyzer.py",
"baseConfig": "UrlDNA",
"config": {
"service":"search"
},
"configurationItems": [
{
"name": "key",
"description": "UrlDNA API Key",
"type": "string",
"multi": false,
"required": true
}
]
}
2 changes: 2 additions & 0 deletions analyzers/urlDNA.io/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cortexutils
requests
159 changes: 159 additions & 0 deletions analyzers/urlDNA.io/urldna.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#!/usr/bin/env python3
import requests
import time


class UrlDNAException(Exception):
"""Custom exception for errors related to UrlDNA operations."""
pass


class UrlDNA:
"""A client for interacting with the UrlDNA API."""

BASE_URL = "https://api.urldna.io"

def __init__(self, query, data_type="url"):
"""
Initializes the UrlDNA instance with a query and data type.

:param query: The query to be processed (e.g., URL, domain, or IP).
:param data_type: Type of the query ('url', 'domain', or 'ip').
:raises ValueError: If the query is empty or the data type is unsupported.
"""
if not query:
raise ValueError("Query must be defined.")
if data_type not in ["url", "domain", "ip"]:
raise ValueError(f"Unsupported data type: {data_type}")

self.query = query
self.data_type = data_type
self.session = None

def search(self, api_key):
"""
Performs a search query on the UrlDNA API.

:param api_key: API key for authentication.
:return: A dictionary containing the search results.
"""
self._init_session(api_key)
uri = "/search"
data = {"query": self._build_query()}
response = self.session.post(f"{self.BASE_URL}{uri}", json=data)
response.raise_for_status()
return response.json()

def new_scan(self, api_key, device=None, user_agent=None, viewport_width=None, viewport_height=None,
waiting_time=None, private_scan=False, scanned_from="DEFAULT"):
"""
Initiates a new scan and polls for results until completion.

:param api_key: API key for authentication.
:param device: The device type ('MOBILE' or 'DESKTOP'). Defaults to 'DESKTOP'.
:param user_agent: The user agent string for the scan. Defaults to a common desktop user agent.
:param viewport_width: Width of the viewport. Defaults to 1920.
:param viewport_height: Height of the viewport. Defaults to 1080.
:param waiting_time: Time to wait before starting the scan. Defaults to 5 seconds.
:param private_scan: Whether the scan is private. Defaults to False.
:param scanned_from: The origin of the scan. Defaults to 'DEFAULT'.
:return: A dictionary containing the scan results.
:raises UrlDNAException: If the scan fails or polling times out.
"""
self._init_session(api_key)
try:
scan_id = self._initiate_scan(device, user_agent, viewport_width, viewport_height,
waiting_time, private_scan, scanned_from)
return self._poll_for_result(scan_id)
except requests.RequestException as exc:
raise UrlDNAException(f"HTTP error during scan: {exc}")
except Exception as exc:
raise UrlDNAException(f"Error during scan: {exc}")

def _build_query(self):
"""
Builds the query string based on the data type.

:return: A formatted query string.
"""
if self.data_type == "url":
return f"submitted_url = {self.query}"
if self.data_type == "domain":
return f"domain = {self.query}"
if self.data_type == "ip":
return f"ip = {self.query}"
return self.query

def _initiate_scan(self, device, user_agent, viewport_width, viewport_height, waiting_time,
private_scan, scanned_from):
"""
Sends a request to initiate a new scan.

:param device: The device type for the scan.
:param user_agent: The user agent string for the scan.
:param viewport_width: The viewport width for the scan.
:param viewport_height: The viewport height for the scan.
:param waiting_time: Time to wait before starting the scan.
:param private_scan: Whether the scan is private.
:param scanned_from: The origin of the scan.
:return: The scan ID for the initiated scan.
:raises UrlDNAException: If the scan ID is not returned.
"""
data = {
"submitted_url": self.query,
"device": device or "DESKTOP",
"user_agent": user_agent or (
"Mozilla/5.0 (Windows NT 10.0;Win64;x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"),
"width": viewport_width or 1920,
"height": viewport_height or 1080,
"scanned_from": scanned_from,
"waiting_time": waiting_time or 5,
"private_scan": private_scan,
}

response = self.session.post(f"{self.BASE_URL}/scan", json=data)
response.raise_for_status()
scan_id = response.json().get("id")
if not scan_id:
raise UrlDNAException("Scan ID not returned.")
return scan_id

def _poll_for_result(self, scan_id):
"""
Polls the API for the scan results until they are available.

:param scan_id: The scan ID to poll.
:return: A dictionary containing the scan results.
:raises UrlDNAException: If the polling times out.
"""
uri = f"/scan/{scan_id}"
max_attempts = 10
poll_interval = 10

for attempt in range(max_attempts):
if attempt > 0:
time.sleep(poll_interval)
response = self.session.get(f"{self.BASE_URL}{uri}")
response.raise_for_status()
result = response.json()

status = result.get("scan", {}).get("status")
if status not in ["RUNNING", "PENDING"]:
return result

raise UrlDNAException("Polling timed out before the scan completed.")

def _init_session(self, api_key):
"""
Initializes an HTTP session with the API key for authentication.

:param api_key: The API key for authentication.
"""
if not self.session:
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"User-Agent": "Cortex",
"Authorization": api_key
})
125 changes: 125 additions & 0 deletions analyzers/urlDNA.io/urldna_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
from urldna import UrlDNA, UrlDNAException


class UrlDNAAnalyzer(Analyzer):
"""Analyzer for performing UrlDNA operations."""

def __init__(self):
"""
Initializes the analyzer with configuration parameters.
"""
super().__init__()
self.service = self.get_param('config.service', None, 'Service parameter is missing')
self.api_key = self.get_param('config.key', None, 'Missing UrlDNA API key')
self.device = self.get_param('config.device')
self.user_agent = self.get_param('config.user_agent')
self.viewport_width = self.get_param('config.viewport_width')
self.viewport_height = self.get_param('config.viewport_height')
self.waiting_time = self.get_param('config.waiting_time')
self.private_scan = self.get_param('config.private_scan')
self.scanned_from = self.get_param('config.scanned_from')

def new_scan(self, indicator):
"""
Scans a website or resource for indicators.

:param indicator: The URL to scan.
:return: A dictionary containing the scan results.
"""
try:
urldna = UrlDNA(indicator)
return urldna.new_scan(self.api_key, self.device, self.user_agent, self.viewport_width,
self.viewport_height, self.waiting_time, self.private_scan, self.scanned_from)
except UrlDNAException as exc:
self.error(f"Error during urlDNA scan: {exc}")
raise
except Exception as exc:
self.error(f"Unexpected error: {exc}")
raise

def search(self, query):
"""
Performs a search query on the UrlDNA API.

:param query: The query string.
:return: A dictionary containing the search results.
"""
try:
urldna = UrlDNA(query, self.data_type)
return urldna.search(self.api_key)
except UrlDNAException as exc:
self.error(f"Error during search: {exc}")
raise
except Exception as exc:
self.error(f"Unexpected error: {exc}")
raise

def run(self):
"""
Executes the analyzer logic based on the configured service and data type.
"""
if not self.service or not self.data_type:
self.error('Service or data_type is missing.')
raise ValueError('Invalid configuration.')

if self.service == 'new_scan' and self.data_type == 'url':
indicator = self.get_data()
try:
result = self.new_scan(indicator)
self.report({
'type': self.data_type,
'query': indicator,
'service': self.service,
'indicator': result
})
except Exception as exc:
self.error(f"Run failed: {exc}")
raise
elif self.service == 'search':
query = self.get_data()
try:
result = self.search(query)
self.report({
'type': self.data_type,
'query': query,
'service': self.service,
'indicator': result
})
except Exception as exc:
self.error(f"Run failed: {exc}")
raise
else:
self.error('Invalid service or unsupported data type.')
raise ValueError('Unsupported service or data type.')

def summary(self, raw):
"""
Generates a summary based on the scan results.

:param raw: The raw scan data.
:return: A dictionary containing summary taxonomies.
"""
taxonomies = []
level = "info"
namespace = "urlDNA.io"
predicate = "Scan" if raw["service"] == 'new_scan' else "Search"

indicator = raw.get("indicator", {})
if predicate == "Search":
total = len(indicator)
value = f"{total} result{'s' if total != 1 else ''}" if total > 0 else "No results found"
else:
malicious = indicator.get("malicious", {})
is_malicious = malicious.get("malicious", False)
threat_type = malicious.get("threat", "Unknown")
level = 'malicious' if is_malicious else 'info'
value = f"Malicious: {is_malicious}, Threat Type: {threat_type}"

taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}


if __name__ == '__main__':
UrlDNAAnalyzer().run()