From fdb1dd74bc935027a1212b57066a0b6f52fe97f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Krawczyk?= <98938598+radkrawczyk@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:04:13 +0200 Subject: [PATCH] Policy object pusher and security group of interests converters (#720) * Policy object pusher * Fixing group of interest models * Group of interests converters * Moving converters test to correct directory, and fixing intrusion prevention UT --- .../sdwan/test_ssl_decryption.py | 83 ++++++++ .../models/configuration/config_migration.py | 19 +- .../configuration/feature_profile/common.py | 5 + .../sdwan/policy_object/__init__.py | 2 + .../sdwan/policy_object/security/aip.py | 40 +++- .../security/intrusion_prevention.py | 32 ++- .../policy_object/security/ssl_decryption.py | 71 ++++++- .../security/ssl_decryption_profile.py | 45 +++- .../policy_object/security/url_filtering.py | 41 +++- .../models/policy/policy_definition.py | 1 + .../policy_converters/test_aip_converter.py | 46 ++++ .../policy_converters}/test_amp_converter.py | 0 .../test_extended_community_converter.py | 0 .../test_intrusion_prevention.py | 72 +++++++ .../test_mirror_converter.py | 0 .../test_ssl_decryption_converter.py | 106 ++++++++++ .../test_ssl_profile_converter.py | 91 ++++++++ .../tests/test_url_filtering_converter.py | 113 ++++++++++ .../converters/policy/policy_definitions.py | 159 +++++++++++++- .../creators/config_pusher.py | 76 +------ .../creators/policy_object_pusher.py | 199 ++++++++++++++++++ .../reverters/config_reverter.py | 4 + catalystwan/utils/config_migration/runner.py | 8 +- 23 files changed, 1118 insertions(+), 95 deletions(-) create mode 100644 catalystwan/integration_tests/feature_profile/sdwan/test_ssl_decryption.py create mode 100644 catalystwan/tests/config_migration/policy_converters/test_aip_converter.py rename catalystwan/tests/{ => config_migration/policy_converters}/test_amp_converter.py (100%) rename catalystwan/tests/{ => config_migration/policy_converters}/test_extended_community_converter.py (100%) create mode 100644 catalystwan/tests/config_migration/policy_converters/test_intrusion_prevention.py rename catalystwan/tests/{ => config_migration/policy_converters}/test_mirror_converter.py (100%) create mode 100644 catalystwan/tests/config_migration/policy_converters/test_ssl_decryption_converter.py create mode 100644 catalystwan/tests/config_migration/policy_converters/test_ssl_profile_converter.py create mode 100644 catalystwan/tests/test_url_filtering_converter.py create mode 100644 catalystwan/utils/config_migration/creators/policy_object_pusher.py diff --git a/catalystwan/integration_tests/feature_profile/sdwan/test_ssl_decryption.py b/catalystwan/integration_tests/feature_profile/sdwan/test_ssl_decryption.py new file mode 100644 index 00000000..e9c2af2c --- /dev/null +++ b/catalystwan/integration_tests/feature_profile/sdwan/test_ssl_decryption.py @@ -0,0 +1,83 @@ +from catalystwan.api.feature_profile_api import PolicyObjectFeatureProfileAPI +from catalystwan.endpoints.configuration_feature_profile import ConfigurationFeatureProfile +from catalystwan.integration_tests.feature_profile.sdwan.base import TestFeatureProfileModels +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.ssl_decryption import ( + CaCertBundle, + SslDecryptionParcel, +) +from catalystwan.typed_list import DataSequence + +PROFILE_NAME = "Default_Policy_Object_Profile" + + +class TestSslDecryptionParcel(TestFeatureProfileModels): + policy_api: PolicyObjectFeatureProfileAPI + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.policy_api = cls.session.api.sdwan_feature_profiles.policy_object + cls.profile_uuid = ( + ConfigurationFeatureProfile(cls.session.api.sdwan_feature_profiles.policy_object.session) + .get_sdwan_feature_profiles() + .filter(profile_name=PROFILE_NAME) + .single_or_default() + ).profile_id + + def setUp(self) -> None: + self.created_id = None + return super().setUp() + + def tearDown(self) -> None: + if self.created_id: + self.policy_api.delete(self.profile_uuid, SslDecryptionParcel, self.created_id) + + return super().tearDown() + + def test_create_ssl_decryption_parcel(self): + cert_bundle = CaCertBundle.create( + default=False, bundle_string="cert_content", file_name="certificate.ca-bundle" + ) + + ssl_decryption_parcel = SslDecryptionParcel.create( + parcel_name="test_ssl_profile", + parcel_description="description", + ssl_enable=True, + expired_certificate="drop", + untrusted_certificate="drop", + certificate_revocation_status="none", + unknown_status=None, + unsupported_protocol_versions="drop", + unsupported_cipher_suites="drop", + failure_mode="open", + ca_cert_bundle=cert_bundle, + key_modulus="4096", + eckey_type="P384", + certificate_lifetime="1", + min_tls_ver="TLSv1.1", + ) + + self.created_id = self.policy_api.create(self.profile_uuid, ssl_decryption_parcel).id + read_parcel = self.policy_api.get(self.profile_uuid, SslDecryptionParcel, parcel_id=self.created_id) + + assert read_parcel.payload.parcel_name == "test_ssl_profile" + assert read_parcel.payload.parcel_description == "description" + assert read_parcel.payload.ssl_enable.value is True + assert read_parcel.payload.expired_certificate.value == "drop" + assert read_parcel.payload.untrusted_certificate.value == "drop" + assert read_parcel.payload.certificate_revocation_status.value == "none" + assert read_parcel.payload.unknown_status is None + assert read_parcel.payload.unsupported_protocol_versions.value == "drop" + assert read_parcel.payload.unsupported_cipher_suites.value == "drop" + assert read_parcel.payload.failure_mode.value == "open" + assert read_parcel.payload.key_modulus.value == "4096" + assert read_parcel.payload.eckey_type.value == "P384" + assert read_parcel.payload.certificate_lifetime.value == "1" + assert read_parcel.payload.min_tls_ver.value == "TLSv1.1" + assert read_parcel.payload.ca_cert_bundle.default.value is False + assert read_parcel.payload.ca_cert_bundle.file_name.value == "certificate.ca-bundle" + assert read_parcel.payload.ca_cert_bundle.bundle_string.value == "cert_content" + + def test_get_all_ssl_decryption_parcels(self): + parcels = self.policy_api.get(self.profile_uuid, SslDecryptionParcel) + assert type(parcels) is DataSequence diff --git a/catalystwan/models/configuration/config_migration.py b/catalystwan/models/configuration/config_migration.py index 5fa7c01c..786696eb 100644 --- a/catalystwan/models/configuration/config_migration.py +++ b/catalystwan/models/configuration/config_migration.py @@ -24,8 +24,9 @@ from catalystwan.models.configuration.feature_profile.sdwan.service.lan.vpn import LanVpnParcel from catalystwan.models.configuration.network_hierarchy import NodeInfo from catalystwan.models.configuration.topology_group import TopologyGroup -from catalystwan.models.policy import AnyPolicyDefinitionInfo, AnyPolicyListInfo +from catalystwan.models.policy import AnyPolicyDefinitionInfo, AnyPolicyListInfo, URLAllowListInfo, URLBlockListInfo from catalystwan.models.policy.centralized import CentralizedPolicyInfo +from catalystwan.models.policy.definition.ssl_decryption import NetworkDecryptionRuleSequence, UrlProfile from catalystwan.models.policy.localized import LocalizedPolicyInfo from catalystwan.models.policy.security import AnySecurityPolicyInfo from catalystwan.models.templates import FeatureTemplateInformation, TemplateInformation @@ -462,6 +463,18 @@ class UX2ConfigPushResult(BaseModel): report: UX2ConfigPushReport = UX2ConfigPushReport() +@dataclass +class SslProfileResidues: + filtered_url_black_list: List[URLBlockListInfo] + filtered_url_white_list: List[URLAllowListInfo] + + +@dataclass +class SslDecryptioneResidues: + sequences: List[NetworkDecryptionRuleSequence] + profiles: List[UrlProfile] + + @dataclass class PolicyConvertContext: # conversion input @@ -473,6 +486,10 @@ class PolicyConvertContext: sites_by_list_id: Dict[UUID, List[str]] = field(default_factory=dict) lan_vpns_by_list_id: Dict[UUID, List[str]] = field(default_factory=dict) amp_target_vpns_id: Dict[UUID, List[VpnId]] = field(default_factory=dict) + intrusion_prevention_target_vpns_id: Dict[UUID, List[VpnId]] = field(default_factory=dict) + ssl_decryption_residues: Dict[UUID, SslDecryptioneResidues] = field(default_factory=dict) + ssl_profile_residues: Dict[UUID, SslProfileResidues] = field(default_factory=dict) + url_filtering_target_vpns: Dict[UUID, List[VpnId]] = field(default_factory=dict) @staticmethod def from_configs( diff --git a/catalystwan/models/configuration/feature_profile/common.py b/catalystwan/models/configuration/feature_profile/common.py index 49623ef2..c1948c41 100644 --- a/catalystwan/models/configuration/feature_profile/common.py +++ b/catalystwan/models/configuration/feature_profile/common.py @@ -1,3 +1,4 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates # Copyright 2023 Cisco Systems, Inc. and its affiliates from datetime import datetime @@ -280,6 +281,10 @@ class RefIdItem(BaseModel): ) ref_id: Global[str] = Field(..., serialization_alias="refId", validation_alias="refId") + @classmethod + def from_uuid(cls, ref_id: UUID): + return cls(ref_id=as_global(str(ref_id))) + class RefIdList(BaseModel): ref_id: Global[List[str]] = Field(..., serialization_alias="refId", validation_alias="refId") diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py index 18296d63..176bf44a 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py @@ -40,6 +40,7 @@ from .security.ssl_decryption import SslDecryptionParcel from .security.ssl_decryption_profile import SslDecryptionProfileParcel from .security.url import BaseURLListEntry, URLAllowParcel, URLBlockParcel, URLParcel +from .security.url_filtering import UrlFilteringParcel from .security.zone import SecurityZoneListEntry, SecurityZoneListParcel AnyPolicyObjectParcel = Annotated[ @@ -76,6 +77,7 @@ StandardCommunityParcel, TlocParcel, URLParcel, + UrlFilteringParcel, ], Field(discriminator="type_"), ] diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/aip.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/aip.py index 3f75bfc6..fd8d5893 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/aip.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/aip.py @@ -1,7 +1,8 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import Literal +from typing import Literal, Optional +from uuid import UUID from pydantic import AliasPath, Field @@ -25,9 +26,38 @@ class AdvancedInspectionProfileParcel(_ParcelBase): default=Global[TlsDecryptionAction](value="skipDecrypt"), validation_alias=AliasPath("data", "tlsDecryptionAction"), ) - intrusion_prevention: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "intrusionPrevention")) - url_filtering: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "urlFiltering")) - advanced_malware_protection: RefIdItem = Field( + intrusion_prevention: Optional[RefIdItem] = Field( + default=None, validation_alias=AliasPath("data", "intrusionPrevention") + ) + url_filtering: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlFiltering")) + advanced_malware_protection: Optional[RefIdItem] = Field( default=None, validation_alias=AliasPath("data", "advancedMalwareProtection") ) - ssl_decryption_profile: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "sslDecryptionProfile")) + ssl_decryption_profile: Optional[RefIdItem] = Field( + default=None, validation_alias=AliasPath("data", "sslDecryptionProfile") + ) + + @classmethod + def create( + cls, + parcel_name: str, + parcel_description: str, + tls_decryption_action: TlsDecryptionAction = "skipDecrypt", + intrusion_prevention: Optional[UUID] = None, + url_filtering: Optional[UUID] = None, + advanced_malware_protection: Optional[UUID] = None, + ssl_decryption_profile: Optional[UUID] = None, + ) -> "AdvancedInspectionProfileParcel": + ip = RefIdItem.from_uuid(intrusion_prevention) if intrusion_prevention else None + uf = RefIdItem.from_uuid(url_filtering) if url_filtering else None + amp = RefIdItem.from_uuid(advanced_malware_protection) if advanced_malware_protection else None + sdp = RefIdItem.from_uuid(ssl_decryption_profile) if ssl_decryption_profile else None + return cls( + parcel_name=parcel_name, + parcel_description=parcel_description, + tls_decryption_action=Global[TlsDecryptionAction](value=tls_decryption_action), + intrusion_prevention=ip, + url_filtering=uf, + advanced_malware_protection=amp, + ssl_decryption_profile=sdp, + ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/intrusion_prevention.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/intrusion_prevention.py index 755e356f..389dcd47 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/intrusion_prevention.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/intrusion_prevention.py @@ -1,6 +1,7 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import Literal +from typing import Literal, Optional +from uuid import UUID from pydantic import AliasPath, Field @@ -20,10 +21,37 @@ class IntrusionPreventionParcel(_ParcelBase): inspection_mode: Global[InspectionMode] = Field( default=Global[InspectionMode](value="detection"), validation_alias=AliasPath("data", "inspectionMode") ) - signature_allowed_list: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "signatureAllowedList")) + signature_allowed_list: Optional[RefIdItem] = Field( + default=None, validation_alias=AliasPath("data", "signatureAllowedList") + ) log_level: Global[LogLevel] = Field( default=Global[LogLevel](value="error"), validation_alias=AliasPath("data", "logLevel") ) custom_signature: Global[bool] = Field( default=Global[bool](value=False), validation_alias=AliasPath("data", "customSignature") ) + + @classmethod + def create( + cls, + parcel_name: str, + parcel_description: str, + signature_set: SignatureSet = "balanced", + inspection_mode: InspectionMode = "detection", + signature_allowed_list: Optional[UUID] = None, + log_level: LogLevel = "error", + custom_signature: bool = False, + ) -> "IntrusionPreventionParcel": + sal: Optional[RefIdItem] = None + if signature_allowed_list: + sal = RefIdItem(ref_id=Global[str](value=str(signature_allowed_list))) + + return cls( + parcel_name=parcel_name, + parcel_description=parcel_description, + signature_set=Global[SignatureSet](value=signature_set), + inspection_mode=Global[InspectionMode](value=inspection_mode), + signature_allowed_list=sal, + log_level=Global[LogLevel](value=log_level), + custom_signature=Global[bool](value=custom_signature), + ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption.py index ac4c9f1d..1ab03723 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption.py @@ -1,13 +1,14 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import Literal +from typing import Literal, Optional from pydantic import AliasPath, BaseModel, ConfigDict, Field from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase Action = Literal["decrypt", "drop"] -CertificateRevocationStatus = Literal["oscp", "none"] +UnspportedModeAction = Literal["drop", "no-decrypt"] +CertificateRevocationStatus = Literal["ocsp", "none"] FailureMode = Literal["close", "open"] KeyModulus = Literal["1024", "2048", "4096"] EckeyType = Literal["P256", "P384", "P521"] @@ -17,9 +18,24 @@ class CaCertBundle(BaseModel): model_config = ConfigDict(populate_by_name=True, extra="forbid") - default: Global[bool] = Field(default=Global[bool](value=True), validation_alias="default") - file_name: Global[str] = Field(default=None, validation_alias="fileName") - bundle_string: Global[str] = Field(default=None, validation_alias="bundle_string") + default: Global[bool] = Field(default=Global[bool](value=True)) + file_name: Optional[Global[str]] = Field(default=None, validation_alias="fileName", serialization_alias="fileName") + bundle_string: Optional[Global[str]] = Field( + default=None, validation_alias="bundleString", serialization_alias="bundleString" + ) + + @classmethod + def create( + cls, + default: bool = True, + file_name: Optional[str] = None, + bundle_string: Optional[str] = None, + ) -> "CaCertBundle": + return cls( + default=Global[bool](value=default), + file_name=Global[str](value=file_name) if file_name else None, + bundle_string=Global[str](value=bundle_string) if bundle_string else None, + ) class SslDecryptionParcel(_ParcelBase): @@ -41,11 +57,11 @@ class SslDecryptionParcel(_ParcelBase): default=Global[CertificateRevocationStatus](value="none"), validation_alias=AliasPath("data", "certificateRevocationStatus"), ) - unknown_status: Global[Action] = Field(default=None, validation_alias=AliasPath("data", "unknownStatus")) - unsupported_protocol_versions: Global[Action] = Field( + unknown_status: Optional[Global[Action]] = Field(default=None, validation_alias=AliasPath("data", "unknownStatus")) + unsupported_protocol_versions: Global[UnspportedModeAction] = Field( default=Global[Action](value="drop"), validation_alias=AliasPath("data", "unsupportedProtocolVersions") ) - unsupported_cipher_suites: Global[Action] = Field( + unsupported_cipher_suites: Global[UnspportedModeAction] = Field( default=Global[Action](value="drop"), validation_alias=AliasPath("data", "unsupportedCipherSuites") ) failure_mode: Global[FailureMode] = Field( @@ -67,3 +83,42 @@ class SslDecryptionParcel(_ParcelBase): ca_tp_label: Global[CaTpLabel] = Field( default=Global[CaTpLabel](value="PROXY-SIGNING-CA"), validation_alias=AliasPath("data", "caTpLabel") ) + + @classmethod + def create( + cls, + parcel_name: str, + parcel_description: str, + ssl_enable: bool = True, + expired_certificate: Action = "drop", + untrusted_certificate: Action = "drop", + certificate_revocation_status: CertificateRevocationStatus = "none", + unknown_status: Optional[Action] = None, + unsupported_protocol_versions: UnspportedModeAction = "drop", + unsupported_cipher_suites: UnspportedModeAction = "drop", + failure_mode: FailureMode = "close", + ca_cert_bundle: CaCertBundle = CaCertBundle(), + key_modulus: KeyModulus = "1024", + eckey_type: EckeyType = "P256", + certificate_lifetime: str = "1", + min_tls_ver: TlsVersion = "TLSv1", + ca_tp_label: CaTpLabel = "PROXY-SIGNING-CA", + ) -> "SslDecryptionParcel": + return cls( + parcel_name=parcel_name, + parcel_description=parcel_description, + ssl_enable=Global[bool](value=ssl_enable), + expired_certificate=Global[Action](value=expired_certificate), + untrusted_certificate=Global[Action](value=untrusted_certificate), + certificate_revocation_status=Global[CertificateRevocationStatus](value=certificate_revocation_status), + unknown_status=Global[Action](value=unknown_status) if unknown_status else None, + unsupported_protocol_versions=Global[UnspportedModeAction](value=unsupported_protocol_versions), + unsupported_cipher_suites=Global[UnspportedModeAction](value=unsupported_cipher_suites), + failure_mode=Global[FailureMode](value=failure_mode), + ca_cert_bundle=ca_cert_bundle, + key_modulus=Global[KeyModulus](value=key_modulus), + eckey_type=Global[EckeyType](value=eckey_type), + certificate_lifetime=Global[str](value=certificate_lifetime), + min_tls_ver=Global[TlsVersion](value=min_tls_ver), + ca_tp_label=Global[CaTpLabel](value=ca_tp_label), + ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption_profile.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption_profile.py index f57aff5b..83fdd244 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption_profile.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/ssl_decryption_profile.py @@ -1,6 +1,7 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import List, Literal +from typing import List, Literal, Optional +from uuid import UUID from pydantic import AliasPath, Field @@ -116,14 +117,48 @@ class SslDecryptionProfileParcel(_ParcelBase): reputation: Global[bool] = Field( default=Global[bool](value=False), validation_alias=AliasPath("data", "reputation") ) - decrypt_threshold: Global[DecryptThreshold] = Field( + decrypt_threshold: Optional[Global[DecryptThreshold]] = Field( default=None, validation_alias=AliasPath("data", "decryptThreshold") ) - skip_decrypt_threshold: Global[DecryptThreshold] = Field( + skip_decrypt_threshold: Optional[Global[DecryptThreshold]] = Field( default=None, validation_alias=AliasPath("data", "skipDecryptThreshold") ) fail_decrypt: Global[bool] = Field( default=Global[bool](value=False), validation_alias=AliasPath("data", "failDecrypt") ) - url_allowed_list: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "urlAllowedList")) - url_blocked_list: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "urlBlockedList")) + url_allowed_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlAllowedList")) + url_blocked_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlBlockedList")) + + @classmethod + def create( + cls, + parcel_name: str, + parcel_description: str, + decrypt_categories: List[Categories], + never_decrypt_categories: List[Categories], + skip_decrypt_categories: List[Categories], + reputation: bool, + fail_decrypt: bool, + skip_decrypt_threshold: Optional[DecryptThreshold] = None, + decrypt_threshold: Optional[DecryptThreshold] = None, + url_allowed_list: Optional[UUID] = None, + url_blocked_list: Optional[UUID] = None, + ) -> "SslDecryptionProfileParcel": + ual = RefIdItem.from_uuid(url_allowed_list) if url_allowed_list else None + ubl = RefIdItem.from_uuid(url_blocked_list) if url_blocked_list else None + dt = Global[DecryptThreshold](value=decrypt_threshold) if decrypt_threshold else None + sdt = Global[DecryptThreshold](value=skip_decrypt_threshold) if skip_decrypt_threshold else None + + return cls( + parcel_name=parcel_name, + parcel_description=parcel_description, + decrypt_categories=Global[List[Categories]](value=decrypt_categories), + never_decrypt_categories=Global[List[Categories]](value=never_decrypt_categories), + skip_decrypt_categories=Global[List[Categories]](value=skip_decrypt_categories), + reputation=Global[bool](value=reputation), + fail_decrypt=Global[bool](value=fail_decrypt), + decrypt_threshold=dt, + skip_decrypt_threshold=sdt, + url_allowed_list=ual, + url_blocked_list=ubl, + ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/url_filtering.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/url_filtering.py index 457358e5..804893ad 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/url_filtering.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/url_filtering.py @@ -1,10 +1,10 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import List, Literal +from typing import List, Literal, Optional from pydantic import AliasPath, Field -from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase +from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase, as_global from catalystwan.models.configuration.feature_profile.common import RefIdItem WebCategories = Literal[ @@ -106,10 +106,41 @@ class UrlFilteringParcel(_ParcelBase): ) web_categories: Global[List[WebCategories]] = Field(validation_alias=AliasPath("data", "webCategories")) web_reputation: Global[WebReputation] = Field(validation_alias=AliasPath("data", "webReputation")) - url_allowed_list: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "urlAllowedList")) - url_blocked_list: RefIdItem = Field(default=None, validation_alias=AliasPath("data", "urlBlockedList")) + url_allowed_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlAllowedList")) + url_blocked_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlBlockedList")) block_page_action: Global[BlockPageAction] = Field(validation_alias=AliasPath("data", "blockPageAction")) block_page_contents: Global[str] = Field(default=None, validation_alias=AliasPath("data", "blockPageContents")) redirect_url: Global[str] = Field(default=None, validation_alias=AliasPath("data", "redirectUrl")) enable_alerts: Global[bool] = Field(validation_alias=AliasPath("data", "enableAlerts")) - alerts: Global[List[Alerts]] = Field(default=None, validation_alias=AliasPath("data", "alerts")) + alerts: Optional[Global[List[Alerts]]] = Field(default=None, validation_alias=AliasPath("data", "alerts")) + + @classmethod + def create( + cls, + parcel_name: str, + parcel_description: str, + web_categories_action: WebCategoriesAction, + web_categories: List[WebCategories], + web_reputation: WebReputation, + enable_alerts: bool, + block_page_action: BlockPageAction, + block_page_contents: str, + alerts: List[Alerts] = [], + url_allowed_list: Optional[RefIdItem] = None, + url_blocked_list: Optional[RefIdItem] = None, + ): + _alerts = Global[List[Alerts]](value=alerts) if alerts else None + + return cls( + parcel_name=parcel_name, + parcel_description=parcel_description, + web_categories_action=as_global(web_categories_action, WebCategoriesAction), + web_categories=Global[List[WebCategories]](value=web_categories), + web_reputation=as_global(web_reputation, WebReputation), + block_page_action=as_global(block_page_action, BlockPageAction), + block_page_contents=as_global(block_page_contents), + enable_alerts=as_global(enable_alerts), + alerts=_alerts, + url_allowed_list=url_allowed_list, + url_blocked_list=url_blocked_list, + ) diff --git a/catalystwan/models/policy/policy_definition.py b/catalystwan/models/policy/policy_definition.py index bbe5ff1c..14b65a47 100644 --- a/catalystwan/models/policy/policy_definition.py +++ b/catalystwan/models/policy/policy_definition.py @@ -1161,6 +1161,7 @@ class InfoTag(BaseModel): class PolicyDefinitionId(BaseModel): + model_config = ConfigDict(populate_by_name=True) definition_id: UUID = Field(serialization_alias="definitionId", validation_alias="definitionId") diff --git a/catalystwan/tests/config_migration/policy_converters/test_aip_converter.py b/catalystwan/tests/config_migration/policy_converters/test_aip_converter.py new file mode 100644 index 00000000..4ce7b166 --- /dev/null +++ b/catalystwan/tests/config_migration/policy_converters/test_aip_converter.py @@ -0,0 +1,46 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates +import unittest +from uuid import uuid4 + +from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.policy.definition.aip import ( + AdvancedInspectionProfileDefinition, + AdvancedInspectionProfilePolicy, +) +from catalystwan.models.policy.policy_definition import Reference +from catalystwan.utils.config_migration.converters.policy.policy_definitions import convert + + +class TestAdvancedInspectionProfileParcel(unittest.TestCase): + def setUp(self) -> None: + self.context = PolicyConvertContext() + + def test_advanced_inspection_profile_conversion(self): + # Arrange + ip = uuid4() + uf = uuid4() + amp = uuid4() + sdp = uuid4() + aip = AdvancedInspectionProfilePolicy( + name="aip", + description="test_description", + optimized="false", + definition=AdvancedInspectionProfileDefinition( + tls_decryption_action="skipDecrypt", + intrusion_prevention=Reference(ref=ip), + url_filtering=Reference(ref=uf), + advanced_malware_protection=Reference(ref=amp), + ssl_utd_decrypt_profile=Reference(ref=sdp), + ), + ) + uuid = uuid4() + # Act + parcel = convert(aip, uuid, context=self.context) + # Assert + assert parcel.parcel_name == "aip" + assert parcel.parcel_description == "test_description" + assert parcel.tls_decryption_action.value == "skipDecrypt" + assert parcel.intrusion_prevention.ref_id.value == str(ip) + assert parcel.url_filtering.ref_id.value == str(uf) + assert parcel.advanced_malware_protection.ref_id.value == str(amp) + assert parcel.ssl_decryption_profile.ref_id.value == str(sdp) diff --git a/catalystwan/tests/test_amp_converter.py b/catalystwan/tests/config_migration/policy_converters/test_amp_converter.py similarity index 100% rename from catalystwan/tests/test_amp_converter.py rename to catalystwan/tests/config_migration/policy_converters/test_amp_converter.py diff --git a/catalystwan/tests/test_extended_community_converter.py b/catalystwan/tests/config_migration/policy_converters/test_extended_community_converter.py similarity index 100% rename from catalystwan/tests/test_extended_community_converter.py rename to catalystwan/tests/config_migration/policy_converters/test_extended_community_converter.py diff --git a/catalystwan/tests/config_migration/policy_converters/test_intrusion_prevention.py b/catalystwan/tests/config_migration/policy_converters/test_intrusion_prevention.py new file mode 100644 index 00000000..94b54fba --- /dev/null +++ b/catalystwan/tests/config_migration/policy_converters/test_intrusion_prevention.py @@ -0,0 +1,72 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates +import unittest +from uuid import uuid4 + +from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.policy.definition.intrusion_prevention import ( + IntrusionPreventionDefinition, + IntrusionPreventionPolicy, +) +from catalystwan.utils.config_migration.converters.policy.policy_definitions import convert + + +class TestIntrusionPreventionConverter(unittest.TestCase): + def setUp(self) -> None: + self.context = PolicyConvertContext() + + def test_intrusion_prevention_unified_conversion(self): + # Arrange + ipp = IntrusionPreventionPolicy( + name="ip_unified", + mode="unified", + definition=IntrusionPreventionDefinition( + signature_set="balanced", + inspection_mode="detection", + signature_white_list=None, + log_level="error", + logging=[], + target_vpns=[], + custom_signature=False, + ), + ) + uuid = uuid4() + # Act + parcel = convert(ipp, uuid, context=self.context) + # Assert + assert parcel.parcel_name == "ip_unified" + assert parcel.signature_set.value == "balanced" + assert parcel.inspection_mode.value == "detection" + assert parcel.signature_allowed_list is None + assert parcel.log_level.value == "error" + assert parcel.custom_signature.value is False + + assert len(self.context.intrusion_prevention_target_vpns_id) == 0 + + def test_intrusion_prevention_security_conversion(self): + # Arrange + ipp = IntrusionPreventionPolicy( + name="ip_security", + mode="security", + definition=IntrusionPreventionDefinition( + signature_set="connectivity", + inspection_mode="protection", + signature_white_list=None, + log_level="critical", + logging=[], + target_vpns=[1, 2, 3], + custom_signature=False, + ), + ) + uuid = uuid4() + # Act + parcel = convert(ipp, uuid, context=self.context) + + # Assert + assert parcel.parcel_name == "ip_security" + assert parcel.signature_set.value == "connectivity" + assert parcel.inspection_mode.value == "protection" + assert parcel.signature_allowed_list is None + assert parcel.log_level.value == "critical" + assert parcel.custom_signature.value is False + assert len(self.context.intrusion_prevention_target_vpns_id) == 1 + assert self.context.intrusion_prevention_target_vpns_id[uuid] == [1, 2, 3] diff --git a/catalystwan/tests/test_mirror_converter.py b/catalystwan/tests/config_migration/policy_converters/test_mirror_converter.py similarity index 100% rename from catalystwan/tests/test_mirror_converter.py rename to catalystwan/tests/config_migration/policy_converters/test_mirror_converter.py diff --git a/catalystwan/tests/config_migration/policy_converters/test_ssl_decryption_converter.py b/catalystwan/tests/config_migration/policy_converters/test_ssl_decryption_converter.py new file mode 100644 index 00000000..e88606a2 --- /dev/null +++ b/catalystwan/tests/config_migration/policy_converters/test_ssl_decryption_converter.py @@ -0,0 +1,106 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates +import unittest +from uuid import uuid4 + +from catalystwan.models.common import VpnId +from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.policy.definition.ssl_decryption import ( + CaCertBundle, + ControlPolicyBaseAction, + NetworkDecryptionRuleSequence, + SslDecryptionDefinition, + SslDecryptionPolicy, + SslDecryptionSettings, + UrlProfile, +) +from catalystwan.utils.config_migration.converters.policy.policy_definitions import convert + + +class TestSslDecryptionConverter(unittest.TestCase): + def setUp(self) -> None: + self.context = PolicyConvertContext() + + def test_ssl_decryption_conversion(self): + # Arrange + ssl_decryption_v1_entry = SslDecryptionPolicy( + name="ssl_decryption", + description="test_description", + mode="security", + definition=SslDecryptionDefinition( + default_action=ControlPolicyBaseAction(type="doNotDecrypt"), + sequences=[ + NetworkDecryptionRuleSequence( + sequence_name="test_sequence", + ) + ], + profiles=[UrlProfile(name="test_profile", order_no=2, vpn=[VpnId(3)], ref=uuid4())], + settings=SslDecryptionSettings( + ssl_enable="true", + expired_certificate="drop", + untrusted_certificate="drop", + certificate_revocation_status="none", + unknown_status="drop", + unsupported_protocol_versions="drop", + unsupported_cipher_suites="drop", + failure_mode="close", + ca_cert_bundle=CaCertBundle(default=True, file_name="test_file", bundle_string="test_string"), + ), + ), + ) + uuid = uuid4() + + # Act + parcel = convert(ssl_decryption_v1_entry, uuid, context=self.context) + # Assert + assert parcel.parcel_name == "ssl_decryption" + assert parcel.parcel_description == "test_description" + assert parcel.ssl_enable.value is True + assert parcel.expired_certificate.value == "drop" + assert parcel.untrusted_certificate.value == "drop" + assert parcel.certificate_revocation_status.value == "none" + assert parcel.unknown_status is None # because certificate_revocation_status == "none" + assert parcel.unsupported_protocol_versions.value == "drop" + assert parcel.unsupported_cipher_suites.value == "drop" + assert parcel.failure_mode.value == "close" + assert parcel.ca_cert_bundle.default.value is True + assert parcel.ca_cert_bundle.file_name.value == "test_file" + assert parcel.ca_cert_bundle.bundle_string.value == "test_string" + assert self.context.ssl_decryption_residues[uuid].profiles == ssl_decryption_v1_entry.definition.profiles + assert self.context.ssl_decryption_residues[uuid].sequences == ssl_decryption_v1_entry.definition.sequences + + def test_ssl_decryption_conversion_with_custom_certificate(self): + ssl_decryption_v1_entry = SslDecryptionPolicy( + name="ssl_decryption", + description="test_description", + mode="security", + definition=SslDecryptionDefinition( + default_action=ControlPolicyBaseAction(type="doNotDecrypt"), + sequences=[ + NetworkDecryptionRuleSequence( + sequence_name="test_sequence", + ) + ], + profiles=[UrlProfile(name="test_profile", order_no=2, vpn=[VpnId(3)], ref=uuid4())], + settings=SslDecryptionSettings( + ssl_enable="true", + expired_certificate="drop", + untrusted_certificate="drop", + certificate_revocation_status="none", + unknown_status="drop", + unsupported_protocol_versions="drop", + unsupported_cipher_suites="drop", + failure_mode="close", + ca_cert_bundle=CaCertBundle( + default=False, file_name="certificate.ca-bundle", bundle_string="fdsfsdfsdfsd\nfsfs\n\n" + ), + ), + ), + ) + uuid = uuid4() + + # Act + parcel = convert(ssl_decryption_v1_entry, uuid, context=self.context) + # # Assert + assert parcel.ca_cert_bundle.default.value is False + assert parcel.ca_cert_bundle.file_name.value == "certificate.ca-bundle" + assert parcel.ca_cert_bundle.bundle_string.value == "fdsfsdfsdfsd\nfsfs\n\n" diff --git a/catalystwan/tests/config_migration/policy_converters/test_ssl_profile_converter.py b/catalystwan/tests/config_migration/policy_converters/test_ssl_profile_converter.py new file mode 100644 index 00000000..c0e82d5d --- /dev/null +++ b/catalystwan/tests/config_migration/policy_converters/test_ssl_profile_converter.py @@ -0,0 +1,91 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates +import unittest +from uuid import uuid4 + +from catalystwan.api.configuration_groups.parcel import as_global +from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.policy.definition.ssl_decryption_utd_profile import ( + SslDecryptionUtdProfileDefinition, + SslDecryptionUtdProfilePolicy, +) +from catalystwan.models.policy.policy_definition import Reference +from catalystwan.utils.config_migration.converters.policy.policy_definitions import convert + + +class TestSslDecryptionConverter(unittest.TestCase): + def setUp(self) -> None: + self.context = PolicyConvertContext() + + def test_ssl_decryption_conversion(self): + # Arrange + url_white_list_ref = uuid4() + url_black_list_ref = uuid4() + ssl_decryption_profile = SslDecryptionUtdProfilePolicy( + name="ssl_decryption", + description="test_description", + mode="security", + optimized="false", + definition=SslDecryptionUtdProfileDefinition( + decrypt_categories=["auctions", "computer-and-internet-security"], + never_decrypt_categories=["dynamic-content"], + skip_decrypt_categories=["hacking"], + reputation=True, + fail_decrypt=True, + decrypt_threshold="high-risk", + filtered_url_white_list=[], + filtered_url_black_list=[], + url_white_list=Reference(ref=url_white_list_ref), + url_black_list=Reference(ref=url_black_list_ref), + ), + ) + uuid = uuid4() + # Act + parcel = convert(ssl_decryption_profile, uuid, context=self.context) + # Assert + assert parcel.parcel_name == "ssl_decryption" + assert parcel.parcel_description == "test_description" + assert parcel.decrypt_categories.value == ["auctions", "computer-and-internet-security"] + assert parcel.never_decrypt_categories.value == ["dynamic-content"] + assert parcel.skip_decrypt_categories.value == ["hacking"] + assert parcel.reputation.value is True + assert parcel.fail_decrypt.value is True + assert parcel.decrypt_threshold.value == "high-risk" + assert parcel.url_allowed_list.ref_id == as_global(str(url_white_list_ref)) + assert parcel.url_blocked_list.ref_id == as_global(str(url_black_list_ref)) + + assert len(self.context.ssl_profile_residues) == 0 + + def test_ssl_decryption_conversion_woth_residues(self): + filtered_lists_sets = ( + (["dummy"], []), + ([], ["dummy"]), + (["dummy"], ["dummy"]), + ) + + url_white_list_ref = uuid4() + url_black_list_ref = uuid4() + + for filtered_url_black_list, filtered_url_white_list in enumerate(filtered_lists_sets): + with self.subTest(): + ssl_decryption_profile = SslDecryptionUtdProfilePolicy( + name="ssl_decryption", + description="test_description", + mode="security", + optimized="false", + definition=SslDecryptionUtdProfileDefinition.model_construct( + decrypt_categories=["auctions", "computer-and-internet-security"], + never_decrypt_categories=["dynamic-content"], + skip_decrypt_categories=["hacking"], + reputation=True, + fail_decrypt=True, + decrypt_threshold="high-risk", + filtered_url_white_list=filtered_url_white_list, + filtered_url_black_list=filtered_url_black_list, + url_white_list=Reference(ref=url_white_list_ref), + url_black_list=Reference(ref=url_black_list_ref), + ), + ) + uuid = uuid4() + convert(ssl_decryption_profile, uuid, context=self.context) + + assert len(self.context.ssl_profile_residues) == 3 diff --git a/catalystwan/tests/test_url_filtering_converter.py b/catalystwan/tests/test_url_filtering_converter.py new file mode 100644 index 00000000..9d54431c --- /dev/null +++ b/catalystwan/tests/test_url_filtering_converter.py @@ -0,0 +1,113 @@ +import unittest +from uuid import uuid4 + +from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.policy.definition.url_filtering import ( + BLOCK_PAGE_CONTENT_HEADER, + UrlFilteringDefinition, + UrlFilteringPolicyEditPayload, +) +from catalystwan.models.policy.policy_definition import Reference +from catalystwan.utils.config_migration.converters.policy.policy_definitions import url_filtering + + +class TestUrlFilteringConverter(unittest.TestCase): + def setUp(self) -> None: + self.context = PolicyConvertContext() + self.uuid = uuid4() + + def test_convert_security_url_filtering(self): + policy = UrlFilteringPolicyEditPayload( + definition_id=uuid4(), + name="policy1", + mode="security", + definition=UrlFilteringDefinition( + web_reputation="trustworthy", + web_categories_action="allow", + web_categories=["hacking", "hate-and-racism", "health-and-medicine"], + block_page_action="text", + block_page_contents=f"{BLOCK_PAGE_CONTENT_HEADER} test 123 test 456", + enable_alerts=False, + target_vpns=[1, 2, 4, 5], + ), + ) + + parcel = url_filtering(policy, uuid=self.uuid, context=self.context) + + assert parcel.parcel_name == "policy1" + assert parcel.web_reputation.value == "trustworthy" + assert parcel.web_categories_action.value == "allow" + assert parcel.web_categories.value == ["hacking", "hate-and-racism", "health-and-medicine"] + assert parcel.block_page_action.value == "text" + assert parcel.block_page_contents.value == f"{BLOCK_PAGE_CONTENT_HEADER} test 123 test 456" + assert parcel.enable_alerts.value is False + assert parcel.url_allowed_list is None + assert parcel.url_blocked_list is None + assert parcel.alerts is None + + assert len(self.context.url_filtering_target_vpns) == 1 + assert self.uuid in self.context.url_filtering_target_vpns + + def test_convert_unified_url_filtering(self): + policy = UrlFilteringPolicyEditPayload( + definition_id=uuid4(), + name="policy2", + mode="unified", + definition=UrlFilteringDefinition( + web_reputation="suspicious", + web_categories_action="allow", + web_categories=["hacking", "health-and-medicine"], + block_page_action="redirectUrl", + block_page_contents="http://www.google.com", + enable_alerts=True, + alerts=["blacklist"], + ), + ) + + parcel = url_filtering(policy, uuid=self.uuid, context=self.context) + + assert parcel.parcel_name == "policy2" + assert parcel.web_reputation.value == "suspicious" + assert parcel.web_categories_action.value == "allow" + assert parcel.web_categories.value == ["hacking", "health-and-medicine"] + assert parcel.block_page_action.value == "redirect-url" + assert parcel.block_page_contents.value == "http://www.google.com" + assert parcel.enable_alerts.value is True + assert parcel.url_allowed_list is None + assert parcel.url_blocked_list is None + assert parcel.alerts.value == ["blacklist"] + + assert len(self.context.url_filtering_target_vpns) == 0 + + def test_convert_url_policy_with_allow_and_block_list(self): + policy = UrlFilteringPolicyEditPayload( + definition_id=uuid4(), + name="policy3", + mode="unified", + definition=UrlFilteringDefinition( + web_reputation="high-risk", + web_categories_action="block", + web_categories=["health-and-medicine"], + block_page_action="text", + block_page_contents=f"{BLOCK_PAGE_CONTENT_HEADER} test 555 test test 456", + enable_alerts=True, + alerts=["categories-reputation"], + url_white_list=Reference(ref=uuid4()), + url_black_list=Reference(ref=uuid4()), + ), + ) + + parcel = url_filtering(policy, uuid=self.uuid, context=self.context) + + assert parcel.parcel_name == "policy3" + assert parcel.web_reputation.value == "high-risk" + assert parcel.web_categories_action.value == "block" + assert parcel.web_categories.value == ["health-and-medicine"] + assert parcel.block_page_action.value == "text" + assert parcel.block_page_contents.value == f"{BLOCK_PAGE_CONTENT_HEADER} test 555 test test 456" + assert parcel.enable_alerts.value is True + assert parcel.alerts.value == ["categories-reputation"] + assert len(self.context.url_filtering_target_vpns) == 0 + + assert parcel.url_allowed_list.ref_id.value == str(policy.definition.url_white_list.ref) + assert parcel.url_blocked_list.ref_id.value == str(policy.definition.url_black_list.ref) diff --git a/catalystwan/utils/config_migration/converters/policy/policy_definitions.py b/catalystwan/utils/config_migration/converters/policy/policy_definitions.py index 2cc19113..0d0e3d00 100644 --- a/catalystwan/utils/config_migration/converters/policy/policy_definitions.py +++ b/catalystwan/utils/config_migration/converters/policy/policy_definitions.py @@ -1,3 +1,4 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates import logging from ipaddress import IPv4Interface from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union @@ -6,23 +7,51 @@ from pydantic import Field from typing_extensions import Annotated +from catalystwan.api.configuration_groups.parcel import as_global from catalystwan.models.common import int_range_str_validator -from catalystwan.models.configuration.config_migration import PolicyConvertContext +from catalystwan.models.configuration.config_migration import ( + PolicyConvertContext, + SslDecryptioneResidues, + SslProfileResidues, +) +from catalystwan.models.configuration.feature_profile.common import RefIdItem from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv4acl import Ipv4AclParcel from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv6acl import Ipv6AclParcel +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.aip import ( + AdvancedInspectionProfileParcel, +) from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.amp import ( AdvancedMalwareProtectionParcel, ) +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.intrusion_prevention import ( + IntrusionPreventionParcel, +) +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.ssl_decryption import ( + CaCertBundle, + SslDecryptionParcel, +) +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.ssl_decryption_profile import ( + SslDecryptionProfileParcel, +) +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.url_filtering import ( + BlockPageAction, + UrlFilteringParcel, +) from catalystwan.models.configuration.feature_profile.sdwan.topology.custom_control import CustomControlParcel from catalystwan.models.configuration.feature_profile.sdwan.topology.hubspoke import HubSpokeParcel from catalystwan.models.configuration.feature_profile.sdwan.topology.mesh import MeshParcel from catalystwan.models.policy import AnyPolicyDefinition from catalystwan.models.policy.definition.access_control_list import AclPolicy from catalystwan.models.policy.definition.access_control_list_ipv6 import AclIPv6Policy +from catalystwan.models.policy.definition.aip import AdvancedInspectionProfilePolicy from catalystwan.models.policy.definition.amp import AdvancedMalwareProtectionPolicy from catalystwan.models.policy.definition.control import ControlPolicy from catalystwan.models.policy.definition.hub_and_spoke import HubAndSpokePolicy +from catalystwan.models.policy.definition.intrusion_prevention import IntrusionPreventionPolicy from catalystwan.models.policy.definition.mesh import MeshPolicy +from catalystwan.models.policy.definition.ssl_decryption import SslDecryptionPolicy +from catalystwan.models.policy.definition.ssl_decryption_utd_profile import SslDecryptionUtdProfilePolicy +from catalystwan.models.policy.definition.url_filtering import UrlFilteringPolicy from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException from catalystwan.utils.config_migration.converters.utils import convert_varname @@ -37,7 +66,12 @@ MeshParcel, Ipv4AclParcel, Ipv6AclParcel, + AdvancedInspectionProfileParcel, AdvancedMalwareProtectionParcel, + IntrusionPreventionParcel, + SslDecryptionParcel, + SslDecryptionProfileParcel, + UrlFilteringParcel, ], Field(discriminator="type_"), ] @@ -184,13 +218,136 @@ def mesh(in_: MeshPolicy, uuid: UUID, context: PolicyConvertContext) -> MeshParc return out +def ssl_decryption(in_: SslDecryptionPolicy, uuid: UUID, context: PolicyConvertContext) -> SslDecryptionParcel: + definition_dump = in_.definition.settings.model_dump( + exclude={"certificate_lifetime", "ca_cert_bundle", "unknown_status"} + ) + certificate_lifetime = str(in_.definition.settings.certificate_lifetime) + ca_cert_bundle = CaCertBundle.create(**in_.definition.settings.ca_cert_bundle.model_dump()) + unknown_status = ( + in_.definition.settings.unknown_status + if in_.definition.settings.certificate_revocation_status != "none" + else None + ) + + if in_.definition.sequences or in_.definition.profiles: + context.ssl_decryption_residues[uuid] = SslDecryptioneResidues( + sequences=in_.definition.sequences, profiles=in_.definition.profiles + ) + + return SslDecryptionParcel.create( + **_get_parcel_name_desc(in_), + **definition_dump, + ca_cert_bundle=ca_cert_bundle, + certificate_lifetime=certificate_lifetime, + unknown_status=unknown_status, + ) + + +def ssl_profile( + in_: SslDecryptionUtdProfilePolicy, uuid: UUID, context: PolicyConvertContext +) -> SslDecryptionProfileParcel: + definition_dump = in_.definition.model_dump( + exclude={"filtered_url_white_list", "filtered_url_black_list", "url_white_list", "url_black_list"} + ) + + url_allowed_list = in_.definition.url_white_list.ref if in_.definition.url_white_list else None + url_blocked_list = in_.definition.url_black_list.ref if in_.definition.url_black_list else None + + if in_.definition.filtered_url_black_list or in_.definition.filtered_url_white_list: + context.ssl_profile_residues[uuid] = SslProfileResidues( + filtered_url_black_list=in_.definition.filtered_url_black_list, + filtered_url_white_list=in_.definition.filtered_url_white_list, + ) + + return SslDecryptionProfileParcel.create( + **_get_parcel_name_desc(in_), + **definition_dump, + url_allowed_list=url_allowed_list, + url_blocked_list=url_blocked_list, + ) + + +def advanced_inspection_profile( + in_: AdvancedInspectionProfilePolicy, uuid: UUID, context: PolicyConvertContext +) -> AdvancedInspectionProfileParcel: + intrusion_prevention_ref = in_.definition.intrusion_prevention.ref if in_.definition.intrusion_prevention else None + url_filtering_ref = in_.definition.url_filtering.ref if in_.definition.url_filtering else None + advanced_malware_protection_ref = ( + in_.definition.advanced_malware_protection.ref if in_.definition.advanced_malware_protection else None + ) + ssl_decryption_profile_ref = ( + in_.definition.ssl_utd_decrypt_profile.ref if in_.definition.ssl_utd_decrypt_profile else None + ) + + return AdvancedInspectionProfileParcel.create( + **_get_parcel_name_desc(in_), + tls_decryption_action=in_.definition.tls_decryption_action, + intrusion_prevention=intrusion_prevention_ref, + url_filtering=url_filtering_ref, + advanced_malware_protection=advanced_malware_protection_ref, + ssl_decryption_profile=ssl_decryption_profile_ref, + ) + + +def url_filtering(in_: UrlFilteringPolicy, uuid: UUID, context: PolicyConvertContext) -> UrlFilteringParcel: + block_page_action_map: Dict[str, BlockPageAction] = {"text": "text", "redirectUrl": "redirect-url"} + definition_dump = in_.definition.model_dump( + exclude={"target_vpns", "url_white_list", "url_black_list", "logging", "block_page_action"} + ) + + if vpns := in_.definition.target_vpns: + context.url_filtering_target_vpns[uuid] = vpns + + block_page_action = block_page_action_map[in_.definition.block_page_action] + # below references are a references to v1 objects, + # during push the references shall be transformed to point v2 objects + url_allowed_list = ( + RefIdItem(ref_id=as_global(str(in_.definition.url_white_list.ref))) if in_.definition.url_white_list else None + ) + url_blocked_list = ( + RefIdItem(ref_id=as_global(str(in_.definition.url_black_list.ref))) if in_.definition.url_black_list else None + ) + + out = UrlFilteringParcel.create( + **_get_parcel_name_desc(in_), + **definition_dump, + block_page_action=block_page_action, + url_allowed_list=url_allowed_list, + url_blocked_list=url_blocked_list, + ) + return out + + +def intrusion_prevention( + in_: IntrusionPreventionPolicy, uuid: UUID, context: PolicyConvertContext +) -> IntrusionPreventionParcel: + if vpn_list := in_.definition.target_vpns: + context.intrusion_prevention_target_vpns_id[uuid] = vpn_list + + definition_dump = in_.definition.model_dump(exclude={"target_vpns", "logging"}) + signature_white_list = definition_dump.pop("signature_white_list", None) + signature_allowed_list = signature_white_list.get("ref") if signature_white_list else None + + return IntrusionPreventionParcel.create( + **_get_parcel_name_desc(in_), + **definition_dump, + signature_allowed_list=signature_allowed_list, + ) + + CONVERTERS: Mapping[Type[Input], Callable[..., Output]] = { AclPolicy: ipv4acl, AclIPv6Policy: ipv6acl, ControlPolicy: control, HubAndSpokePolicy: hubspoke, MeshPolicy: mesh, + AdvancedInspectionProfilePolicy: advanced_inspection_profile, AdvancedMalwareProtectionPolicy: advanced_malware_protection, + IntrusionPreventionPolicy: intrusion_prevention, + SslDecryptionPolicy: ssl_decryption, + SslDecryptionUtdProfilePolicy: ssl_profile, + UrlFilteringPolicy: url_filtering, } diff --git a/catalystwan/utils/config_migration/creators/config_pusher.py b/catalystwan/utils/config_migration/creators/config_pusher.py index 0064196a..bee7ea25 100644 --- a/catalystwan/utils/config_migration/creators/config_pusher.py +++ b/catalystwan/utils/config_migration/creators/config_pusher.py @@ -1,6 +1,6 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates import logging -from typing import Callable, Dict, List, Set, Tuple, Type, cast +from typing import Callable, Dict, List, Set, Tuple, cast from uuid import UUID from pydantic import BaseModel @@ -14,14 +14,12 @@ UX2Config, UX2ConfigPushResult, ) -from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload, ProfileType -from catalystwan.models.configuration.feature_profile.parcel import AnyParcel, Parcel, list_types -from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel +from catalystwan.models.configuration.feature_profile.common import ProfileType from catalystwan.models.configuration.feature_profile.sdwan.topology.custom_control import CustomControlParcel from catalystwan.models.configuration.feature_profile.sdwan.topology.hubspoke import HubSpokeParcel from catalystwan.models.configuration.feature_profile.sdwan.topology.mesh import MeshParcel from catalystwan.session import ManagerSession -from catalystwan.typed_list import DataSequence +from catalystwan.utils.config_migration.creators.policy_object_pusher import PolicyObjectPusher from catalystwan.utils.config_migration.factories.parcel_pusher import ParcelPusherFactory logger = logging.getLogger(__name__) @@ -42,6 +40,9 @@ def __init__( self._session = session self._config_map = self._create_config_map(ux2_config) self._push_result = UX2ConfigPushResult() + self._policy_object_pusher = PolicyObjectPusher( + ux2_config=ux2_config, session=session, progress=progress, push_result=self._push_result + ) self._ux2_config = ux2_config self._progress = progress @@ -54,8 +55,9 @@ def _create_config_map(self, ux2_config: UX2Config) -> ConfigurationMapping: def push(self) -> UX2ConfigPushResult: self._create_cloud_credentials() self._create_config_groups() - dpop = self._get_or_create_default_policy_object_profile() - self._insert_groups_of_interest_in_default_policy_object_profile(dpop) + + self._policy_object_pusher.push() + dpop = self._policy_object_pusher.get_or_create_default_policy_object_profile() self._create_topology_groups(dpop) # needs to be executed after vpn parcels and groups of interests are created self._push_result.report.set_failed_push_parcels_flat_list() logger.debug(f"Configuration push completed. Rollback configuration {self._push_result}") @@ -102,66 +104,6 @@ def _create_config_groups(self): feature_profiles=created_profiles, ) - def _get_or_create_default_policy_object_profile(self) -> UUID: - api = self._session.api.sdwan_feature_profiles.policy_object - profiles = api.get_profiles() - if len(profiles) >= 1: - return profiles[0].profile_id - profile_id = api.create_profile( - FeatureProfileCreationPayload(name="Policy_Profile_Global", description="Policy_Profile_Global_description") - ).id - return profile_id - - def _insert_groups_of_interest_in_default_policy_object_profile(self, profile_id: UUID): - # TODO: fix typing issues in this method, probably we need literal containig AnyPolicyObjectType type_ - def cast_(parcel: AnyParcel) -> AnyPolicyObjectParcel: - return parcel # type: ignore - - api = self._session.api.sdwan_feature_profiles.policy_object - profile_rollback = self._push_result.rollback.add_default_policy_object_profile(profile_id) - - # will hold system created parcels id by type and name when detected - system_created_parcels: Dict[Type[AnyPolicyObjectParcel], Dict[str, UUID]] = {} - - transformed_policy_parcels = [ - any_transformed_parcel - for any_transformed_parcel in self._ux2_config.profile_parcels - if type(any_transformed_parcel.parcel) in list_types(AnyPolicyObjectParcel) - ] - - for i, transformed_policy_parcel in enumerate(transformed_policy_parcels): - parcel = cast_(transformed_policy_parcel.parcel) - header = transformed_policy_parcel.header - parcel_type = type(parcel) - # update existing system created parcels - if not system_created_parcels.get(parcel_type): - system_created_parcels[parcel_type] = {} - exsisting_parcel_list = cast( - DataSequence[Parcel[AnyPolicyObjectParcel]], - api.get(profile_id, parcel_type).filter(created_by="system"), # type: ignore [arg-type] - ) - for ep in exsisting_parcel_list: - if not isinstance(ep.parcel_id, UUID): - id_ = UUID(ep.parcel_id) - else: - id_ = ep.parcel_id - system_created_parcels[parcel_type][ep.payload.parcel_name] = id_ - - # if parcel with given name exists we skip it - if not system_created_parcels[parcel_type].get(header.origname or ""): - try: - parcel_id = api.create(profile_id=profile_id, payload=parcel).id - profile_rollback.add_parcel(parcel.type_, parcel_id) - self._push_result.report.groups_of_interest.add_created(parcel.parcel_name, parcel_id) - self._progress( - f"Creating Policy Object Parcel: {parcel.parcel_name}", - i + 1, - len(transformed_policy_parcels), - ) - except ManagerHTTPError as e: - logger.error(f"Error occured during config group creation: {e.info}") - self._push_result.report.groups_of_interest.add_failed(parcel, e) - def _create_feature_profile_and_parcels(self, feature_profiles_ids: List[UUID]) -> List[FeatureProfileBuildReport]: feature_profiles: List[FeatureProfileBuildReport] = [] for feature_profile_id in feature_profiles_ids: diff --git a/catalystwan/utils/config_migration/creators/policy_object_pusher.py b/catalystwan/utils/config_migration/creators/policy_object_pusher.py new file mode 100644 index 00000000..0ea0e1a5 --- /dev/null +++ b/catalystwan/utils/config_migration/creators/policy_object_pusher.py @@ -0,0 +1,199 @@ +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Callable, Dict, Mapping, Type, cast +from uuid import UUID + +from catalystwan.api.feature_profile_api import PolicyObjectFeatureProfileAPI +from catalystwan.exceptions import ManagerHTTPError +from catalystwan.models.configuration.config_migration import UX2Config, UX2ConfigPushResult +from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload, RefIdItem +from catalystwan.models.configuration.feature_profile.parcel import AnyParcel, Parcel, list_types +from catalystwan.models.configuration.feature_profile.sdwan.policy_object import ( + AdvancedInspectionProfileParcel, + AnyPolicyObjectParcel, + IntrusionPreventionParcel, + SslDecryptionProfileParcel, + UrlFilteringParcel, +) +from catalystwan.session import ManagerSession +from catalystwan.typed_list import DataSequence +from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException + +logger = logging.getLogger(__name__) + + +@dataclass +class ReferencesUpdater(ABC): + parcel: AnyPolicyObjectParcel + pushed_objects_map: Dict[UUID, UUID] + + @abstractmethod + def update_references(self): + pass + + def get_target_uuid(self, origin_uuid: UUID) -> UUID: + if v2_uuid := self.pushed_objects_map.get(origin_uuid): + return v2_uuid + + raise CatalystwanConverterCantConvertException( + f"Cannot find transferred policy object based on v1 API id: {origin_uuid}" + ) + + +class UrlFilteringReferencesUpdater(ReferencesUpdater): + def update_references(self): + if allowed_list := self.parcel.url_allowed_list: + v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value)) + self.parcel.url_allowed_list = RefIdItem.from_uuid(v2_uuid) + + if blocked_list := self.parcel.url_blocked_list: + v2_uuid = self.get_target_uuid(UUID(blocked_list.ref_id.value)) + self.parcel.url_blocked_list = RefIdItem.from_uuid(v2_uuid) + + +class SslProfileReferencesUpdater(ReferencesUpdater): + def update_references(self): + if allowed_list := self.parcel.url_allowed_list: + v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value)) + self.parcel.url_allowed_list = RefIdItem.from_uuid(v2_uuid) + + if blocked_list := self.parcel.url_blocked_list: + v2_uuid = self.get_target_uuid(UUID(blocked_list.ref_id.value)) + self.parcel.url_blocked_list = RefIdItem.from_uuid(v2_uuid) + + +class IntrusionPreventionReferencesUpdater(ReferencesUpdater): + def update_references(self): + if allowed_list := self.parcel.signature_allowed_list: + v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value)) + self.parcel.signature_allowed_list = RefIdItem.from_uuid(v2_uuid) + + +class AdvancedInspectionProfileReferencesUpdater(ReferencesUpdater): + def update_references(self): + if advanced_malware_protection := self.parcel.advanced_malware_protection: + v2_uuid = self.get_target_uuid(UUID(advanced_malware_protection.ref_id.value)) + self.parcel.advanced_malware_protection = RefIdItem.from_uuid(v2_uuid) + + if intrusion_prevention := self.parcel.intrusion_prevention: + v2_uuid = self.get_target_uuid(UUID(intrusion_prevention.ref_id.value)) + self.parcel.intrusion_prevention = RefIdItem.from_uuid(v2_uuid) + + if ssl_decryption_profile := self.parcel.ssl_decryption_profile: + v2_uuid = self.get_target_uuid(UUID(ssl_decryption_profile.ref_id.value)) + self.parcel.ssl_decryption_profile = RefIdItem.from_uuid(v2_uuid) + + if url_filtering := self.parcel.url_filtering: + v2_uuid = self.get_target_uuid(UUID(url_filtering.ref_id.value)) + self.parcel.url_filtering = RefIdItem.from_uuid(v2_uuid) + + +REFERENCES_UPDATER_MAPPING: Mapping[type, Type[ReferencesUpdater]] = { + UrlFilteringParcel: UrlFilteringReferencesUpdater, + SslDecryptionProfileParcel: SslProfileReferencesUpdater, + IntrusionPreventionParcel: IntrusionPreventionReferencesUpdater, + AdvancedInspectionProfileParcel: AdvancedInspectionProfileReferencesUpdater, +} + +POLICY_OBJECTS_PUSH_ORDER: Mapping[Type[AnyParcel], int] = { + UrlFilteringParcel: 1, + SslDecryptionProfileParcel: 1, + IntrusionPreventionParcel: 1, + AdvancedInspectionProfileParcel: 2, +} + + +def get_parcel_ordering_value(parcel: Type[AnyParcel]) -> int: + return POLICY_OBJECTS_PUSH_ORDER.get(parcel, 0) + + +def update_parcels_references(parcel: AnyPolicyObjectParcel, pushed_objects_map: Dict[UUID, UUID]) -> None: + if ref_updater := REFERENCES_UPDATER_MAPPING.get(type(parcel)): + ref_updater(parcel, pushed_objects_map=pushed_objects_map).update_references() + + +class PolicyObjectPusher: + def __init__( + self, + ux2_config: UX2Config, + session: ManagerSession, + push_result: UX2ConfigPushResult, + progress: Callable[[str, int, int], None], + ) -> None: + self._ux2_config = ux2_config + self._policy_object_api: PolicyObjectFeatureProfileAPI = session.api.sdwan_feature_profiles.policy_object + self._push_result: UX2ConfigPushResult = push_result + self._progress: Callable[[str, int, int], None] = progress + self._pushed_objects_map: Dict[UUID, UUID] = {} + + def push(self): + default_profile_id = self.get_or_create_default_policy_object_profile() + self.push_groups_of_interests_objects(default_profile_id) + + def push_groups_of_interests_objects(self, default_profile_id: UUID): + # TODO: fix typing issues in this method, probably we need literal containig AnyPolicyObjectType type_ + def cast_(parcel: AnyParcel) -> AnyPolicyObjectParcel: + return parcel # type: ignore + + profile_rollback = self._push_result.rollback.add_default_policy_object_profile(default_profile_id) + + # will hold system created parcels id by type and name when detected + system_created_parcels: Dict[Type[AnyPolicyObjectParcel], Dict[str, UUID]] = {} + + transformed_parcels = sorted( + [ + transformed_parcel + for transformed_parcel in self._ux2_config.profile_parcels + if type(transformed_parcel.parcel) in list_types(AnyPolicyObjectParcel) + ], + key=lambda x: get_parcel_ordering_value(type(x.parcel)), # sorter + ) + + for i, transformed_parcel in enumerate(transformed_parcels): + parcel = cast_(transformed_parcel.parcel) + header = transformed_parcel.header + parcel_type = type(parcel) + + # update existing system created parcels + if not system_created_parcels.get(parcel_type): + system_created_parcels[parcel_type] = {} + exsisting_parcel_list = cast( + DataSequence[Parcel[AnyPolicyObjectParcel]], + self._policy_object_api.get(default_profile_id, parcel_type).filter( # type: ignore [arg-type] + created_by="system" + ), + ) + for ep in exsisting_parcel_list: + id_ = UUID(ep.parcel_id) if not isinstance(ep.parcel_id, UUID) else ep.parcel_id + system_created_parcels[parcel_type][ep.payload.parcel_name] = id_ + + # if parcel with given name exists we skip it + if system_created_parcels[parcel_type].get(header.origname or ""): # changed + continue + + try: + update_parcels_references(parcel, self._pushed_objects_map) + + parcel_id = self._policy_object_api.create(profile_id=default_profile_id, payload=parcel).id + profile_rollback.add_parcel(parcel.type_, parcel_id) + self._push_result.report.groups_of_interest.add_created(parcel.parcel_name, parcel_id) + self._pushed_objects_map[transformed_parcel.header.origin] = parcel_id + + self._progress( + f"Creating Policy Object Parcel: {parcel.parcel_name}", + i + 1, + len(transformed_parcels), + ) + except ManagerHTTPError as e: + logger.error(f"Error occured during config group creation: {e.info}") + self._push_result.report.groups_of_interest.add_failed(parcel, e) + + def get_or_create_default_policy_object_profile(self) -> UUID: + profiles = self._policy_object_api.get_profiles() + if len(profiles) >= 1: + return profiles[0].profile_id + profile_id = self._policy_object_api.create_profile( + FeatureProfileCreationPayload(name="Policy_Profile_Global", description="Policy_Profile_Global_description") + ).id + return profile_id diff --git a/catalystwan/utils/config_migration/reverters/config_reverter.py b/catalystwan/utils/config_migration/reverters/config_reverter.py index 464f6f31..8c11b2f9 100644 --- a/catalystwan/utils/config_migration/reverters/config_reverter.py +++ b/catalystwan/utils/config_migration/reverters/config_reverter.py @@ -48,6 +48,10 @@ def rollback(self, rollback_info: UX2RollbackInfo, progress: Callable[[str, int, if rollback_info.default_policy_object_profile is not None: profile_id = rollback_info.default_policy_object_profile.profile_id api = self._session.api.sdwan_feature_profiles.policy_object + + # removing order shall be reversed, otherwise some parcels may not be removed due to reference count != 0 + rollback_info.default_policy_object_profile.parcels.reverse() + for i, parcel in enumerate(rollback_info.default_policy_object_profile.parcels): parcel_id, parcel_type_str = parcel parcel_type = find_type(parcel_type_str, AnyPolicyObjectParcel) diff --git a/catalystwan/utils/config_migration/runner.py b/catalystwan/utils/config_migration/runner.py index 203bca4b..8df37eb4 100644 --- a/catalystwan/utils/config_migration/runner.py +++ b/catalystwan/utils/config_migration/runner.py @@ -9,6 +9,7 @@ from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel from catalystwan.session import ManagerSession from catalystwan.typed_list import DataSequence +from catalystwan.utils.config_migration.creators.policy_object_pusher import get_parcel_ordering_value from catalystwan.workflows.config_migration import ( collect_ux1_config, log_progress, @@ -108,8 +109,13 @@ def clear_ux2(self) -> None: po_profiles = fp_api.policy_object.get_profiles() if len(po_profiles) > 1: print("WARNING MORE THAN ONE DEFAULT POLICY OBJECT PROFILE DETECTED") + for po_profile in po_profiles: - for dpo_parcel_type in list_types(AnyPolicyObjectParcel): + sorted_parcel_types = sorted( + list_types(AnyPolicyObjectParcel), key=lambda x: get_parcel_ordering_value(x), reverse=True + ) + + for dpo_parcel_type in sorted_parcel_types: for parcel in cast( DataSequence[Parcel[AnyPolicyObjectParcel]], fp_api.policy_object.get(po_profile.profile_id, dpo_parcel_type),