From 0b2aff99459950b769fcc615833567ed204dcb76 Mon Sep 17 00:00:00 2001 From: sbasan Date: Wed, 13 Mar 2024 07:31:14 +0100 Subject: [PATCH] implement missing items --- catalystwan/api/feature_profile_api.py | 8 +- catalystwan/models/common.py | 31 ++++++- .../sdwan/policy_object/__init__.py | 8 +- .../policy/expanded_community_list.py | 8 +- .../policy_object/policy/ipv6_data_prefix.py | 8 +- .../policy_object/policy/ipv6_prefix_list.py | 22 +++-- .../policy/{policier.py => policer.py} | 8 +- .../policy_object/security/security_port.py | 8 +- catalystwan/models/policy/lists.py | 75 ++------------- catalystwan/models/policy/lists_entries.py | 92 ++++++++----------- .../{policies => policy}/policy_lists.py | 63 +++++++++++-- catalystwan/workflows/config_migration.py | 5 +- examples/parcel_configuration_guide.py | 39 ++++---- 13 files changed, 198 insertions(+), 177 deletions(-) rename catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/{policier.py => policer.py} (85%) rename catalystwan/utils/config_migration/converters/{policies => policy}/policy_lists.py (70%) diff --git a/catalystwan/api/feature_profile_api.py b/catalystwan/api/feature_profile_api.py index 25d5b914..6dd7626b 100644 --- a/catalystwan/api/feature_profile_api.py +++ b/catalystwan/api/feature_profile_api.py @@ -40,7 +40,7 @@ IPv6DataPrefixParcel, IPv6PrefixListParcel, LocalDomainParcel, - PolicierParcel, + PolicerParcel, PreferredColorGroupParcel, PrefixListParcel, ProtocolListParcel, @@ -620,7 +620,7 @@ def get(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel]) -> DataSeq ... @overload - def get(self, profile_id: UUID, parcel_type: Type[PolicierParcel]) -> DataSequence[Parcel[Any]]: + def get(self, profile_id: UUID, parcel_type: Type[PolicerParcel]) -> DataSequence[Parcel[Any]]: ... @overload @@ -732,7 +732,7 @@ def get(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel], parcel_id: ... @overload - def get(self, profile_id: UUID, parcel_type: Type[PolicierParcel], parcel_id: UUID) -> DataSequence[Parcel[Any]]: + def get(self, profile_id: UUID, parcel_type: Type[PolicerParcel], parcel_id: UUID) -> DataSequence[Parcel[Any]]: ... @overload @@ -880,7 +880,7 @@ def delete(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel], list_ob ... @overload - def delete(self, profile_id: UUID, parcel_type: Type[PolicierParcel], list_object_id: UUID) -> None: + def delete(self, profile_id: UUID, parcel_type: Type[PolicerParcel], list_object_id: UUID) -> None: ... @overload diff --git a/catalystwan/models/common.py b/catalystwan/models/common.py index 145e8e1c..d01e9006 100644 --- a/catalystwan/models/common.py +++ b/catalystwan/models/common.py @@ -1,6 +1,6 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates -from typing import Dict, List, Literal, Sequence, Set, Tuple, Union +from typing import Dict, List, Literal, Optional, Sequence, Set, Tuple, Union from uuid import UUID from pydantic import PlainSerializer @@ -59,6 +59,35 @@ def check_any_of_exclusive_field_sets(values: Dict, field_sets: List[Tuple[Set[s BeforeValidator(lambda x: int(x)), ] +IntRange = Tuple[int, Optional[int]] + + +def int_range_str_validator(value: Union[str, IntRange], ascending: bool = True) -> IntRange: + """Validates input given as string containing integer pair separated by hyphen eg: '1-3' or single number '1'""" + if isinstance(value, str): + int_list = [int(i) for i in value.strip().split("-")] + assert 0 < len(int_list) <= 2, "Number range must contain one or two numbers" + first = int_list[0] + second = None if len(int_list) == 1 else int_list[1] + int_range = (first, second) + else: + int_range = value + if ascending and int_range[1] is not None: + assert int_range[0] < int_range[1], "Numbers in range must be in ascending order" + return int_range + + +def int_range_serializer(value: IntRange) -> str: + """Serializes integer pair as string separated by hyphen eg: '1-3' or single number '1'""" + return "-".join((str(i) for i in value if i is not None)) + + +IntRangeStr = Annotated[ + IntRange, + PlainSerializer(int_range_serializer, return_type=str, when_used="json-unless-none"), + BeforeValidator(int_range_str_validator), +] + def str_as_uuid_list(val: Union[str, Sequence[UUID]]) -> Sequence[UUID]: if isinstance(val, str): diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py index 1fc84858..2ecd3b30 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/__init__.py @@ -13,7 +13,7 @@ from .policy.fowarding_class import FowardingClassParcel, FowardingClassQueueEntry from .policy.ipv6_data_prefix import IPv6DataPrefixEntry, IPv6DataPrefixParcel from .policy.ipv6_prefix_list import IPv6PrefixListEntry, IPv6PrefixListParcel -from .policy.policier import PolicierEntry, PolicierParcel +from .policy.policer import PolicerEntry, PolicerParcel from .policy.prefered_group_color import Preference, PreferredColorGroupEntry, PreferredColorGroupParcel from .policy.prefix_list import PrefixListEntry, PrefixListParcel from .policy.sla_class import FallbackBestTunnel, SLAAppProbeClass, SLAClassCriteria, SLAClassListEntry, SLAClassParcel @@ -57,7 +57,7 @@ IPv6DataPrefixParcel, IPv6PrefixListParcel, LocalDomainParcel, - PolicierParcel, + PolicerParcel, PreferredColorGroupParcel, PrefixListParcel, ProtocolListParcel, @@ -101,8 +101,8 @@ "IPv6PrefixListParcel", "LocalDomainListEntry", "LocalDomainParcel", - "PolicierEntry", - "PolicierParcel", + "PolicerEntry", + "PolicerParcel", "Preference", "PreferredColorGroupEntry", "PreferredColorGroupParcel", diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/expanded_community_list.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/expanded_community_list.py index 01edff73..c87e9215 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/expanded_community_list.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/expanded_community_list.py @@ -10,17 +10,17 @@ class ExpandedCommunityParcel(_ParcelBase): type_: Literal["expanded-community"] = Field(default="expanded-community", exclude=True) model_config = ConfigDict(populate_by_name=True) - expandedCommunityList: Global[list] = Field( + expanded_community_list: Global[list] = Field( default=as_global([]), serialization_alias="expandedCommunityList", validation_alias=AliasPath("data", "expandedCommunityList"), ) def add_community(self, expanded_community: str): - self.expandedCommunityList.value.append(expanded_community) + self.expanded_community_list.value.append(expanded_community) - @field_validator("expandedCommunityList") + @field_validator("expanded_community_list") @classmethod - def check_rate(cls, expanded_community_list: Global): + def check_list_str(cls, expanded_community_list: Global): assert all([isinstance(ec, str) for ec in expanded_community_list.value]) return expanded_community_list diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_data_prefix.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_data_prefix.py index 87757295..be4cc0e8 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_data_prefix.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_data_prefix.py @@ -1,6 +1,6 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from ipaddress import IPv6Address, IPv6Network +from ipaddress import IPv6Address, IPv6Interface from typing import List, Literal from pydantic import AliasPath, BaseModel, ConfigDict, Field @@ -18,10 +18,10 @@ class IPv6DataPrefixParcel(_ParcelBase): type_: Literal["data-ipv6-prefix"] = Field(default="data-ipv6-prefix", exclude=True) entries: List[IPv6DataPrefixEntry] = Field(default=[], validation_alias=AliasPath("data", "entries")) - def add_prefix(self, ipv6_network: IPv6Network): + def add_prefix(self, ipv6_network: IPv6Interface): self.entries.append( IPv6DataPrefixEntry( - ipv6_address=as_global(ipv6_network.network_address), - ipv6_prefix_length=as_global(ipv6_network.prefixlen), + ipv6_address=as_global(ipv6_network.network.network_address), + ipv6_prefix_length=as_global(ipv6_network.network.prefixlen), ) ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_prefix_list.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_prefix_list.py index 7608ef2b..43a9eb31 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_prefix_list.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/ipv6_prefix_list.py @@ -1,7 +1,7 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from ipaddress import IPv6Address, IPv6Network -from typing import List, Literal +from ipaddress import IPv6Address, IPv6Interface +from typing import List, Literal, Optional from pydantic import AliasPath, BaseModel, ConfigDict, Field @@ -11,17 +11,27 @@ class IPv6PrefixListEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) ipv6_address: Global[IPv6Address] = Field(serialization_alias="ipv6Address", validation_alias="ipv6Address") - ipv6_prefix_length: Global[int] = Field(serialization_alias="ipv6PrefixLength", validation_alias="ipv6PrefixLength") + ipv6_prefix_length: Global[int] = Field( + serialization_alias="ipv6PrefixLength", validation_alias="ipv6PrefixLength", ge=0, le=128 + ) + le_range_prefix_length: Optional[Global[int]] = Field( + serialization_alias="leRangePrefixLength", validation_alias="leRangePrefixLength" + ) + ge_range_prefix_length: Optional[Global[int]] = Field( + serialization_alias="geRangePrefixLength", validation_alias="geRangePrefixLength" + ) class IPv6PrefixListParcel(_ParcelBase): type_: Literal["ipv6-prefix"] = Field(default="ipv6-prefix", exclude=True) entries: List[IPv6PrefixListEntry] = Field(default=[], validation_alias=AliasPath("data", "entries")) - def add_prefix(self, ipv6_network: IPv6Network): + def add_prefix(self, ipv6_network: IPv6Interface, ge: Optional[int] = None, le: Optional[int] = None): self.entries.append( IPv6PrefixListEntry( - ipv6_address=as_global(ipv6_network.network_address), - ipv6_prefix_length=as_global(ipv6_network.prefixlen), + ipv6_address=as_global(ipv6_network.network.network_address), + ipv6_prefix_length=as_global(ipv6_network.network.prefixlen), + le_range_prefix_length=as_global(le) if le is not None else None, + ge_range_prefix_length=as_global(ge) if ge is not None else None, ) ) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policier.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policer.py similarity index 85% rename from catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policier.py rename to catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policer.py index d375446f..00339ff3 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policier.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/policer.py @@ -12,7 +12,7 @@ ] -class PolicierEntry(BaseModel): +class PolicerEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) burst: Global[int] exceed: Global[PolicerExceedAction] @@ -31,13 +31,13 @@ def check_rate(cls, rate_str: Global): return rate_str -class PolicierParcel(_ParcelBase): +class PolicerParcel(_ParcelBase): type_: Literal["policer"] = Field(default="policer", exclude=True) - entries: List[PolicierEntry] = Field(default=[], validation_alias=AliasPath("data", "entries")) + entries: List[PolicerEntry] = Field(default=[], validation_alias=AliasPath("data", "entries")) def add_entry(self, burst: int, exceed: PolicerExceedAction, rate: int): self.entries.append( - PolicierEntry( + PolicerEntry( burst=as_global(burst), exceed=as_global(exceed, PolicerExceedAction), rate=as_global(rate), diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/security_port.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/security_port.py index a06147eb..3025ba8a 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/security_port.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/security/security_port.py @@ -32,5 +32,11 @@ class SecurityPortParcel(_ParcelBase): type_: Literal["security-port"] = Field(default="security-port", exclude=True) entries: List[SecurityPortListEntry] = Field(default=[], validation_alias=AliasPath("data", "entries")) - def add_port(self, port: str): + def _add_port(self, port: str): self.entries.append(SecurityPortListEntry(port=as_global(port))) + + def add_port(self, port: int): + self._add_port(str(port)) + + def add_port_range(self, start_port: int, end_port: int): + self._add_port(f"{start_port}-{end_port}") diff --git a/catalystwan/models/policy/lists.py b/catalystwan/models/policy/lists.py index 811bc9c1..f3584114 100644 --- a/catalystwan/models/policy/lists.py +++ b/catalystwan/models/policy/lists.py @@ -7,16 +7,6 @@ from pydantic import BaseModel, Field from catalystwan.models.common import InterfaceType, TLOCColor, WellKnownBGPCommunities -from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel -from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.application_list import ( - ApplicationListParcel, -) -from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.data_prefix import ( - DataPrefixEntry, - DataPrefixParcel, -) -from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.tloc_list import TlocParcel -from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.zone import SecurityZoneListParcel from catalystwan.models.policy.lists_entries import ( AppListEntry, AppProbeClassListEntry, @@ -67,9 +57,6 @@ def _add_entry(self, entry: Any, single: bool = False) -> None: else: self.entries.append(entry) - def to_policy_object_parcel(self) -> Optional[AnyPolicyObjectParcel]: - return None - class DataPrefixList(PolicyListBase): type: Literal["dataPrefix"] = "dataPrefix" @@ -78,13 +65,6 @@ class DataPrefixList(PolicyListBase): def add_prefix(self, ip_prefix: IPv4Network) -> None: self._add_entry(DataPrefixListEntry(ip_prefix=ip_prefix)) - def to_policy_object_parcel(self) -> DataPrefixParcel: - return DataPrefixParcel( - parcel_name=self.name, - parcel_description=self.description, - entries=[DataPrefixEntry.from_ipv4_network(i.ip_prefix) for i in self.entries], - ) - class SiteList(PolicyListBase): type: Literal["site"] = "site" @@ -98,9 +78,6 @@ def add_site_range(self, site_range: Tuple[int, int]): entry = SiteListEntry(site_id=f"{site_range[0]}-{site_range[1]}") self._add_entry(entry) - def to_policy_object_parcel(self) -> None: - return None - class VPNList(PolicyListBase): type: Literal["vpn"] = "vpn" @@ -108,14 +85,10 @@ class VPNList(PolicyListBase): def add_vpns(self, vpns: Set[int]): for vpn in vpns: - self._add_entry(VPNListEntry(vpn=str(vpn))) + self._add_entry(VPNListEntry(vpn=(vpn, None))) def add_vpn_range(self, vpn_range: Tuple[int, int]): - entry = VPNListEntry(vpn=f"{vpn_range[0]}-{vpn_range[1]}") - self._add_entry(entry) - - def to_policy_object_parcel(self) -> None: - return None + self._add_entry(VPNListEntry(vpn=vpn_range)) class ZoneList(PolicyListBase): @@ -123,23 +96,11 @@ class ZoneList(PolicyListBase): entries: List[ZoneListEntry] = [] def assign_vpns(self, vpns: Set[int]) -> None: - self.entries = [ZoneListEntry(vpn=str(vpn)) for vpn in vpns] + self.entries = [ZoneListEntry(vpn=(vpn, None)) for vpn in vpns] def assign_interfaces(self, ifs: Set[InterfaceType]) -> None: self.entries = [ZoneListEntry(interface=interface) for interface in ifs] - def to_policy_object_parcel(self) -> SecurityZoneListParcel: - parcel = SecurityZoneListParcel( - parcel_name=self.name, - parcel_description=self.description, - ) - for e in self.entries: - if e.vpn is not None: - parcel.add_vpn(e.vpn) - if e.interface is not None: - parcel.add_interface(e.interface) - return parcel - class FQDNList(PolicyListBase): type: Literal["fqdn"] = "fqdn" @@ -176,18 +137,6 @@ def add_app(self, app: str) -> None: def add_app_family(self, app_family: str) -> None: self._add_entry(AppListEntry(app_family=app_family)) - def to_policy_object_parcel(self) -> ApplicationListParcel: - parcel = ApplicationListParcel( - parcel_name=self.name, - parcel_description=self.description, - ) - for entry in self.entries: - if entry.app is not None: - parcel.add_application(entry.app) - elif entry.app_family is not None: - parcel.add_application_family(entry.app_family) - return parcel - class ColorList(PolicyListBase): type: Literal["color"] = "color" @@ -249,7 +198,7 @@ class PolicerList(PolicyListBase): def police(self, burst: int, rate: int, exceed: PolicerExceedAction = "drop") -> None: # Policer list must have only single entry! - entry = PolicerListEntry(burst=str(burst), exceed=exceed, rate=str(rate)) + entry = PolicerListEntry(burst=burst, exceed=exceed, rate=rate) self._add_entry(entry, single=True) @@ -324,15 +273,6 @@ def add_tloc(self, tloc: IPv4Address, color: TLOCColor, encap: EncapType, prefer _preference = str(preference) if preference is not None else None self.entries.append(TLOCListEntry(tloc=tloc, color=color, encap=encap, preference=_preference)) - def to_policy_object_parcel(self) -> TlocParcel: - parcel = TlocParcel( - parcel_name=self.name, - parcel_description=self.description, - ) - for i in self.entries: - parcel.add_entry(i.tloc, i.color, i.encap, i.preference) - return parcel - class PreferredColorGroupList(PolicyListBase): type: Literal["preferredColorGroup"] = "preferredColorGroup" @@ -366,15 +306,16 @@ class PrefixList(PolicyListBase): entries: List[PrefixListEntry] = [] def add_prefix(self, prefix: IPv4Network, ge: Optional[int] = None, le: Optional[int] = None) -> None: - _ge = str(ge) if ge is not None else None - _le = str(le) if le is not None else None - self._add_entry(PrefixListEntry(ip_prefix=prefix, ge=_ge, le=_le)) + self._add_entry(PrefixListEntry(ip_prefix=prefix, ge=ge, le=le)) class IPv6PrefixList(PolicyListBase): type: Literal["ipv6prefix"] = "ipv6prefix" entries: List[IPv6PrefixListEntry] = [] + def add_prefix(self, prefix: IPv6Interface, ge: Optional[int] = None, le: Optional[int] = None) -> None: + self._add_entry(IPv6PrefixListEntry(ipv6_prefix=prefix, ge=ge, le=le)) + class RegionList(PolicyListBase): type: Literal["region"] = "region" diff --git a/catalystwan/models/policy/lists_entries.py b/catalystwan/models/policy/lists_entries.py index a9fb94e3..7ee837ad 100644 --- a/catalystwan/models/policy/lists_entries.py +++ b/catalystwan/models/policy/lists_entries.py @@ -1,12 +1,19 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates -from ipaddress import IPv4Address, IPv4Network, IPv6Interface, IPv6Network +from ipaddress import IPv4Address, IPv4Network, IPv6Interface from typing import List, Literal, Optional, Set from uuid import UUID from pydantic import BaseModel, ConfigDict, Field, IPvAnyAddress, field_validator, model_validator -from catalystwan.models.common import InterfaceType, IntStr, TLOCColor, check_fields_exclusive +from catalystwan.models.common import ( + InterfaceType, + IntRangeStr, + IntStr, + TLOCColor, + check_fields_exclusive, + str_as_str_list, +) def check_jitter_ms(jitter_str: Optional[str]) -> Optional[str]: @@ -52,14 +59,16 @@ class ColorDSCPMap(BaseModel): class ColorGroupPreference(BaseModel): model_config = ConfigDict(populate_by_name=True) - color_preference: str = Field(serialization_alias="colorPreference", validation_alias="colorPreference") + color_preference: Set[TLOCColor] = Field(serialization_alias="colorPreference", validation_alias="colorPreference") path_preference: PathPreference = Field(serialization_alias="pathPreference", validation_alias="pathPreference") + _color_pref = field_validator("color_preference", mode="before")(str_as_str_list) + @staticmethod def from_color_set_and_path( color_preference: Set[TLOCColor], path_preference: PathPreference ) -> "ColorGroupPreference": - return ColorGroupPreference(color_preference=" ".join(color_preference), path_preference=path_preference) + return ColorGroupPreference(color_preference=color_preference, path_preference=path_preference) class FallbackBestTunnel(BaseModel): @@ -144,29 +153,28 @@ class SiteListEntry(BaseModel): class VPNListEntry(BaseModel): - vpn: str = Field(description="0-65530 range or single number") + vpn: IntRangeStr = Field(description="0-65530 range or single number") @field_validator("vpn") @classmethod - def check_vpn_range(cls, vpns_str: str): - vpns = [int(vpn) for vpn in vpns_str.split("-")] - assert len(vpns) <= 2 - for vpn in vpns: - assert 0 <= vpn <= 65530 - if len(vpns) == 2: - assert vpns[0] <= vpns[1] - return vpns_str + def check_vpn_range(cls, vpn: IntRangeStr): + for i in vpn: + if i is not None: + assert 0 <= i <= 65_530 + return vpn class ZoneListEntry(BaseModel): - vpn: Optional[str] = Field(default=None, description="0-65530 single number") + vpn: Optional[IntRangeStr] = Field(default=None, description="0-65530 single number") interface: Optional[InterfaceType] = None @field_validator("vpn") @classmethod - def check_vpn_range(cls, vpn_str: str): - assert 0 <= int(vpn_str) <= 65530 - return vpn_str + def check_vpn_range(cls, vpn: IntRangeStr): + for i in vpn: + if i is not None: + assert 0 <= i <= 65_530 + return vpn @model_validator(mode="after") def check_vpn_xor_interface(self): @@ -191,13 +199,15 @@ def check_country_xor_continent(self): class PortListEntry(BaseModel): - port: str + port: IntRangeStr @field_validator("port") @classmethod - def check_port_range(cls, port_str: str): - assert 0 <= int(port_str) <= 65535 - return port_str + def check_port(cls, port: IntRangeStr): + for i in port: + if i is not None: + assert 0 <= i <= 65_535 + return port class ProtocolNameListEntry(BaseModel): @@ -275,21 +285,9 @@ class CommunityListEntry(BaseModel): class PolicerListEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) - burst: str = Field(description="bytes: integer in range 15000-10000000") + burst: IntStr = Field(description="bytes", ge=15_000, le=10_000_000) exceed: PolicerExceedAction = "drop" - rate: str = Field(description="bps: integer in range 8-100000000000") - - @field_validator("burst") - @classmethod - def check_burst(cls, burst_str: str): - assert 15000 <= int(burst_str) <= 10_000_000 - return burst_str - - @field_validator("rate") - @classmethod - def check_rate(cls, rate_str: str): - assert 8 <= int(rate_str) <= 100_000_000_000 - return rate_str + rate: IntStr = Field(description="bps", ge=8, le=100_000_000_000) class ASPathListEntry(BaseModel): @@ -398,30 +396,16 @@ class PrefixListEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) ip_prefix: IPv4Network = Field(serialization_alias="ipPrefix", validation_alias="ipPrefix") - ge: Optional[str] = None - le: Optional[str] = None - - @field_validator("ge", "le", check_fields=False) - @classmethod - def check_ge_and_le(cls, ge_le_str: Optional[str]): - if ge_le_str is not None: - assert 0 <= int(ge_le_str) <= 32 - return ge_le_str + ge: Optional[IntStr] = Field(default=None, ge=0, le=32) + le: Optional[IntStr] = Field(default=None, ge=0, le=32) class IPv6PrefixListEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) - ipv6_prefix: IPv6Network = Field(serialization_alias="ipv6Prefix", validation_alias="ipv6Prefix") - ge: Optional[str] = None - le: Optional[str] = None - - @field_validator("ge", "le", check_fields=False) - @classmethod - def check_ge_and_le(cls, ge_le_str: Optional[str]): - if ge_le_str is not None: - assert 0 <= int(ge_le_str) <= 128 - return ge_le_str + ipv6_prefix: IPv6Interface = Field(serialization_alias="ipv6Prefix", validation_alias="ipv6Prefix") + ge: Optional[IntStr] = Field(default=None, ge=0, le=128) + le: Optional[IntStr] = Field(default=None, ge=0, le=128) class RegionListEntry(BaseModel): diff --git a/catalystwan/utils/config_migration/converters/policies/policy_lists.py b/catalystwan/utils/config_migration/converters/policy/policy_lists.py similarity index 70% rename from catalystwan/utils/config_migration/converters/policies/policy_lists.py rename to catalystwan/utils/config_migration/converters/policy/policy_lists.py index be30adef..0ead146f 100644 --- a/catalystwan/utils/config_migration/converters/policies/policy_lists.py +++ b/catalystwan/utils/config_migration/converters/policy/policy_lists.py @@ -1,5 +1,6 @@ from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Sequence, Type +from catalystwan.models.common import int_range_serializer from catalystwan.models.configuration.feature_profile.sdwan.policy_object import ( AnyPolicyObjectParcel, ApplicationListParcel, @@ -14,7 +15,7 @@ IPv6DataPrefixParcel, IPv6PrefixListParcel, LocalDomainParcel, - PolicierParcel, + PolicerParcel, PreferredColorGroupParcel, PrefixListParcel, ProtocolListParcel, @@ -76,8 +77,6 @@ def app_list(in_: AppList) -> ApplicationListParcel: # TODO: def as_path(in_: ASPathList): - - def class_map(in_: ClassMapList) -> FowardingClassParcel: out = FowardingClassParcel(**_get_parcel_name_desc(in_)) for entry in in_.entries: @@ -101,68 +100,108 @@ def community(in_: CommunityList) -> StandardCommunityParcel: def data_prefix_ipv6(in_: DataIPv6PrefixList) -> IPv6DataPrefixParcel: out = IPv6DataPrefixParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_prefix(entry.ipv6_prefix) return out def data_prefix(in_: DataPrefixList) -> DataPrefixParcel: out = DataPrefixParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_data_prefix(entry.ip_prefix) return out def expanded_community(in_: ExpandedCommunityList) -> ExpandedCommunityParcel: out = ExpandedCommunityParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_community(entry.community) return out def fqdn(in_: FQDNList) -> FQDNDomainParcel: out = FQDNDomainParcel(**_get_parcel_name_desc(in_)) + out.from_fqdns([entry.pattern for entry in in_.entries]) return out def geo_location(in_: GeoLocationList) -> GeoLocationListParcel: out = GeoLocationListParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + if entry.country is not None: + out.add_country(entry.country) + if entry.continent is not None: + out.add_continent(entry.continent) return out def ips_signature(in_: IPSSignatureList) -> IPSSignatureParcel: out = IPSSignatureParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_signature(f"{entry.generator_id}:{entry.signature_id}") return out def prefix_ipv6(in_: IPv6PrefixList) -> IPv6PrefixListParcel: out = IPv6PrefixListParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_prefix(ipv6_network=entry.ipv6_prefix, ge=entry.ge, le=entry.le) return out # TODO: def local_app(in_: LocalAppList): def local_domain(in_: LocalDomainList) -> LocalDomainParcel: out = LocalDomainParcel(**_get_parcel_name_desc(in_)) + out.from_local_domains([entry.name_server for entry in in_.entries]) return out # TODO: def mirror_list(in_: MirrorList): -def policer(in_: PolicerList) -> PolicierParcel: - out = PolicierParcel(**_get_parcel_name_desc(in_)) +def policer(in_: PolicerList) -> PolicerParcel: + out = PolicerParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_entry(burst=entry.burst, exceed=entry.exceed, rate=entry.rate) return out def port(in_: PortList) -> SecurityPortParcel: out = SecurityPortParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out._add_port(int_range_serializer(entry.port)) return out def preferred_color_group(in_: PreferredColorGroupList) -> PreferredColorGroupParcel: out = PreferredColorGroupParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_primary( + color_preference=list(entry.primary_preference.color_preference), + path_preference=entry.primary_preference.path_preference, + ) + if entry.secondary_preference is not None: + out.add_secondary( + color_preference=list(entry.secondary_preference.color_preference), + path_preference=entry.secondary_preference.path_preference, + ) + if entry.tertiary_preference is not None: + out.add_tertiary( + color_preference=list(entry.tertiary_preference.color_preference), + path_preference=entry.tertiary_preference.path_preference, + ) return out def prefix(in_: PrefixList) -> PrefixListParcel: out = PrefixListParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_prefix(entry.ip_prefix) return out def protocol(in_: ProtocolNameList) -> ProtocolListParcel: out = ProtocolListParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_protocol(entry.protocol_name) return out @@ -170,27 +209,39 @@ def protocol(in_: ProtocolNameList) -> ProtocolListParcel: # TODO: def site(in_: SiteList): def sla_class(in_: SLAClassList) -> SLAClassParcel: out = SLAClassParcel(**_get_parcel_name_desc(in_)) + # TODO: requires app probe id return out def tloc(in_: TLOCList) -> TlocParcel: out = TlocParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_entry(tloc=entry.tloc, color=entry.color, encapsulation=entry.encap, preference=entry.preference) return out def url_allow(in_: URLAllowList) -> URLAllowParcel: out = URLAllowParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_url(entry.pattern) return out def url_block(in_: URLBlockList) -> URLBlockParcel: out = URLBlockParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + out.add_url(entry.pattern) return out -# TODO: def vpn(in_: VPNList): +# TODO: def vpn(in_: VPNList): needs to be converted to item from service profile def zone(in_: ZoneList) -> SecurityZoneListParcel: out = SecurityZoneListParcel(**_get_parcel_name_desc(in_)) + for entry in in_.entries: + if entry.interface is not None: + out.add_interface(entry.interface) + if entry.vpn is not None: + out.add_vpn(int_range_serializer(entry.vpn)) return out diff --git a/catalystwan/workflows/config_migration.py b/catalystwan/workflows/config_migration.py index 5553598f..6d71aff2 100644 --- a/catalystwan/workflows/config_migration.py +++ b/catalystwan/workflows/config_migration.py @@ -6,6 +6,7 @@ from catalystwan.models.configuration.config_migration import UX1Config, UX2Config from catalystwan.session import ManagerSession from catalystwan.utils.config_migration.converters.feature_template import create_parcel_from_template +from catalystwan.utils.config_migration.converters.policy.policy_lists import convert_all as convert_policy_lists from catalystwan.utils.config_migration.creators.config_group import ConfigGroupCreator logger = logging.getLogger(__name__) @@ -50,9 +51,7 @@ def transform(ux1: UX1Config) -> UX2Config: if ft.template_type in SUPPORTED_TEMPLATE_TYPES: ux2.profile_parcels.append(create_parcel_from_template(ft)) # Policy Lists - for policy_list in ux1.policies.policy_lists: - if (parcel := policy_list.to_policy_object_parcel()) is not None: - ux2.profile_parcels.append(parcel) + ux2.profile_parcels.extend(convert_policy_lists(ux1.policies.policy_lists).output) return ux2 diff --git a/examples/parcel_configuration_guide.py b/examples/parcel_configuration_guide.py index 773e6c18..17a9767f 100644 --- a/examples/parcel_configuration_guide.py +++ b/examples/parcel_configuration_guide.py @@ -3,7 +3,7 @@ import logging import sys from dataclasses import dataclass -from ipaddress import IPv4Address, IPv4Network, IPv6Network +from ipaddress import IPv4Address, IPv4Network, IPv6Interface from typing import Any, List, Optional, Tuple from uuid import UUID @@ -23,7 +23,7 @@ IPv6DataPrefixParcel, IPv6PrefixListParcel, LocalDomainParcel, - PolicierParcel, + PolicerParcel, PreferredColorGroupParcel, PrefixListParcel, ProtocolListParcel, @@ -115,9 +115,9 @@ def configure_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfi # Create security port parcel and add ports security_port = SecurityPortParcel(parcel_name="SecurityPortParcelExmaple") - security_port.add_port("10") - security_port.add_port("50-100") - security_port.add_port("400-999") + security_port.add_port(10) + security_port.add_port_range(50, 100) + security_port.add_port_range(400, 999) # Create Geolocation parcel and add geolocations geolocation = GeoLocationListParcel(parcel_name="GeoLocationListParcelExample") @@ -169,15 +169,15 @@ def configure_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfi # Create IPv6 data prefix parcel and add IPv6 prefixes ipv6_data_prefix = IPv6DataPrefixParcel(parcel_name="IPv6DataPrefixExample") - ipv6_data_prefix.add_prefix(IPv6Network("2000:0:0:0::/128")) - ipv6_data_prefix.add_prefix(IPv6Network("2001:0:0:0::/128")) - ipv6_data_prefix.add_prefix(IPv6Network("2002:0:0:0::/128")) + ipv6_data_prefix.add_prefix(IPv6Interface("2000:0:0:0::/128")) + ipv6_data_prefix.add_prefix(IPv6Interface("2001:0:0:0::/128")) + ipv6_data_prefix.add_prefix(IPv6Interface("2002:0:0:0::/128")) # Create IPv6 prefix parcel and add IPv6 prefixes ipv6_prefix_list = IPv6PrefixListParcel(parcel_name="IPv6PrefixListExample") - ipv6_prefix_list.add_prefix(IPv6Network("2000:0:0:0::/64")) - ipv6_prefix_list.add_prefix(IPv6Network("2001:0:0:0::/64")) - ipv6_prefix_list.add_prefix(IPv6Network("2002:0:0:0::/64")) + ipv6_prefix_list.add_prefix(IPv6Interface("2000:0:0:0::/64")) + ipv6_prefix_list.add_prefix(IPv6Interface("2001:0:0:0::/64")) + ipv6_prefix_list.add_prefix(IPv6Interface("2002:0:0:0::/64"), ge=10, le=100) # Create Application family parcel and add application families application_list_parcel = ApplicationListParcel(parcel_name="AppListExample") @@ -194,7 +194,7 @@ def configure_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfi preferred_group_color.add_tertiary(color_preference=["metro-ethernet"], path_preference="multi-hop-path") # Create Policier parcel and add policiers - policier = PolicierParcel(parcel_name="PolicierParcelExmaple") + policier = PolicerParcel(parcel_name="PolicierParcelExmaple") policier.add_entry(burst=17000, exceed="drop", rate=1000) # Create Fowarding Class parcel and add fowarding classes @@ -208,9 +208,10 @@ def configure_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfi # Create Standard Community Parcel and add standard communities standard_community = StandardCommunityParcel(parcel_name="StandardCommunityParcelExample") - standard_community.add_community("internet") - standard_community.add_community("local-AS") - standard_community.add_community("no-advertise") + standard_community.add_well_known_community("internet") + standard_community.add_well_known_community("local-AS") + standard_community.add_well_known_community("no-advertise") + standard_community.add_community(100, 1000) # Create Expanded Community Parcel and add expanded communities expanded_community = ExpandedCommunityParcel(parcel_name="ExpandedCommunityParcel") @@ -256,18 +257,18 @@ def configure_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfi for item in items: items_ids.append((api.create(profile_id, item), item.__class__)) - _id, _ = items_ids[-1] + id_, _ = items_ids[-1] sla = SLAClassParcel(parcel_name="SLAClassParcelExample") - sla.add_entry(app_probe_class_id=_id.id, jitter=20, latency=50, loss=100) + sla.add_entry(app_probe_class_id=id_.id, jitter=20, latency=50, loss=100) sla.add_fallback(criteria="jitter-latency-loss", latency_variance=10, jitter_variance=10, loss_variance=10) items_ids.append((api.create(profile_id, sla), sla.__class__)) input("Press enter to delete...") - for _id, item_type in reversed(items_ids): - api.delete(profile_id, item_type, _id.id) + for id_, item_type in reversed(items_ids): + api.delete(profile_id, item_type, id_.id) def retrive_groups_of_interest(profile_id: UUID, api: PolicyObjectFeatureProfileAPI):