Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
dev-uxmt: Advanced Malware Protection converter (#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
radkrawczyk authored Jun 5, 2024
1 parent 88dfe9a commit a9b1b3b
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 15 deletions.
2 changes: 2 additions & 0 deletions catalystwan/models/configuration/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from catalystwan.endpoints.configuration_group import ConfigGroupCreationPayload
from catalystwan.endpoints.configuration_settings import CloudCredentials
from catalystwan.exceptions import ManagerHTTPError
from catalystwan.models.common import VpnId
from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload, ProfileType
from catalystwan.models.configuration.feature_profile.parcel import AnyParcel, list_types
from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel
Expand Down Expand Up @@ -471,6 +472,7 @@ class PolicyConvertContext:
regions_by_list_id: Dict[UUID, List[str]] = field(default_factory=dict)
sites_by_list_id: Dict[UUID, List[str]] = field(default_factory=dict)
lan_vpns_by_list_id: Dict[UUID, List[str]] = field(default_factory=dict)
amp_target_vpns_id: Dict[UUID, List[VpnId]] = field(default_factory=dict)

@staticmethod
def from_configs(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from typing import List, Literal
from typing import List, Literal, Optional

from pydantic import AliasPath, Field

from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase
from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase, as_global

FileReputationServer = Literal["nam", "eur", "apjc"]
FileReputationAlert = Literal["critical", "warning", "info"]
FileAnalysisServer = Literal["nam", "eur"]
FileAnalysisAlert = Literal["critical", "warning", "info"]

FileAnalysisFileTypes = Literal[
"pdf", "ms-exe", "new-office", "rtf", "mdb", "mscab", "msole2", "wri", "xlw", "flv", "swf"
]


class AdvancedMalwareProtectionParcel(_ParcelBase):
type_: Literal["unified/advanced-malware-protection"] = Field(
Expand All @@ -32,12 +36,48 @@ class AdvancedMalwareProtectionParcel(_ParcelBase):
file_analysis_enabled: Global[bool] = Field(
default=Global[bool](value=False), validation_alias=AliasPath("data", "fileAnalysisEnabled")
)
file_analysis_cloud_server: Global[FileAnalysisServer] = Field(
file_analysis_cloud_server: Optional[Global[FileAnalysisServer]] = Field(
default=None, validation_alias=AliasPath("data", "fileAnalysisCloudServer")
)
file_analysis_file_types: Global[List[str]] = Field(
file_analysis_file_types: Optional[Global[List[FileAnalysisFileTypes]]] = Field(
default=None, validation_alias=AliasPath("data", "fileAnalysisFileTypes")
)
file_analysis_alert: Global[FileAnalysisAlert] = Field(
file_analysis_alert: Optional[Global[FileAnalysisAlert]] = Field(
default=None, validation_alias=AliasPath("data", "fileAnalysisAlert")
)

@classmethod
def create(
cls,
parcel_name: str,
parcel_description: str,
file_reputation_cloud_server: FileReputationServer,
file_reputation_est_server: FileReputationServer,
file_reputation_alert: FileReputationAlert,
match_all_vpn: bool,
file_analysis_enabled: bool = False,
file_analysis_alert: Optional[FileAnalysisAlert] = None,
file_analysis_cloud_server: Optional[FileAnalysisServer] = None,
file_analysis_file_types: List[FileAnalysisFileTypes] = [],
):
_file_analysis_alert = as_global(file_analysis_alert, FileAnalysisAlert) if file_analysis_alert else None
_file_analysis_cloud_server = (
Global[FileAnalysisServer](value=file_analysis_cloud_server) if file_analysis_cloud_server else None
)

_file_analysis_file_types = (
Global[List[FileAnalysisFileTypes]](value=file_analysis_file_types) if file_analysis_file_types else None
)

return cls(
parcel_name=parcel_name,
parcel_description=parcel_description,
file_reputation_cloud_server=as_global(file_reputation_cloud_server, FileReputationServer),
file_reputation_est_server=as_global(file_reputation_est_server, FileReputationServer),
file_analysis_alert=_file_analysis_alert,
file_analysis_cloud_server=_file_analysis_cloud_server,
file_reputation_alert=as_global(file_reputation_alert, FileReputationAlert),
match_all_vpn=as_global(match_all_vpn),
file_analysis_enabled=as_global(file_analysis_enabled),
file_analysis_file_types=_file_analysis_file_types,
)
107 changes: 107 additions & 0 deletions catalystwan/tests/test_amp_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import unittest
from uuid import uuid4

from catalystwan.models.configuration.config_migration import PolicyConvertContext
from catalystwan.models.policy.definition.amp import (
AdvancedMalwareProtectionDefinition,
AdvancedMalwareProtectionPolicy,
)
from catalystwan.utils.config_migration.converters.policy.policy_definitions import convert


class TestAdvancedMalwareProtectionConverter(unittest.TestCase):
def setUp(self) -> None:
self.context = PolicyConvertContext()

def test_amp_security_conversion(self):
amp_v1_entry = AdvancedMalwareProtectionPolicy(
name="amp_security",
mode="security",
definition=AdvancedMalwareProtectionDefinition(
match_all_vpn=False,
file_reputation_est_server="eur",
file_reputation_cloud_server="eur",
file_reputation_alert="warning",
file_analysis_enabled=False,
file_analysis_file_types=[],
file_analysis_alert="info",
file_analysis_cloud_server="eur",
target_vpns=[1, 2, 3],
),
)

uuid = uuid4()
parcel = convert(amp_v1_entry, uuid, context=self.context)

assert parcel.parcel_name == "amp_security"
assert parcel.match_all_vpn.value is False
assert parcel.file_reputation_cloud_server.value == "eur"
assert parcel.file_reputation_est_server.value == "eur"
assert parcel.file_reputation_alert.value == "warning"
assert parcel.file_analysis_enabled.value is False
assert parcel.file_analysis_file_types is None
assert parcel.file_analysis_alert.value == "info"
assert parcel.file_analysis_cloud_server.value == "eur"

assert len(self.context.amp_target_vpns_id) == 1
assert self.context.amp_target_vpns_id[uuid] == amp_v1_entry.definition.target_vpns

def test_amp_unified_conversion(self):
amp_v1_entry = AdvancedMalwareProtectionPolicy(
name="amp_unified",
mode="unified",
definition=AdvancedMalwareProtectionDefinition(
match_all_vpn=True,
file_reputation_est_server="eur",
file_reputation_cloud_server="eur",
file_reputation_alert="info",
file_analysis_enabled=False,
file_analysis_file_types=[],
file_analysis_alert="info",
file_analysis_cloud_server="eur",
),
)

uuid = uuid4()
parcel = convert(amp_v1_entry, uuid, context=self.context)

assert parcel.parcel_name == "amp_unified"
assert parcel.match_all_vpn.value is True
assert parcel.file_reputation_cloud_server.value == "eur"
assert parcel.file_reputation_est_server.value == "eur"
assert parcel.file_reputation_alert.value == "info"
assert parcel.file_analysis_enabled.value is False
assert parcel.file_analysis_file_types is None
assert parcel.file_analysis_alert.value == "info"
assert parcel.file_analysis_cloud_server.value == "eur"
assert len(self.context.amp_target_vpns_id) == 0

def test_amp_conversion_with_empty_literals(self):
amp_v1_entry = AdvancedMalwareProtectionPolicy(
name="amp_empty_literals",
mode="unified",
definition=AdvancedMalwareProtectionDefinition(
match_all_vpn=True,
file_reputation_est_server="eur",
file_reputation_cloud_server="eur",
file_reputation_alert="critical",
file_analysis_enabled=True,
file_analysis_alert="",
file_analysis_file_types=["pdf", "mscab"],
file_analysis_cloud_server="",
),
)

uuid = uuid4()
parcel = convert(amp_v1_entry, uuid, context=self.context)

assert parcel.parcel_name == "amp_empty_literals"
assert parcel.match_all_vpn.value is True
assert parcel.file_reputation_cloud_server.value == "eur"
assert parcel.file_reputation_est_server.value == "eur"
assert parcel.file_reputation_alert.value == "critical"
assert parcel.file_analysis_enabled.value is True
assert parcel.file_analysis_file_types.value == ["pdf", "mscab"]
assert parcel.file_analysis_alert is None
assert parcel.file_analysis_cloud_server is None
assert len(self.context.amp_target_vpns_id) == 0
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from ipaddress import IPv4Interface
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union
from uuid import UUID

from pydantic import Field
from typing_extensions import Annotated
Expand All @@ -9,12 +10,16 @@
from catalystwan.models.configuration.config_migration import PolicyConvertContext
from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv4acl import Ipv4AclParcel
from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv6acl import Ipv6AclParcel
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.amp import (
AdvancedMalwareProtectionParcel,
)
from catalystwan.models.configuration.feature_profile.sdwan.topology.custom_control import CustomControlParcel
from catalystwan.models.configuration.feature_profile.sdwan.topology.hubspoke import HubSpokeParcel
from catalystwan.models.configuration.feature_profile.sdwan.topology.mesh import MeshParcel
from catalystwan.models.policy import AnyPolicyDefinition
from catalystwan.models.policy.definition.access_control_list import AclPolicy
from catalystwan.models.policy.definition.access_control_list_ipv6 import AclIPv6Policy
from catalystwan.models.policy.definition.amp import AdvancedMalwareProtectionPolicy
from catalystwan.models.policy.definition.control import ControlPolicy
from catalystwan.models.policy.definition.hub_and_spoke import HubAndSpokePolicy
from catalystwan.models.policy.definition.mesh import MeshPolicy
Expand All @@ -26,7 +31,14 @@
Input = AnyPolicyDefinition
Output = Optional[
Annotated[
Union[CustomControlParcel, HubSpokeParcel, MeshParcel, Ipv4AclParcel, Ipv6AclParcel],
Union[
CustomControlParcel,
HubSpokeParcel,
MeshParcel,
Ipv4AclParcel,
Ipv6AclParcel,
AdvancedMalwareProtectionParcel,
],
Field(discriminator="type_"),
]
]
Expand All @@ -51,15 +63,28 @@ def as_num_ranges_list(p: str) -> List[Union[int, Tuple[int, int]]]:
return num_list


def control(in_: ControlPolicy, context) -> CustomControlParcel:
def advanced_malware_protection(
in_: AdvancedMalwareProtectionPolicy, uuid: UUID, context: PolicyConvertContext
) -> AdvancedMalwareProtectionParcel:
if not in_.definition.file_reputation_alert:
raise CatalystwanConverterCantConvertException("AMP file reputation alert shall not be an empty str.")

if vpn_list := in_.definition.target_vpns:
context.amp_target_vpns_id[uuid] = vpn_list

definition_dump = in_.definition.model_dump(exclude={"target_vpns"})
return AdvancedMalwareProtectionParcel.create(**_get_parcel_name_desc(in_), **definition_dump)


def control(in_: ControlPolicy, uuid: UUID, context) -> CustomControlParcel:
if not context:
raise CatalystwanConverterCantConvertException(f"Additional context required for {ControlPolicy.__name__}")
out = CustomControlParcel(**_get_parcel_name_desc(in_))
# TODO: convert definition
return out


def hubspoke(in_: HubAndSpokePolicy, context: PolicyConvertContext) -> HubSpokeParcel:
def hubspoke(in_: HubAndSpokePolicy, uuid: UUID, context: PolicyConvertContext) -> HubSpokeParcel:
target_vpns = context.lan_vpns_by_list_id[in_.definition.vpn_list]
out = HubSpokeParcel(**_get_parcel_name_desc(in_))
out.target.vpn.value.extend(target_vpns)
Expand All @@ -76,7 +101,7 @@ def hubspoke(in_: HubAndSpokePolicy, context: PolicyConvertContext) -> HubSpokeP
return out


def ipv4acl(in_: AclPolicy, context) -> Ipv4AclParcel:
def ipv4acl(in_: AclPolicy, uuid: UUID, context) -> Ipv4AclParcel:
out = Ipv4AclParcel(**_get_parcel_name_desc(in_))
out.set_default_action(in_.default_action.type)
for in_seq in in_.sequences:
Expand Down Expand Up @@ -141,13 +166,13 @@ def ipv4acl(in_: AclPolicy, context) -> Ipv4AclParcel:
return out


def ipv6acl(in_: AclIPv6Policy, context) -> Ipv6AclParcel:
def ipv6acl(in_: AclIPv6Policy, uuid: UUID, context) -> Ipv6AclParcel:
out = Ipv6AclParcel(**_get_parcel_name_desc(in_))
# TODO: convert definition
return out


def mesh(in_: MeshPolicy, context: PolicyConvertContext) -> MeshParcel:
def mesh(in_: MeshPolicy, uuid: UUID, context: PolicyConvertContext) -> MeshParcel:
target_vpns = context.lan_vpns_by_list_id[in_.definition.vpn_list]
mesh_sites: List[str] = []
for region in in_.definition.regions:
Expand All @@ -165,6 +190,7 @@ def mesh(in_: MeshPolicy, context: PolicyConvertContext) -> MeshParcel:
ControlPolicy: control,
HubAndSpokePolicy: hubspoke,
MeshPolicy: mesh,
AdvancedMalwareProtectionPolicy: advanced_malware_protection,
}


Expand All @@ -179,8 +205,8 @@ def _find_converter(in_: Input) -> Callable[..., Output]:
return _not_supported


def convert(in_: Input, context: PolicyConvertContext) -> Output:
result = _find_converter(in_)(in_, context)
def convert(in_: Input, uuid: UUID, context: PolicyConvertContext) -> Output:
result = _find_converter(in_)(in_, uuid, context)
if result is not None:
result.model_validate(result)
return result
5 changes: 3 additions & 2 deletions catalystwan/workflows/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@
DEVICE_TYPE_BLOCKLIST = ["vsmart", "vbond", "vmanage"]

VPN_TEMPLATE_TYPES = [
"cisco_vpn", "vpn-vedge",
"cisco_vpn",
"vpn-vedge",
]

TOPOLOGY_POLICIES = ["control", "hubAndSpoke", "mesh"]
Expand Down Expand Up @@ -417,7 +418,7 @@ def transform(ux1: UX1Config, add_suffix: bool = True) -> ConfigTransformResult:
# Policy Definitions
for policy_definition in ux1.policies.policy_definitions:
try:
pd_parcel = convert_policy_definition(policy_definition, policy_context)
pd_parcel = convert_policy_definition(policy_definition, policy_definition.definition_id, policy_context)
if pd_parcel is not None:
header = TransformHeader(
type=pd_parcel._get_parcel_type(),
Expand Down

0 comments on commit a9b1b3b

Please sign in to comment.