diff --git a/catalystwan/utils/config_migration/converters/exceptions.py b/catalystwan/utils/config_migration/converters/exceptions.py new file mode 100644 index 00000000..eaa13401 --- /dev/null +++ b/catalystwan/utils/config_migration/converters/exceptions.py @@ -0,0 +1,9 @@ +from catalystwan.exceptions import CatalystwanException + + +class CatalystwanConverterCantConvertException(CatalystwanException): + """ + Exception raised when a CatalystwanConverter can't correctly convert a template. + """ + + pass diff --git a/catalystwan/utils/config_migration/converters/feature_template/aaa.py b/catalystwan/utils/config_migration/converters/feature_template/aaa.py index b2cd0eba..5259b925 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/aaa.py +++ b/catalystwan/utils/config_migration/converters/feature_template/aaa.py @@ -8,8 +8,7 @@ class AAATemplateConverter: supported_template_types = ("cisco_aaa", "cedge_aaa", "aaa") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> AAAParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> AAAParcel: """ Creates an AAAParcel object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/appqoe.py b/catalystwan/utils/config_migration/converters/feature_template/appqoe.py index 586d9995..7a0f536b 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/appqoe.py +++ b/catalystwan/utils/config_migration/converters/feature_template/appqoe.py @@ -7,13 +7,13 @@ ServiceNodeGroupName, ServiceNodeGroupsNames, ) +from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException class AppqoeTemplateConverter: supported_template_types = ("appqoe",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> AppqoeParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> AppqoeParcel: """ Create an AppqoeParcel object based on the provided name, description, and template values. @@ -26,7 +26,21 @@ def create_parcel(name: str, description: str, template_values: dict) -> AppqoeP AppqoeParcel: The created AppqoeParcel object. """ values = deepcopy(template_values) - print(values) + + appnav_controller_group = values.get("appnav_controller_group", []) + if not appnav_controller_group: + raise CatalystwanConverterCantConvertException("Appnav controller group is required for Appqoe parcel") + for appnav in appnav_controller_group: + if group_name := appnav.get("group_name"): + appnav["group_name"] = as_default(group_name.value, AppnavControllerGroupName) + for controller in appnav.get("appnav_controllers", []): + if _vpn := controller.get("vpn"): # noqa: F841 + # VPN field is depended on existence of the Service VPN value + # also from UI this list contains only 1 item and should not be a list. + # AppqoeParcel.forwarder.appnav_controller_group.appnav_controllers[0].vpn + # must be populated in the parcel creation process. + pass + for appqoe_item in values.get("service_context", {}).get("appqoe", []): if item_name := appqoe_item.get("name"): appqoe_item["name"] = as_default(value=item_name.value) @@ -46,16 +60,7 @@ def create_parcel(name: str, description: str, template_values: dict) -> AppqoeP internal = group.get("internal") if internal is not None: group["internal"] = as_default(internal.value) - for appnav in values.get("appnav_controller_group", []): - if group_name := appnav.get("group_name"): - appnav["group_name"] = as_default(group_name.value, AppnavControllerGroupName) - for controller in appnav.get("appnav_controllers", []): - if _vpn := controller.get("vpn"): # noqa: F841 - # VPN field is depended on existence of the Service VPN value - # also from UI this list contains only 1 item and should not be a list. - # AppqoeParcel.forwarder.appnav_controller_group.appnav_controllers[0].vpn - # must be populated in the parcel creation process. - pass + parcel_values = { "parcel_name": name, "parcel_description": description, diff --git a/catalystwan/utils/config_migration/converters/feature_template/banner.py b/catalystwan/utils/config_migration/converters/feature_template/banner.py index 5aa4550b..a3d24849 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/banner.py +++ b/catalystwan/utils/config_migration/converters/feature_template/banner.py @@ -4,8 +4,7 @@ class BannerTemplateConverter: supported_template_types = ("cisco_banner",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> BannerParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> BannerParcel: """ Creates a BannerParcel object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/base.py b/catalystwan/utils/config_migration/converters/feature_template/base.py index c3144e72..9c2e3aae 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/base.py +++ b/catalystwan/utils/config_migration/converters/feature_template/base.py @@ -4,6 +4,5 @@ class FeatureTemplateConverter(Protocol): - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> AnySystemParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> AnySystemParcel: ... diff --git a/catalystwan/utils/config_migration/converters/feature_template/basic.py b/catalystwan/utils/config_migration/converters/feature_template/basic.py index 968c50ae..a859cba2 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/basic.py +++ b/catalystwan/utils/config_migration/converters/feature_template/basic.py @@ -9,8 +9,7 @@ class SystemToBasicTemplateConverter: supported_template_types = ("cisco_system", "system-vsmart", "system-vedge") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> BasicParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> BasicParcel: """ Converts the provided template values into a BasicParcel object. diff --git a/catalystwan/utils/config_migration/converters/feature_template/bfd.py b/catalystwan/utils/config_migration/converters/feature_template/bfd.py index 5b30dc85..64356848 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/bfd.py +++ b/catalystwan/utils/config_migration/converters/feature_template/bfd.py @@ -4,8 +4,7 @@ class BFDTemplateConverter: supported_template_types = ("cisco_bfd", "bfd-vedge") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> BFDParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> BFDParcel: """ Creates a BFDParcel object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/bgp.py b/catalystwan/utils/config_migration/converters/feature_template/bgp.py index 96dc5a26..77f84bbe 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/bgp.py +++ b/catalystwan/utils/config_migration/converters/feature_template/bgp.py @@ -17,8 +17,10 @@ class BGPTemplateConverter: supported_template_types = ("bgp", "cisco_bgp") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> BGPParcel: + device_specific_ipv4_neighbor_address = "{{{{lbgp_1_neighbor_{index}_address}}}}" + device_specific_ipv6_neighbor_address = "{{{{lbgp_1_ipv6_neighbor_{index}_address}}}}" + + def create_parcel(self, name: str, description: str, template_values: dict) -> BGPParcel: """ Creates a BannerParcel object based on the provided template values. @@ -30,8 +32,6 @@ def create_parcel(name: str, description: str, template_values: dict) -> BGPParc Returns: BannerParcel: A BannerParcel object with the provided template values. """ - device_specific_ipv4_neighbor_address = "{{{{lbgp_1_neighbor_{index}_address}}}}" - device_specific_ipv6_neighbor_address = "{{{{lbgp_1_ipv6_neighbor_{index}_address}}}}" parcel_values = {"parcel_name": name, "parcel_description": description, **deepcopy(template_values["bgp"])} @@ -48,7 +48,7 @@ def create_parcel(name: str, description: str, template_values: dict) -> BGPParc family_type["family_type"] = as_global(family_type["family_type"].value, FamilyType) if neighbor.get("address") is None: logger.info("Neighbor address is not set, using device specific variable") - neighbor["address"] = as_variable(device_specific_ipv4_neighbor_address.format(index=(i + 1))) + neighbor["address"] = as_variable(self.device_specific_ipv4_neighbor_address.format(index=(i + 1))) if if_name := neighbor.get("update_source", {}).get("if_name"): neighbor["if_name"] = if_name neighbor.pop("update_source") @@ -73,7 +73,7 @@ def create_parcel(name: str, description: str, template_values: dict) -> BGPParc ) if neighbor.get("address") is None: logger.info("Neighbor address is not set, using device specific variable") - neighbor["address"] = as_variable(device_specific_ipv6_neighbor_address.format(index=(i + 1))) + neighbor["address"] = as_variable(self.device_specific_ipv6_neighbor_address.format(index=(i + 1))) if if_name := neighbor.get("update_source", {}).get("if_name"): neighbor["if_name"] = if_name neighbor.pop("update_source") diff --git a/catalystwan/utils/config_migration/converters/feature_template/dhcp.py b/catalystwan/utils/config_migration/converters/feature_template/dhcp.py index 89cde42a..5c28e7a5 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/dhcp.py +++ b/catalystwan/utils/config_migration/converters/feature_template/dhcp.py @@ -20,8 +20,7 @@ class DhcpTemplateConverter: variable_mac_address = "{{{{dhcp_1_staticLease_{}_macAddress}}}}" variable_ip = "{{{{dhcp_1_staticLease_{}_ip}}}}" - @classmethod - def create_parcel(cls, name: str, description: str, template_values: dict) -> LanVpnDhcpServerParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> LanVpnDhcpServerParcel: """ Create a LanVpnDhcpServerParcel object based on the provided parameters. @@ -50,19 +49,19 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La "Assiging variable: dhcp_1_addressPool_networkAddress and dhcp_1_addressPool_subnetMask." ) values["address_pool"] = { - "network_address": as_variable(cls.variable_address_pool), - "subnet_mask": as_variable(cls.variable_subnet_mask), + "network_address": as_variable(self.variable_address_pool), + "subnet_mask": as_variable(self.variable_subnet_mask), } for entry in values.get("option_code", []): - cls._convert_str_list_to_ipv4_list(entry, "ip") + self._convert_str_list_to_ipv4_list(entry, "ip") for key in ("dns_servers", "tftp_servers"): - cls._convert_str_list_to_ipv4_list(values, key) + self._convert_str_list_to_ipv4_list(values, key) static_lease = [] for i, entry in enumerate(values.get("static_lease", [])): - mac_address, ip = cls._get_mac_address_and_ip(entry, i) + mac_address, ip = self._get_mac_address_and_ip(entry, i) static_lease.append( { "mac_address": mac_address, @@ -79,8 +78,7 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La return LanVpnDhcpServerParcel(**parcel_values) # type: ignore - @classmethod - def _convert_str_list_to_ipv4_list(cls, d: dict, key: str) -> None: + def _convert_str_list_to_ipv4_list(self, d: dict, key: str) -> None: """ Convert a list of strings representing IPv4 addresses to a list of IPv4Address objects. @@ -95,19 +93,18 @@ def _convert_str_list_to_ipv4_list(cls, d: dict, key: str) -> None: if str_list := d.get(key, as_global([])).value: d[key] = Global[List[IPv4Address]](value=[IPv4Address(ip) for ip in str_list]) - @classmethod - def _get_mac_address_and_ip(cls, entry: dict, i: int) -> tuple: - mac_address = entry.get("mac_address", as_variable(cls.variable_mac_address.format(i + 1))) - ip = entry.get("ip", as_variable(cls.variable_ip.format(i + 1))) + def _get_mac_address_and_ip(self, entry: dict, i: int) -> tuple: + mac_address = entry.get("mac_address", as_variable(self.variable_mac_address.format(i + 1))) + ip = entry.get("ip", as_variable(self.variable_ip.format(i + 1))) if isinstance(mac_address, Variable): logger.warning( f"No MAC address specified for static lease {i + 1}." - f"Assigning variable: {cls.variable_mac_address.format(i + 1)}" + f"Assigning variable: {self.variable_mac_address.format(i + 1)}" ) if isinstance(ip, Variable): logger.warning( f"No IP address specified for static lease {i + 1}." - f"Assigning variable: {cls.variable_ip.format(i + 1)}" + f"Assigning variable: {self.variable_ip.format(i + 1)}" ) return mac_address, ip diff --git a/catalystwan/utils/config_migration/converters/feature_template/factory_method.py b/catalystwan/utils/config_migration/converters/feature_template/factory_method.py index 2288c147..f86b459f 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/factory_method.py +++ b/catalystwan/utils/config_migration/converters/feature_template/factory_method.py @@ -1,6 +1,6 @@ import json import logging -from typing import Any, Dict, cast +from typing import Any, Callable, Dict, cast from catalystwan.api.template_api import FeatureTemplateInformation from catalystwan.exceptions import CatalystwanException @@ -53,7 +53,7 @@ } -def choose_parcel_converter(template_type: str) -> FeatureTemplateConverter: +def choose_parcel_converter(template_type: str) -> Callable[..., FeatureTemplateConverter]: """ This function is used to choose the correct parcel factory based on the template type. @@ -87,7 +87,7 @@ def create_parcel_from_template(template: FeatureTemplateInformation) -> AnySyst Raises: ValueError: If the given template type is not supported. """ - converter = choose_parcel_converter(template.template_type) + converter = choose_parcel_converter(template.template_type)() template_definition_as_dict = json.loads(cast(str, template.template_definiton)) template_values = find_template_values(template_definition_as_dict) template_values_normalized = template_definition_normalization(template_values) diff --git a/catalystwan/utils/config_migration/converters/feature_template/global_.py b/catalystwan/utils/config_migration/converters/feature_template/global_.py index ade77de6..36178e37 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/global_.py +++ b/catalystwan/utils/config_migration/converters/feature_template/global_.py @@ -4,8 +4,7 @@ class GlobalTemplateConverter: supported_template_types = ("cedge_global",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> GlobalParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> GlobalParcel: """ Creates an Logging object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/logging_.py b/catalystwan/utils/config_migration/converters/feature_template/logging_.py index 399be09f..ec02f24e 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/logging_.py +++ b/catalystwan/utils/config_migration/converters/feature_template/logging_.py @@ -9,8 +9,7 @@ class LoggingTemplateConverter: supported_template_types = ("cisco_logging", "logging") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> LoggingParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> LoggingParcel: """ Creates an Logging object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/normalizer.py b/catalystwan/utils/config_migration/converters/feature_template/normalizer.py index ecc1aa74..1a779010 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/normalizer.py +++ b/catalystwan/utils/config_migration/converters/feature_template/normalizer.py @@ -3,6 +3,8 @@ from catalystwan.api.configuration_groups.parcel import Global, as_global from catalystwan.models.common import TLOCColor +from catalystwan.models.configuration.feature_profile.sdwan.service.dhcp_server import SubnetMask +from catalystwan.models.configuration.feature_profile.sdwan.service.lan.vpn import Direction from catalystwan.models.configuration.feature_profile.sdwan.system.logging_parcel import ( AuthType, CypherSuite, @@ -11,7 +13,17 @@ ) from catalystwan.models.configuration.feature_profile.sdwan.system.mrf import EnableMrfMigration, Role -CastableLiterals = [Priority, TlsVersion, AuthType, CypherSuite, Role, EnableMrfMigration, TLOCColor] +CastableLiterals = [ + Priority, + TlsVersion, + AuthType, + CypherSuite, + Role, + EnableMrfMigration, + TLOCColor, + SubnetMask, + Direction, +] CastedTypes = Union[ Global[bool], diff --git a/catalystwan/utils/config_migration/converters/feature_template/ntp.py b/catalystwan/utils/config_migration/converters/feature_template/ntp.py index a766abcf..580eb290 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/ntp.py +++ b/catalystwan/utils/config_migration/converters/feature_template/ntp.py @@ -4,8 +4,7 @@ class NTPTemplateConverter: supported_template_types = ("cisco_ntp", "ntp") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> NTPParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> NTPParcel: """ Creates an Logging object based on the provided template values. diff --git a/catalystwan/utils/config_migration/converters/feature_template/omp.py b/catalystwan/utils/config_migration/converters/feature_template/omp.py index e3703f66..cc5dad83 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/omp.py +++ b/catalystwan/utils/config_migration/converters/feature_template/omp.py @@ -1,3 +1,4 @@ +from copy import deepcopy from typing import Dict, List from catalystwan.api.configuration_groups.parcel import Global, as_default, as_global @@ -7,8 +8,7 @@ class OMPTemplateConverter: supported_template_types = ("cisco_omp", "omp-vedge", "omp-vsmart") - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> OMPParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> OMPParcel: """ Creates an OMPParcel object based on the provided template values. @@ -20,16 +20,15 @@ def create_parcel(name: str, description: str, template_values: dict) -> OMPParc Returns: OMPParcel: An OMPParcel object with the provided template values. """ - - def create_advertise_dict(advertise_list: List) -> Dict: - return {definition["protocol"].value: Global[bool](value=True) for definition in advertise_list} - + values = deepcopy(template_values) parcel_values = { "parcel_name": name, "parcel_description": description, - "ecmp_limit": as_global(float(template_values.get("ecmp_limit", as_default(4)).value)), - "advertise_ipv4": create_advertise_dict(template_values.get("advertise", [])), - "advertise_ipv6": create_advertise_dict(template_values.get("ipv6_advertise", [])), + "ecmp_limit": as_global(float(values.get("ecmp_limit", as_default(4)).value)), + "advertise_ipv4": self.create_advertise_dict(values.get("advertise", [])), + "advertise_ipv6": self.create_advertise_dict(values.get("ipv6_advertise", [])), } - return OMPParcel(**parcel_values) + + def create_advertise_dict(self, advertise_list: List) -> Dict: + return {definition["protocol"].value: Global[bool](value=True) for definition in advertise_list} diff --git a/catalystwan/utils/config_migration/converters/feature_template/snmp.py b/catalystwan/utils/config_migration/converters/feature_template/snmp.py index bce41441..2e06da51 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/snmp.py +++ b/catalystwan/utils/config_migration/converters/feature_template/snmp.py @@ -17,8 +17,9 @@ class SNMPTemplateConverter: supported_template_types = ("cisco_snmp",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> SNMPParcel: + default_view_oid_id = "{{{{l_snmpView_1_snmpOid_{}_id}}}}" + + def create_parcel(self, name: str, description: str, template_values: dict) -> SNMPParcel: """ Creates a SecurityParcel object based on the provided template values. @@ -31,20 +32,24 @@ def create_parcel(name: str, description: str, template_values: dict) -> SNMPPar SecurityParcel: A SecurityParcel object with the provided template values. """ values = deepcopy(template_values) + self.configure_community(values) + self.configure_target(values) + parcel_values = {"parcel_name": name, "parcel_description": description, **values} + return SNMPParcel(**parcel_values) # type: ignore + + def configure_community(self, values: dict): for community_item in values.get("community", []): if authorization := community_item.get("authorization"): community_item["authorization"] = as_global(authorization.value, Authorization) + def configure_target(self, values: dict) -> None: values["target"] = values.pop("trap", {}).get("target", []) - default_view_oid_id = "{{{{l_snmpView_1_snmpOid_{}_id}}}}" + for view in values.get("view", []): for i, oid in enumerate(view.get("oid", [])): - id_ = oid.get("id", as_variable(default_view_oid_id.format(i + 1))) + id_ = oid.get("id", as_variable(self.default_view_oid_id.format(i + 1))) if isinstance(id_, Variable): logger.info( - f"OID ID is not set, using device specific variable {default_view_oid_id.format(i + 1)}" + f"OID ID is not set, using device specific variable {self.default_view_oid_id.format(i + 1)}" ) oid["id"] = id_ - - parcel_values = {"parcel_name": name, "parcel_description": description, **values} - return SNMPParcel(**parcel_values) # type: ignore diff --git a/catalystwan/utils/config_migration/converters/feature_template/thousandeyes.py b/catalystwan/utils/config_migration/converters/feature_template/thousandeyes.py index 699a394b..a16faada 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/thousandeyes.py +++ b/catalystwan/utils/config_migration/converters/feature_template/thousandeyes.py @@ -1,3 +1,4 @@ +from copy import deepcopy from typing import Union from catalystwan.api.configuration_groups.parcel import as_global, as_variable @@ -16,8 +17,12 @@ class ThousandEyesTemplateConverter: supported_template_types = ("cisco_thousandeyes",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> ThousandEyesParcel: + delete_keys = ("proxy_type", "proxy_pac", "proxy_static", "proxy_port") + + # Default Values - TE Management IP + thousand_eyes_mgmt_ip = "{{thousand_eyes_mgmt_ip}}" + + def create_parcel(self, name: str, description: str, template_values: dict) -> ThousandEyesParcel: """ Creates a ThousandEyesParcel object based on the provided template values. @@ -29,35 +34,38 @@ def create_parcel(name: str, description: str, template_values: dict) -> Thousan Returns: ThousandEyesParcel: A ThousandEyesParcel object with the provided values. """ - virtual_application = template_values["virtual_application"][0]["te"] + values = deepcopy(template_values["virtual_application"][0]["te"]) + self.configure_thousand_eyes_mgmt_ip(values) + self.configure_proxy_type(values) + self.cleanup_keys(values) + parcel_values = { + "parcel_name": name, + "parcel_description": description, + "virtual_application": [values], + } + return ThousandEyesParcel(**parcel_values) # type: ignore - if virtual_application.get("te_mgmt_ip"): - virtual_application["te_mgmt_ip"] = as_variable("{{thousand_eyes_mgmt_ip}}") + def configure_thousand_eyes_mgmt_ip(self, values: dict): + if values.get("te_mgmt_ip"): + values["te_mgmt_ip"] = as_variable(self.thousand_eyes_mgmt_ip) - proxy_type = virtual_application.get("proxy_type", as_global("none")) + def configure_proxy_type(self, values: dict): + proxy_type = values.get("proxy_type", as_global("none")) if proxy_type is None: proxy_type == as_global("none") proxy_type = proxy_type.value - proxy_config: Union[ProxyConfigNone, ProxyConfigPac, ProxyConfigStatic] = ProxyConfigNone() if proxy_type == "none": proxy_config = ProxyConfigNone() elif proxy_type == "pac": - proxy_config = ProxyConfigPac(pac_url=virtual_application["proxy_pac"]["pac_url"]) + proxy_config = ProxyConfigPac(pac_url=values["proxy_pac"]["pac_url"]) elif proxy_type == "static": proxy_config = ProxyConfigStatic( - proxy_host=virtual_application["proxy_static"]["proxy_host"], - proxy_port=virtual_application["proxy_static"]["proxy_port"], + proxy_host=values["proxy_static"]["proxy_host"], + proxy_port=values["proxy_static"]["proxy_port"], ) + values["proxy_config"] = proxy_config - virtual_application["proxy_config"] = proxy_config - - for key in ["proxy_type", "proxy_pac", "proxy_static", "proxy_port"]: - virtual_application.pop(key, None) - - parcel_values = { - "parcel_name": name, - "parcel_description": description, - "virtual_application": [virtual_application], - } - return ThousandEyesParcel(**parcel_values) # type: ignore + def cleanup_keys(self, values: dict): + for key in self.delete_keys: + values.pop(key, None) diff --git a/catalystwan/utils/config_migration/converters/feature_template/ucse.py b/catalystwan/utils/config_migration/converters/feature_template/ucse.py index 0c151892..e719662b 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/ucse.py +++ b/catalystwan/utils/config_migration/converters/feature_template/ucse.py @@ -12,8 +12,9 @@ class UcseTemplateConverter: supported_template_types = ("ucse",) - @staticmethod - def create_parcel(name: str, description: str, template_values: dict) -> UcseParcel: + delete_keys = ("module_type", "subslot_name") + + def create_parcel(self, name: str, description: str, template_values: dict) -> UcseParcel: """ Creates a UcseParcel object based on the provided template values. @@ -25,27 +26,34 @@ def create_parcel(name: str, description: str, template_values: dict) -> UcsePar Returns: UcseParcel: A UcseParcel object with the provided values. """ - parcel_values = deepcopy(template_values) - - for interface_values in parcel_values.get("interface", []): + values = deepcopy(template_values) + self.configure_interface(values) + self.configure_static_case(values) + self.configure_lom_type(values) + self.cleanup_keys(values) + values.update({"parcel_name": name, "parcel_description": description}) + return UcseParcel(**values) + + def configure_interface(self, values: dict) -> None: + for interface_values in values.get("interface", []): ip = interface_values.pop("ip", None) if ip: interface_values["address"] = ip.get("static_case", {}).get("address") - imc = parcel_values.get("imc", {}) + def configure_static_case(self, values: dict) -> None: + imc = values.get("imc", {}) static_case = imc.get("ip", {}).get("static_case") if static_case: imc["ip"] = static_case - access_port = imc.get("access_port", {}) + def configure_lom_type(self, values: dict) -> None: + access_port = values.get("imc", {}).get("access_port", {}) shared_lom = access_port.get("shared_lom") if shared_lom: lom_type = list(shared_lom.keys())[0] shared_lom.clear() access_port["shared_lom"]["lom_type"] = Global[LomType](value=lom_type) - for key in ["module_type", "subslot_name"]: - parcel_values.pop(key, None) - - parcel_values.update({"parcel_name": name, "parcel_description": description}) - return UcseParcel(**parcel_values) + def cleanup_keys(self, values: dict) -> None: + for key in self.delete_keys: + values.pop(key, None) diff --git a/catalystwan/utils/config_migration/converters/feature_template/vpn.py b/catalystwan/utils/config_migration/converters/feature_template/vpn.py index 3f913dcd..1ac89f11 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/vpn.py +++ b/catalystwan/utils/config_migration/converters/feature_template/vpn.py @@ -1,6 +1,9 @@ import logging from copy import deepcopy from ipaddress import IPv4Interface, IPv6Interface +from typing import Literal, Type, Union + +from pydantic import BaseModel from catalystwan.api.configuration_groups.parcel import as_default, as_global, as_variable from catalystwan.models.configuration.feature_profile.common import Prefix @@ -47,6 +50,22 @@ logger = logging.getLogger(__name__) +class RouteLeakMappingItem(BaseModel): + ux2_model: Type[Union[RouteLeakFromGlobal, RouteLeakFromService, RouteLeakBetweenServices]] + ux2_field: Literal["route_leak_from_global", "route_leak_from_service", "route_leak_between_services"] + + +class RouteMappingItem(BaseModel): + ux2_model: Type[Union[StaticGreRouteIPv4, StaticIpsecRouteIPv4, ServiceRoute]] + ux2_field: Literal["route_gre", "route_service", "ipsec_route"] + + +class OmpMappingItem(BaseModel): + ux2_model_omp: Type[Union[OmpAdvertiseIPv4, OmpAdvertiseIPv6]] + ux2_model_prefix: Type[Union[IPv4Prefix, IPv6Prefix]] + ux2_field: Literal["omp_advertise_ipv4", "omp_advertise_ipv6"] + + class LanVpnParcelTemplateConverter: """ A class for converting template values into a LanVpnParcel object. @@ -54,6 +73,46 @@ class LanVpnParcelTemplateConverter: supported_template_types = ("cisco_vpn",) + delete_keys = ( + "ecmp_hash_key", + "ip", + "omp", + "nat", + "nat64", + "dns", + "host", + "ipv6", + "name", + "route_import_from", + "route_import", + "route_export", + ) + + route_leaks_mapping = { + "route_import": RouteLeakMappingItem(ux2_model=RouteLeakFromGlobal, ux2_field="route_leak_from_global"), + "route_export": RouteLeakMappingItem(ux2_model=RouteLeakFromService, ux2_field="route_leak_from_service"), + "route_import_from": RouteLeakMappingItem( + ux2_model=RouteLeakBetweenServices, ux2_field="route_leak_between_services" + ), + } + + routes_mapping = { + "route_gre": RouteMappingItem(ux2_model=StaticGreRouteIPv4, ux2_field="route_gre"), + "route_service": RouteMappingItem(ux2_model=ServiceRoute, ux2_field="route_service"), + "ipsec_route": RouteMappingItem(ux2_model=StaticIpsecRouteIPv4, ux2_field="ipsec_route"), + } + + omp_mapping = { + "advertise": OmpMappingItem( + ux2_model_omp=OmpAdvertiseIPv4, + ux2_model_prefix=IPv4Prefix, + ux2_field="omp_advertise_ipv4", + ), + "ipv6_advertise": OmpMappingItem( + ux2_model_omp=OmpAdvertiseIPv6, ux2_model_prefix=IPv6Prefix, ux2_field="omp_advertise_ipv6" + ), + } + # Default Values - IPv4 Route ipv4_route_prefix_network_address = "{{{{lan_vpn_ipv4Route_{}_prefix_networkAddress}}}}" ipv4_route_prefix_subnet_mask = "{{{{lan_vpn_ipv4Route_{}_prefix_subnetMask}}}}" @@ -71,6 +130,14 @@ class LanVpnParcelTemplateConverter: nat_overload = "{{{{lan_vpn_nat_{}_overload}}}}" nat_direction = "{{{{lan_vpn_nat_{}_direction}}}}" + # Default Values - Port Forwarding + nat_port_foward_natpool_name = "{{{{lan_vpn_natPortForward_{}_natpoolName}}}}" + nat_port_foward_translate_port = "{{{{lan_vpn_natPortForward_{}_translatePort}}}}" + nat_port_foward_translated_source_ip = "{{{{lan_vpn_natPortForward_{}_translatedSourceIp}}}}" + nat_port_foward_source_port = "{{{{lan_vpn_natPortForward_{}_sourcePort}}}}" + nat_port_foward_source_ip = "{{{{lan_vpn_natPortForward_{}_sourceIp}}}}" + nat_port_foward_protocol = "{{{{lan_vpn_natPortForward_{}_protocol}}}}" + # Default Values - Static NAT static_nat_pool_name = "{{{{lan_vpn__staticNat_{}_poolName}}}}" static_nat_source_ip = "{{{{lan_vpn_staticNat_{}_sourceIp}}}}" @@ -83,8 +150,7 @@ class LanVpnParcelTemplateConverter: nat64_v4_pool_range_end = "{{{{lan_vpn_nat64_{}_v4_poolRangeEnd}}}}" nat64_v4_pool_overload = "{{{{lan_vpn_nat64_{}_v4_poolOverload}}}}" - @classmethod - def create_parcel(cls, name: str, description: str, template_values: dict) -> LanVpnParcel: + def create_parcel(self, name: str, description: str, template_values: dict) -> LanVpnParcel: """ Creates a LanVpnParcel object based on the provided parameters. @@ -97,162 +163,171 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La LanVpnParcel: The created LanVpnParcel object. """ values = deepcopy(template_values) - print(values) - if vpn_name := values.pop("name", None): + self.configure_vpn_name(values) + self.configure_natpool(values) + self.configure_port_forwarding(values) + self.configure_static_nat(values) + self.configure_nat64(values) + self.configure_omp(values) + self.configure_dns(values) + self.configure_hostname_mapping(values) + self.configure_service(values) + self.configure_ipv4_route(values) + self.configure_ipv6_route(values) + self.configure_routes(values) + self.configure_route_leaks(values) + self.cleanup_keys(values) + parcel_values = { + "parcel_name": name, + "parcel_description": description, + **values, + } + return LanVpnParcel(**parcel_values) # type: ignore + + def cleanup_keys(self, values: dict) -> None: + for key in self.delete_keys: + values.pop(key, None) + + def configure_vpn_name(self, values: dict) -> None: + if vpn_name := values.get("name", None): values["vpn_name"] = vpn_name - network_address_translation = values.pop("nat", {}) - if natpool := network_address_translation.pop("natpool", []): + def configure_dns(self, values: dict) -> None: + if dns := values.get("dns", []): + dns_ipv4 = DnsIPv4() + for entry in dns: + if entry["role"] == "primary": + dns_ipv4.primary_dns_address_ipv4 = entry["dns_addr"] + elif entry["role"] == "secondary": + dns_ipv4.secondary_dns_address_ipv4 = entry["dns_addr"] + values["dns"] = dns_ipv4 + + def configure_hostname_mapping(self, values: dict) -> None: + if host := values.get("host", []): + host_mapping_items = [] + for entry in host: + host_mapping_item = HostMapping( + host_name=entry["hostname"], + list_of_ip=entry["ip"], + ) + host_mapping_items.append(host_mapping_item) + values["new_host_mapping"] = host_mapping_items + + def configure_service(self, values: dict) -> None: + if service := values.get("service", []): + service_items = [] + for service_i, entry in enumerate(service): + service_item = Service( + service_type=as_global(entry["svc_type"].value, ServiceType), + ipv4_addresses=entry.get("address", as_variable(self.service_ipv4_addresses.format(service_i + 1))), + tracking=entry.get("track_enable", as_default(False)), + ) + service_items.append(service_item) + values["service"] = service_items + + def configure_natpool(self, values: dict) -> None: + if natpool := values.get("nat", {}).get("natpool", []): nat_items = [] for nat_i, entry in enumerate(natpool): direction = entry.get("direction") if direction: direction = as_global(direction.value, Direction) else: - direction = as_variable(cls.nat_direction.format(nat_i + 1)) + direction = as_variable(self.nat_direction.format(nat_i + 1)) nat_items.append( NatPool( - nat_pool_name=entry.get("name", as_variable(cls.nat_natpool_name.format(nat_i + 1))), - prefix_length=entry.get("prefix_length", as_variable(cls.nat_prefix_length.format(nat_i + 1))), - range_start=entry.get("range_start", as_variable(cls.nat_range_start.format(nat_i + 1))), - range_end=entry.get("range_end", as_variable(cls.nat_range_end.format(nat_i + 1))), + nat_pool_name=entry.get("name", as_variable(self.nat_natpool_name.format(nat_i + 1))), + prefix_length=entry.get("prefix_length", as_variable(self.nat_prefix_length.format(nat_i + 1))), + range_start=entry.get("range_start", as_variable(self.nat_range_start.format(nat_i + 1))), + range_end=entry.get("range_end", as_variable(self.nat_range_end.format(nat_i + 1))), overload=entry.get("overload", as_default(True)), direction=direction, ) ) values["nat_pool"] = nat_items - if port_forward := network_address_translation.pop("port_forward", []): + def configure_port_forwarding(self, values: dict) -> None: + if port_forward := values.get("nat", {}).get("port_forward", []): nat_port_forwarding_items = [] - for entry in port_forward: + for net_port_foward_i, entry in enumerate(port_forward): + protocol = entry.get("proto") + if protocol: + protocol = as_global(protocol.value.upper(), NATPortForwardProtocol) + else: + protocol = as_variable(self.nat_port_foward_protocol.format(net_port_foward_i + 1)) nat_port_forwarding_items.append( NatPortForward( - nat_pool_name=entry["pool_name"], - source_port=entry["source_port"], - translate_port=entry["translate_port"], - source_ip=entry["source_ip"], - translated_source_ip=entry["translate_ip"], - protocol=as_global(entry["proto"].value.upper(), NATPortForwardProtocol), + nat_pool_name=entry.get( + "pool_name", as_variable(self.nat_port_foward_natpool_name.format(net_port_foward_i + 1)) + ), + source_port=entry.get( + "source_port", as_variable(self.nat_port_foward_source_port.format(net_port_foward_i + 1)) + ), + translate_port=entry.get( + "translate_port", + as_variable(self.nat_port_foward_translate_port.format(net_port_foward_i + 1)), + ), + source_ip=entry.get( + "source_ip", as_variable(self.nat_port_foward_source_ip.format(net_port_foward_i + 1)) + ), + translated_source_ip=entry.get( + "translate_ip", + as_variable(self.nat_port_foward_translated_source_ip.format(net_port_foward_i + 1)), + ), + protocol=protocol, ) ) values["nat_port_forwarding"] = nat_port_forwarding_items - if static_nat := network_address_translation.pop("static", []): + def configure_static_nat(self, values: dict) -> None: + if static_nat := values.get("nat", {}).get("static", []): static_nat_items = [] for static_nat_i, entry in enumerate(static_nat): static_nat_direction = entry.get("static_nat_direction") if static_nat_direction: static_nat_direction = as_global(static_nat_direction.value, Direction) else: - static_nat_direction = as_variable(cls.static_nat_direction.format(static_nat_i + 1)) + static_nat_direction = as_variable(self.static_nat_direction.format(static_nat_i + 1)) static_nat_items.append( StaticNat( nat_pool_name=entry.get( - "pool_name", as_variable(cls.static_nat_pool_name.format(static_nat_i + 1)) + "pool_name", as_variable(self.static_nat_pool_name.format(static_nat_i + 1)) ), source_ip=entry.get( - "source_ip", as_variable(cls.static_nat_source_ip.format(static_nat_i + 1)) + "source_ip", as_variable(self.static_nat_source_ip.format(static_nat_i + 1)) ), translated_source_ip=entry.get( - "translate_ip", as_variable(cls.static_nat_translated_source_ip.format(static_nat_i + 1)) + "translate_ip", as_variable(self.static_nat_translated_source_ip.format(static_nat_i + 1)) ), static_nat_direction=static_nat_direction, ) ) values["static_nat"] = static_nat_items - network_address_translation_64 = values.pop("nat64", {}) - if nat64pool := network_address_translation_64.pop("v4", {}).pop("pool", []): + def configure_nat64(self, values: dict) -> None: + if nat64pool := values.get("nat64", {}).get("v4", {}).get("pool", []): nat64_items = [] for nat64pool_i, entry in enumerate(nat64pool): nat64_items.append( Nat64v4Pool( nat64_v4_pool_name=entry.get( - "name", as_variable(cls.nat64_v4_pool_name.format(nat64pool_i + 1)) + "name", as_variable(self.nat64_v4_pool_name.format(nat64pool_i + 1)) ), nat64_v4_pool_range_start=entry.get( - "start_address", as_variable(cls.nat64_v4_pool_range_start.format(nat64pool_i + 1)) + "start_address", as_variable(self.nat64_v4_pool_range_start.format(nat64pool_i + 1)) ), nat64_v4_pool_range_end=entry.get( - "end_address", as_variable(cls.nat64_v4_pool_range_end.format(nat64pool_i + 1)) + "end_address", as_variable(self.nat64_v4_pool_range_end.format(nat64pool_i + 1)) ), nat64_v4_pool_overload=entry.get( - "overload", as_variable(cls.nat64_v4_pool_overload.format(nat64pool_i + 1)) + "overload", as_variable(self.nat64_v4_pool_overload.format(nat64pool_i + 1)) ), ) ) values["nat64_v4_pool"] = nat64_items - omp = values.pop("omp", {}) - if omp_advertise_ipv4 := omp.pop("advertise", []): - omp_advertise_ipv4_items = [] - for entry in omp_advertise_ipv4: - ipv4_prefix_list_items = [] - for prefix_entry in entry.pop("prefix_list", []): - ipv4_prefix_item = IPv4Prefix( - prefix=prefix_entry["prefix_entry"], - aggregate_only=prefix_entry["aggregate_only"], - region=prefix_entry["region"], - ) - ipv4_prefix_list_items.append(ipv4_prefix_item) - - omp_advertise_ipv4_item = OmpAdvertiseIPv4( - omp_protocol=as_global(entry["protocol"].value, ProtocolIPv4), - prefix_list=ipv4_prefix_list_items if ipv4_prefix_list_items else None, - ) - omp_advertise_ipv4_items.append(omp_advertise_ipv4_item) - values["omp_advertise_ipv4"] = omp_advertise_ipv4_items - - if omp_advertise_ipv6 := omp.pop("ipv6_advertise", []): - omp_advertise_ipv6_items = [] - for entry in omp_advertise_ipv6: - ipv6_prefix_list_items = [] - for prefix_entry in entry.pop("prefix_list", []): - ipv6_prefix_item = IPv6Prefix( - prefix=prefix_entry["prefix_entry"], - aggregate_only=prefix_entry["aggregate_only"], - region=as_global(prefix_entry["region"].value, Region), - ) - ipv6_prefix_list_items.append(ipv6_prefix_item) - - omp_advertise_ipv6_item = OmpAdvertiseIPv6( - omp_protocol=as_global(entry["protocol"].value, ProtocolIPv6), - prefix_list=ipv6_prefix_list_items if ipv6_prefix_list_items else None, - ) - omp_advertise_ipv6_items.append(omp_advertise_ipv6_item) - values["omp_advertise_ipv6"] = omp_advertise_ipv6_items - - if dns := values.pop("dns", {}): - dns_ipv4 = DnsIPv4() - for entry in dns: - if entry["role"] == "primary": - dns_ipv4.primary_dns_address_ipv4 = entry["dns_addr"] - elif entry["role"] == "secondary": - dns_ipv4.secondary_dns_address_ipv4 = entry["dns_addr"] - - if host := values.pop("host", []): - host_mapping_items = [] - for entry in host: - host_mapping_item = HostMapping( - host_name=entry["hostname"], - list_of_ip=entry["ip"], - ) - host_mapping_items.append(host_mapping_item) - values["new_host_mapping"] = host_mapping_items - - if service := values.get("service", []): - service_items = [] - for service_i, entry in enumerate(service): - service_item = Service( - service_type=as_global(entry["svc_type"].value, ServiceType), - ipv4_addresses=entry.get("address", as_variable(cls.service_ipv4_addresses.format(service_i + 1))), - tracking=entry.get("track_enable", as_default(False)), - ) - service_items.append(service_item) - values["service"] = service_items - - ipv4 = values.pop("ip", {}) - if ipv4_route := ipv4.pop("route", []): + def configure_ipv4_route(self, values: dict) -> None: + if ipv4_route := values.get("ip", {}).get("route", []): ipv4_route_items = [] for route_i, route in enumerate(ipv4_route): prefix = route.pop("prefix", None) @@ -264,8 +339,8 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La ) else: route_prefix = RoutePrefix( - ip_address=as_variable(cls.ipv4_route_prefix_network_address.format(route_i + 1)), - subnet_mask=as_variable(cls.ipv4_route_prefix_subnet_mask.format(route_i + 1)), + ip_address=as_variable(self.ipv4_route_prefix_network_address.format(route_i + 1)), + subnet_mask=as_variable(self.ipv4_route_prefix_subnet_mask.format(route_i + 1)), ) ip_route_item = None if "next_hop" in route: @@ -275,12 +350,12 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La IPv4RouteGatewayNextHop( address=next_hop.pop( "address", - as_variable(cls.ipv4_route_next_hop_address.format(route_i + 1, next_hop_i + 1)), + as_variable(self.ipv4_route_next_hop_address.format(route_i + 1, next_hop_i + 1)), ), distance=next_hop.pop( "distance", as_variable( - cls.ipv4_route_next_hop_administrative_distance.format( + self.ipv4_route_next_hop_administrative_distance.format( route_i + 1, next_hop_i + 1 ) ), @@ -288,6 +363,31 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La ) ) ip_route_item = NextHopRouteContainer(next_hop_container=NextHopContainer(next_hop=next_hop_items)) + elif "next_hop_with_track" in route: + next_hop_with_track_items = [] + for next_hop_with_track_i, next_hop_with_track in enumerate(route.pop("next_hop_with_track", [])): + next_hop_with_track_items.append( + IPv4RouteGatewayNextHop( + address=next_hop_with_track.pop( + "address", + as_variable( + self.ipv4_route_next_hop_address.format(route_i + 1, next_hop_with_track_i + 1) + ), + ), + distance=next_hop_with_track.pop( + "distance", + as_variable( + self.ipv4_route_next_hop_administrative_distance.format( + route_i + 1, next_hop_with_track_i + 1 + ) + ), + ), + ) + ) + + ip_route_item = NextHopRouteContainer( + next_hop_container=NextHopContainer(next_hop_with_tracker=next_hop_items) # type: ignore + ) elif "vpn" in route: ip_route_item = StaticRouteVPN( # type: ignore vpn=as_global(True), @@ -297,46 +397,11 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La ) values["ipv4_route"] = ipv4_route_items - if gre_routes := ipv4.pop("gre_route", []): - gre_route_items = [] - for gre_route in gre_routes: - interface = IPv4Interface(gre_route.pop("prefix").value) - gre_prefix = Prefix( - address=as_global(interface.network.network_address), - mask=as_global(str(interface.netmask)), - ) - gre_route_items.append(StaticGreRouteIPv4(prefix=gre_prefix, vpn=gre_route.pop("vpn"))) - values["gre_route"] = gre_route_items - - if ipsec_routes := ipv4.pop("ipsec_route", []): - ipsec_route_items = [] - for ipsec_route in ipsec_routes: - interface = IPv4Interface(ipsec_route.pop("prefix").value) - ipsec_prefix = Prefix( - address=as_global(interface.network.network_address), - mask=as_global(str(interface.netmask)), - ) - ipsec_route_prefix = StaticIpsecRouteIPv4(prefix=ipsec_prefix) - ipsec_route_items.append(ipsec_route_prefix) - values["ipsec_route"] = ipsec_route_items - - if service_routes := ipv4.pop("service_route", []): - service_route_items = [] - for service_route in service_routes: - ipv4_interface = IPv4Interface(service_route.pop("prefix").value) - service_prefix = Prefix( - address=as_global(ipv4_interface.network.network_address), - mask=as_global(str(ipv4_interface.netmask)), - ) - service_route_prefix = ServiceRoute(prefix=service_prefix, vpn=service_route.pop("vpn")) - service_route_items.append(service_route_prefix) - values["service_route"] = service_route_items - - ipv6 = values.pop("ipv6", {}) - if ipv6_route := ipv6.pop("route", []): + def configure_ipv6_route(self, values: dict) -> None: + if ipv6_route := values.get("ipv6", {}).get("route", []): ipv6_route_items = [] for route in ipv6_route: - ipv6_interface = IPv6Interface(route.pop("prefix").value) + ipv6_interface = IPv6Interface(route.get("prefix").value) route_prefix = RoutePrefix( ip_address=as_global(ipv6_interface.network.network_address), subnet_mask=as_global(str(ipv6_interface.netmask)), @@ -349,64 +414,80 @@ def create_parcel(cls, name: str, description: str, template_values: dict) -> La ipv6_route_items.append(StaticRouteIPv6(prefix=route_prefix, one_of_ip_route=ipv6_route_item)) values["ipv6_route"] = ipv6_route_items - if route_leak_between_services := values.pop("route_import_from", []): - rlbs_items = [] - for rl in route_leak_between_services: - redistribute_items = [] - for redistribute_item in rl.get("redistribute_to", []): - RedistributeToService( - protocol=as_global(redistribute_item["protocol"].value, RedistributeToServiceProtocol), + def configure_omp(self, values: dict) -> None: + for omp in self.omp_mapping.keys(): + if omp_advertises := values.get("omp", {}).get(omp, []): + pydantic_model_omp = self.omp_mapping[omp].ux2_model_omp + pydantic_model_prefix = self.omp_mapping[omp].ux2_model_prefix + pydantic_field = self.omp_mapping[omp].ux2_field + self._configure_omp(values, omp_advertises, pydantic_model_omp, pydantic_model_prefix, pydantic_field) + + def _configure_omp( + self, values: dict, omp_advertises: list, pydantic_model_omp, pydantic_model_prefix, pydantic_field + ) -> None: + omp_advertise_items = [] + for entry in omp_advertises: + prefix_list_items = [] + for prefix_entry in entry.get("prefix_list", []): + prefix_list_items.append( + pydantic_model_prefix( + prefix=prefix_entry["prefix_entry"], + aggregate_only=prefix_entry["aggregate_only"], + region=as_global(prefix_entry["region"].value, Region), ) - redistribute_items.append(redistribute_item) - - rlbs_item = RouteLeakBetweenServices( - source_vpn=rl["source_vpn"], - route_protocol=as_global(rl["protocol"].value, RouteLeakFromServiceProtocol), - redistribute_to_protocol=redistribute_items if redistribute_items else None, ) - rlbs_items.append(rlbs_item) - values["route_leak_between_services"] = rlbs_items - - if route_leak_from_global := values.pop("route_import", []): - rlfg_items = [] - for rl in route_leak_from_global: - redistribute_items = [] - for redistribute_item in rl.get("redistribute_to", []): - RedistributeToService( - protocol=as_global(redistribute_item["protocol"].value, RedistributeToServiceProtocol), - ) - redistribute_items.append(redistribute_item) - - rlfg_item = RouteLeakFromGlobal( - route_protocol=as_global(rl["protocol"].value, RouteLeakFromServiceProtocol), - redistribute_to_protocol=redistribute_items if redistribute_items else None, + if pydantic_model_omp == OmpAdvertiseIPv4: + pydantic_model_protocol = ProtocolIPv4 + else: + pydantic_model_protocol = ProtocolIPv6 + omp_advertise_items.append( + pydantic_model_omp( + omp_protocol=as_global(entry["protocol"].value, pydantic_model_protocol), + prefix_list=prefix_list_items if prefix_list_items else None, ) - rlfg_items.append(rlfg_item) - values["route_leak_from_global"] = rlfg_items - - if route_leak_from_service := values.pop("route_export", []): - rlfs_items = [] - for rl in route_leak_from_service: - redistribute_items = [] - for redistribute_item in rl.get("redistribute_to", []): + ) + values[pydantic_field] = omp_advertise_items + + def configure_routes(self, values: dict) -> None: + for route in self.routes_mapping.keys(): + if routes := values.get("ip", {}).get(route, []): + pydantic_model = self.routes_mapping[route].ux2_model + pydantic_field = self.routes_mapping[route].ux2_field + self._configure_route(values, routes, pydantic_model, pydantic_field) + + def _configure_route(self, values: dict, routes: list, pydantic_model, pydantic_field) -> None: + items = [] + for route in routes: + ipv4_interface = IPv4Interface(route.get("prefix").value) + service_prefix = Prefix( + address=as_global(ipv4_interface.network.network_address), + mask=as_global(str(ipv4_interface.netmask)), + ) + items.append(pydantic_model(prefix=service_prefix, vpn=route.get("vpn"))) + values[pydantic_field] = items + + def configure_route_leaks(self, values: dict) -> None: + for leak in self.route_leaks_mapping.keys(): + if route_leaks := values.get(leak, []): + pydantic_model = self.route_leaks_mapping[leak].ux2_model + pydantic_field = self.route_leaks_mapping[leak].ux2_field + self._configure_leak(values, route_leaks, pydantic_model, pydantic_field) + + def _configure_leak(self, values: dict, route_leaks: list, pydantic_model, pydantic_field) -> None: + items = [] + for rl in route_leaks: + redistribute_items = [] + for redistribute_item in rl.get("redistribute_to", []): + redistribute_items.append( RedistributeToService( protocol=as_global(redistribute_item["protocol"].value, RedistributeToServiceProtocol), ) - redistribute_items.append(redistribute_item) - - rlfs_item = RouteLeakFromService( - route_protocol=as_global(rl["protocol"].value, RouteLeakFromServiceProtocol), - redistribute_to_protocol=redistribute_items if redistribute_items else None, ) - rlfs_items.append(rlfs_item) - values["route_leak_from_service"] = rlfs_items - - for key in ["ecmp_hash_key"]: - values.pop(key, None) - - parcel_values = { - "parcel_name": name, - "parcel_description": description, - **values, - } - return LanVpnParcel(**parcel_values) # type: ignore + configuration = { + "route_protocol": as_global(rl["protocol"].value, RouteLeakFromServiceProtocol), + "redistribute_to_protocol": redistribute_items if redistribute_items else None, + } + if pydantic_model == RouteLeakBetweenServices: + configuration["source_vpn"] = rl["source_vpn"] + items.append(pydantic_model(**configuration)) + values[pydantic_field] = items