diff --git a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json index a8b69e6a0bf..74939f27374 100644 --- a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json +++ b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json @@ -489,6 +489,13 @@ "impact": "Disallowed domains are domains that are for example 'world writable', this opens up the possibility for an atacker to host malicious files on a csp whitelisted domain.", "recommendation": "Remove the offending hostname from the CSP header." }, + "KAT-LEGACY-SECURITY-LOCATION": { + "description": "This website only has a legacy location security.txt file.", + "source": "https://www.rfc-editor.org/rfc/rfc9116#section-3-1", + "risk": "info", + "impact": "Only providing the legacy url will mean, as time goes on, more and more tools and researchers will not find your Security disclosure policy possibly leading to less than ideal disclosure.", + "recommendation": "Add a security.txt file location in the /.well-known folder." + }, "KAT-NONSTANDARD-HEADERS": { "description": "Headers are used that are nonstandard and should not be used anymore.", "risk": "low", diff --git a/boefjes/boefjes/plugins/kat_security_txt_downloader/boefje.json b/boefjes/boefjes/plugins/kat_security_txt_downloader/boefje.json index 3f0e3452478..7d84c537058 100644 --- a/boefjes/boefjes/plugins/kat_security_txt_downloader/boefje.json +++ b/boefjes/boefjes/plugins/kat_security_txt_downloader/boefje.json @@ -2,6 +2,10 @@ "id": "security_txt_downloader", "name": "Security.txt downloader", "description": "Downloads the security.txt file from the target website to check if it contains all the required elements.", + "environment_keys": [ + "USERAGENT", + "TIMEOUT" + ], "consumes": [ "Website" ], diff --git a/boefjes/boefjes/plugins/kat_security_txt_downloader/main.py b/boefjes/boefjes/plugins/kat_security_txt_downloader/main.py index bad69556c3b..6f5bc5b1ba3 100644 --- a/boefjes/boefjes/plugins/kat_security_txt_downloader/main.py +++ b/boefjes/boefjes/plugins/kat_security_txt_downloader/main.py @@ -5,10 +5,12 @@ import requests from forcediphttpsadapter.adapters import ForcedIPHTTPSAdapter from requests import Session -from requests.models import Response from boefjes.job_models import BoefjeMeta +DEFAULT_TIMEOUT = 30 +DEFAULT_USERAGENT = "OpenKAT" + def run(boefje_meta: BoefjeMeta) -> list[tuple[set, bytes | str]]: input_ = boefje_meta.arguments["input"] @@ -16,50 +18,49 @@ def run(boefje_meta: BoefjeMeta) -> list[tuple[set, bytes | str]]: scheme = input_["ip_service"]["service"]["name"] ip = input_["ip_service"]["ip_port"]["address"]["address"] - useragent = getenv("USERAGENT", default="OpenKAT") + useragent = getenv("USERAGENT", default=DEFAULT_USERAGENT) + + try: + timeout = int(getenv("TIMEOUT", default=DEFAULT_TIMEOUT)) + except ValueError: + timeout = DEFAULT_TIMEOUT + session = requests.Session() results = {} for path in [".well-known/security.txt", "security.txt"]: - uri = f"{scheme}://{netloc}/{path}" + request_url = f"{scheme}://{netloc}/{path}" if scheme == "https": - session.mount(uri, ForcedIPHTTPSAdapter(dest_ip=ip)) + session.mount(request_url, ForcedIPHTTPSAdapter(dest_ip=ip)) else: addr = ipaddress.ip_address(ip) - netloc = f"[{ip}]" if addr.version == 6 else ip - - uri = f"{scheme}://{netloc}/{path}" - - response = do_request(netloc, session, uri, useragent) - - # if the response is 200, return the content - if response.status_code == 200: - results[path] = {"content": response.content.decode(), "url": response.url, "ip": ip, "status": 200} - # if the response is 301, we need to follow the location header to the correct security txt, - # we can not force the ip anymore - elif response.status_code in [301, 302, 307, 308]: - uri = response.headers["Location"] - response = requests.get(uri, stream=True, timeout=30, verify=False) # noqa: S501 - if response.raw._connection: - ip = response.raw._connection.sock.getpeername()[0] - else: - ip = "" - results[path] = { - "content": response.content.decode(), - "url": response.url, - "ip": str(ip), - "status": response.status_code, - } - else: - results[path] = {"content": None, "url": None, "ip": None, "status": response.status_code} + iploc = f"[{ip}]" if addr.version == 6 else ip + request_url = f"{scheme}://{iploc}/{path}" + + response = do_request(netloc, session, request_url, useragent, timeout) + + # we can not force the ip anymore because we dont know it yet. + # TODO return a redirected URL and have OpenKAT figure out if we want to follow this. + if response.status_code in [301, 302, 307, 308]: + request_url = response.headers["Location"] + response = requests.get(request_url, stream=True, timeout=timeout, verify=False) # noqa: S501 + ip = str(response.raw._connection.sock.getpeername()[0]) + + results[path] = { + "content": response.content.decode(), + "url": response.url, + "request_url": request_url, + "ip": ip, + "status": response.status_code, + } return [(set(), json.dumps(results))] -def do_request(hostname: str, session: Session, uri: str, useragent: str) -> Response: +def do_request(hostname: str, session: Session, uri: str, useragent: str, timeout: int): response = session.get( - uri, headers={"Host": hostname, "User-Agent": useragent}, verify=False, allow_redirects=False + uri, headers={"Host": hostname, "User-Agent": useragent}, timeout=timeout, verify=False, allow_redirects=False ) return response diff --git a/boefjes/boefjes/plugins/kat_security_txt_downloader/normalize.py b/boefjes/boefjes/plugins/kat_security_txt_downloader/normalize.py index dc171d7daf1..0007eaf7f2f 100644 --- a/boefjes/boefjes/plugins/kat_security_txt_downloader/normalize.py +++ b/boefjes/boefjes/plugins/kat_security_txt_downloader/normalize.py @@ -9,15 +9,20 @@ from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, IPPort, Network from octopoes.models.ooi.service import IPService, Service from octopoes.models.ooi.web import URL, SecurityTXT, Website +from octopoes.models.types import Finding, KATFindingType def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]: results = json.loads(raw) website_original = Reference.from_str(input_ooi["primary_key"]) + valid_results = {} for path, details in results.items(): - if details["content"] is None: + # remove any nonsense locations from our validresults. + if details["content"] is None or details.get("status", 200) != 200: continue + valid_results[path] = details + url_original = URL( raw=f'{input_ooi["ip_service"]["service"]["name"]}://{input_ooi["hostname"]["name"]}/{path}', network=Network(name=input_ooi["hostname"]["network"]["name"]).reference, @@ -25,6 +30,7 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]: yield url_original url = URL(raw=details["url"], network=Network(name=input_ooi["hostname"]["network"]["name"]).reference) yield url + url_parts = urlparse(details["url"]) # we need to check if the website of the response is the same as the input website if ( @@ -82,3 +88,11 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]: security_txt=None, ) yield security_txt_original + + # Check for legacy url https://www.rfc-editor.org/rfc/rfc9116#section-3-1 + if "security.txt" in valid_results and ".well-known/security.txt" not in valid_results: + ft = KATFindingType(id="KAT-LEGACY-SECURITY-LOCATION") + yield ft + yield Finding( + description="Only legacy /security.txt location found.", finding_type=ft.reference, ooi=website_original + ) diff --git a/boefjes/boefjes/plugins/kat_security_txt_downloader/schema.json b/boefjes/boefjes/plugins/kat_security_txt_downloader/schema.json new file mode 100644 index 00000000000..b5d80f22fdd --- /dev/null +++ b/boefjes/boefjes/plugins/kat_security_txt_downloader/schema.json @@ -0,0 +1,21 @@ +{ + "title": "Arguments", + "type": "object", + "properties": { + "USERAGENT": { + "title": "USERAGENT", + "maxLength": 128, + "type": "string", + "description": "The Useragent used by the downloader.", + "default": "OpenKat" + }, + "TIMEOUT": { + "title": "TIMEOUT", + "maximum": 9999, + "minimum": 0, + "type": "integer", + "description": "The timeout used by the downloader before it fails a url.", + "default": 30 + } + } +} diff --git a/boefjes/tests/examples/inputs/security_txt_result_different_website.json b/boefjes/tests/examples/inputs/security_txt_result_different_website.json index 300097a3a39..cb98fa348ac 100644 --- a/boefjes/tests/examples/inputs/security_txt_result_different_website.json +++ b/boefjes/tests/examples/inputs/security_txt_result_different_website.json @@ -2,6 +2,7 @@ ".well-known/security.txt": { "content": "This is the content", "url": "https://www.example.com/.well-known/security.txt", - "ip": "192.0.2.1" + "ip": "192.0.2.1", + "status": 200 } } diff --git a/boefjes/tests/examples/inputs/security_txt_result_no_file.json b/boefjes/tests/examples/inputs/security_txt_result_no_file.json new file mode 100644 index 00000000000..5c877405143 --- /dev/null +++ b/boefjes/tests/examples/inputs/security_txt_result_no_file.json @@ -0,0 +1,14 @@ +{ + ".well-known/security.txt": { + "content": "404 Not Found

Not Found

The requested URL \"https://www.example.com/.well-known/security.txt\" was not found on this server.

", + "url": "https://www.example.com/.well-known/security.txt", + "ip": "192.0.2.0", + "status": 404 + }, + "security.txt": { + "content": "404 Not Found

Not Found

The requested URL \"https://www.example.com/security.txt\" was not found on this server.

", + "url": "https://www.example.com/security.txt", + "ip": "192.0.2.0", + "status": 404 + } +} diff --git a/boefjes/tests/examples/inputs/security_txt_result_same_website.json b/boefjes/tests/examples/inputs/security_txt_result_same_website.json index a0d3af06de7..616aa9e3201 100644 --- a/boefjes/tests/examples/inputs/security_txt_result_same_website.json +++ b/boefjes/tests/examples/inputs/security_txt_result_same_website.json @@ -2,6 +2,7 @@ ".well-known/security.txt": { "content": "This is the content", "url": "https://example.com/.well-known/security.txt", - "ip": "192.0.2.0" + "ip": "192.0.2.0", + "status": 200 } } diff --git a/boefjes/tests/examples/inputs/security_txt_results_legacy_only.json b/boefjes/tests/examples/inputs/security_txt_results_legacy_only.json new file mode 100644 index 00000000000..022df633edd --- /dev/null +++ b/boefjes/tests/examples/inputs/security_txt_results_legacy_only.json @@ -0,0 +1,14 @@ +{ + ".well-known/security.txt": { + "content": "404 Not Found

Not Found

The requested URL \"https://www.example.com/.well-known/security.txt\" was not found on this server.

", + "url": "https://example.com/.well-known/security.txt", + "ip": "192.0.2.0", + "status": 404 + }, + "security.txt": { + "content": "Contact: mailto:security@example.com\nPreferred-Languages: nl, en\nExpires: 2030-01-01T00:00:00.000Z", + "url": "https://example.com/security.txt", + "ip": "192.0.2.0", + "status": 200 + } +} diff --git a/octopoes/bits/missing_security_txt/__init__.py b/octopoes/bits/missing_security_txt/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/octopoes/bits/missing_security_txt/bit.py b/octopoes/bits/missing_security_txt/bit.py new file mode 100644 index 00000000000..bf6691c4e20 --- /dev/null +++ b/octopoes/bits/missing_security_txt/bit.py @@ -0,0 +1,9 @@ +from bits.definitions import BitDefinition, BitParameterDefinition +from octopoes.models.ooi.web import SecurityTXT, Website + +BIT = BitDefinition( + id="missing_security_txt", + consumes=Website, + parameters=[BitParameterDefinition(ooi_type=SecurityTXT, relation_path="website")], + module="bits.missing_security_txt.missing_security_txt", +) diff --git a/octopoes/bits/missing_security_txt/missing_security_txt.py b/octopoes/bits/missing_security_txt/missing_security_txt.py new file mode 100644 index 00000000000..f7c1f88a88c --- /dev/null +++ b/octopoes/bits/missing_security_txt/missing_security_txt.py @@ -0,0 +1,16 @@ +from collections.abc import Iterator + +from octopoes.models import OOI +from octopoes.models.ooi.findings import Finding, KATFindingType +from octopoes.models.ooi.web import SecurityTXT, Website + + +def run(input_ooi: Website, additional_oois: list[SecurityTXT], config: dict[str, str]) -> Iterator[OOI]: + if not additional_oois: + ft = KATFindingType(id="KAT-NO-SECURITY-TXT") + yield ft + yield Finding( + ooi=input_ooi.reference, + finding_type=ft.reference, + description="This website does not have a security.txt file", + ) diff --git a/rocky/reports/report_types/web_system_report/report.py b/rocky/reports/report_types/web_system_report/report.py index 47f62c760c9..23c504a2be3 100644 --- a/rocky/reports/report_types/web_system_report/report.py +++ b/rocky/reports/report_types/web_system_report/report.py @@ -8,7 +8,7 @@ from django.utils.translation import gettext_lazy as _ from octopoes.models.ooi.dns.zone import Hostname -from octopoes.models.ooi.findings import KATFindingType, RiskLevelSeverity +from octopoes.models.ooi.findings import RiskLevelSeverity from octopoes.models.ooi.network import IPAddressV4, IPAddressV6 from reports.report_types.definitions import Report @@ -132,9 +132,10 @@ def collect_data(self, input_oois: Iterable[str], valid_time: datetime) -> dict[ no_certificate_finding_types = self.group_finding_types_by_source( self.octopoes_api_connector.query_many(query, valid_time, all_hostnames), ["KAT-NO-CERTIFICATE"] ) - query = "Hostname. dict[ ) check.redirects_http_https = not any(url_finding_types.get(hostname, [])) check.offers_https = not any(no_certificate_finding_types.get(hostname, [])) - check.has_security_txt = bool(has_security_txt_finding_types.get(hostname, [])) - security_txt_finding_types = [ - KATFindingType( - id="KAT-NO-SECURITY-TXT", - description="This hostname does not have a Security.txt file.", - risk_severity=RiskLevelSeverity.RECOMMENDATION, - recommendation="Make sure there is a security.txt available.", - ) - ] - + check.has_security_txt = not any(security_txt_finding_types.get(hostname, [])) check.no_uncommon_ports = not any(port_finding_types.get(hostname, [])) check.has_certificates = check.offers_https check.certificates_not_expired = check.has_certificates and "KAT-CERTIFICATE-EXPIRED" not in [ @@ -190,7 +182,7 @@ def collect_data(self, input_oois: Iterable[str], valid_time: datetime) -> dict[ + no_certificate_finding_types.get(hostname, []) + port_finding_types.get(hostname, []) + certificate_finding_types.get(hostname, []) - + security_txt_finding_types + + security_txt_finding_types.get(hostname, []) ) for finding_type in new_types: diff --git a/rocky/tests/integration/test_reports.py b/rocky/tests/integration/test_reports.py index 6a820e05fde..3beffea60d8 100644 --- a/rocky/tests/integration/test_reports.py +++ b/rocky/tests/integration/test_reports.py @@ -9,7 +9,7 @@ from octopoes.api.models import Declaration from octopoes.connector.octopoes import OctopoesAPIConnector from octopoes.models import Reference -from octopoes.models.ooi.findings import Finding, KATFindingType, RiskLevelSeverity +from octopoes.models.ooi.findings import Finding from octopoes.models.ooi.reports import ReportData from tests.integration.conftest import seed_system @@ -22,7 +22,7 @@ def test_web_report(octopoes_api_connector: OctopoesAPIConnector, valid_time): data = report.collect_data([input_ooi], valid_time)[input_ooi] assert data["input_ooi"] == input_ooi - assert len(data["finding_types"]) == 1 + assert len(data["finding_types"]) == 0 assert len(data["web_checks"]) == 1 assert asdict(data["web_checks"].checks[0]) == { @@ -188,12 +188,6 @@ def test_aggregate_report(octopoes_api_connector: OctopoesAPIConnector, valid_ti }, "safe_connections": {"number_of_compliant": 1, "total": 1}, } - security_txt_finding_type = KATFindingType( - id="KAT-NO-SECURITY-TXT", - description="This hostname does not have a Security.txt file.", - recommendation="Make sure there is a security.txt available.", - risk_severity=RiskLevelSeverity.RECOMMENDATION, - ) assert data["basic_security"]["summary"]["Web"] == { "rpki": {"number_of_compliant": 2, "total": 2}, "system_specific": { @@ -210,10 +204,7 @@ def test_aggregate_report(octopoes_api_connector: OctopoesAPIConnector, valid_ti "Certificate is not expired": 2, "Certificate is not expiring soon": 2, }, - "ips": { - "IPAddressV4|test|192.0.2.3": [security_txt_finding_type], - "IPAddressV6|test|3e4d:64a2:cb49:bd48:a1ba:def3:d15d:9230": [security_txt_finding_type], - }, + "ips": {"IPAddressV4|test|192.0.2.3": [], "IPAddressV6|test|3e4d:64a2:cb49:bd48:a1ba:def3:d15d:9230": []}, }, "safe_connections": {"number_of_compliant": 2, "total": 2}, } @@ -385,4 +376,3 @@ def test_multi_report( "Other": {"total": 2, "enabled": 2}, "Web": {"total": 2, "enabled": 2}, } - assert multi_data["recommendation_counts"] == {"Make sure there is a security.txt available.": 2} diff --git a/rocky/tests/reports/test_web_systems_report.py b/rocky/tests/reports/test_web_systems_report.py index 71dab995e72..ca18c77befa 100644 --- a/rocky/tests/reports/test_web_systems_report.py +++ b/rocky/tests/reports/test_web_systems_report.py @@ -1,7 +1,7 @@ from reports.report_types.web_system_report.report import WebSystemReport -def test_web_report_no_findings(mock_octopoes_api_connector, valid_time, hostname, security_txt): +def test_web_report_no_findings(mock_octopoes_api_connector, valid_time, hostname): mock_octopoes_api_connector.oois = {hostname.reference: hostname} mock_octopoes_api_connector.queries = { "Hostname.