Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
implement missing items
Browse files Browse the repository at this point in the history
  • Loading branch information
sbasan committed Mar 13, 2024
1 parent 6a770c1 commit 0b2aff9
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 177 deletions.
8 changes: 4 additions & 4 deletions catalystwan/api/feature_profile_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
IPv6DataPrefixParcel,
IPv6PrefixListParcel,
LocalDomainParcel,
PolicierParcel,
PolicerParcel,
PreferredColorGroupParcel,
PrefixListParcel,
ProtocolListParcel,
Expand Down Expand Up @@ -620,7 +620,7 @@ def get(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel]) -> DataSeq
...

@overload
def get(self, profile_id: UUID, parcel_type: Type[PolicierParcel]) -> DataSequence[Parcel[Any]]:
def get(self, profile_id: UUID, parcel_type: Type[PolicerParcel]) -> DataSequence[Parcel[Any]]:
...

@overload
Expand Down Expand Up @@ -732,7 +732,7 @@ def get(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel], parcel_id:
...

@overload
def get(self, profile_id: UUID, parcel_type: Type[PolicierParcel], parcel_id: UUID) -> DataSequence[Parcel[Any]]:
def get(self, profile_id: UUID, parcel_type: Type[PolicerParcel], parcel_id: UUID) -> DataSequence[Parcel[Any]]:
...

@overload
Expand Down Expand Up @@ -880,7 +880,7 @@ def delete(self, profile_id: UUID, parcel_type: Type[LocalDomainParcel], list_ob
...

@overload
def delete(self, profile_id: UUID, parcel_type: Type[PolicierParcel], list_object_id: UUID) -> None:
def delete(self, profile_id: UUID, parcel_type: Type[PolicerParcel], list_object_id: UUID) -> None:
...

@overload
Expand Down
31 changes: 30 additions & 1 deletion catalystwan/models/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates

from typing import Dict, List, Literal, Sequence, Set, Tuple, Union
from typing import Dict, List, Literal, Optional, Sequence, Set, Tuple, Union
from uuid import UUID

from pydantic import PlainSerializer
Expand Down Expand Up @@ -59,6 +59,35 @@ def check_any_of_exclusive_field_sets(values: Dict, field_sets: List[Tuple[Set[s
BeforeValidator(lambda x: int(x)),
]

IntRange = Tuple[int, Optional[int]]


def int_range_str_validator(value: Union[str, IntRange], ascending: bool = True) -> IntRange:
"""Validates input given as string containing integer pair separated by hyphen eg: '1-3' or single number '1'"""
if isinstance(value, str):
int_list = [int(i) for i in value.strip().split("-")]
assert 0 < len(int_list) <= 2, "Number range must contain one or two numbers"
first = int_list[0]
second = None if len(int_list) == 1 else int_list[1]
int_range = (first, second)
else:
int_range = value
if ascending and int_range[1] is not None:
assert int_range[0] < int_range[1], "Numbers in range must be in ascending order"
return int_range


def int_range_serializer(value: IntRange) -> str:
"""Serializes integer pair as string separated by hyphen eg: '1-3' or single number '1'"""
return "-".join((str(i) for i in value if i is not None))


IntRangeStr = Annotated[
IntRange,
PlainSerializer(int_range_serializer, return_type=str, when_used="json-unless-none"),
BeforeValidator(int_range_str_validator),
]


def str_as_uuid_list(val: Union[str, Sequence[UUID]]) -> Sequence[UUID]:
if isinstance(val, str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .policy.fowarding_class import FowardingClassParcel, FowardingClassQueueEntry
from .policy.ipv6_data_prefix import IPv6DataPrefixEntry, IPv6DataPrefixParcel
from .policy.ipv6_prefix_list import IPv6PrefixListEntry, IPv6PrefixListParcel
from .policy.policier import PolicierEntry, PolicierParcel
from .policy.policer import PolicerEntry, PolicerParcel
from .policy.prefered_group_color import Preference, PreferredColorGroupEntry, PreferredColorGroupParcel
from .policy.prefix_list import PrefixListEntry, PrefixListParcel
from .policy.sla_class import FallbackBestTunnel, SLAAppProbeClass, SLAClassCriteria, SLAClassListEntry, SLAClassParcel
Expand Down Expand Up @@ -57,7 +57,7 @@
IPv6DataPrefixParcel,
IPv6PrefixListParcel,
LocalDomainParcel,
PolicierParcel,
PolicerParcel,
PreferredColorGroupParcel,
PrefixListParcel,
ProtocolListParcel,
Expand Down Expand Up @@ -101,8 +101,8 @@
"IPv6PrefixListParcel",
"LocalDomainListEntry",
"LocalDomainParcel",
"PolicierEntry",
"PolicierParcel",
"PolicerEntry",
"PolicerParcel",
"Preference",
"PreferredColorGroupEntry",
"PreferredColorGroupParcel",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
class ExpandedCommunityParcel(_ParcelBase):
type_: Literal["expanded-community"] = Field(default="expanded-community", exclude=True)
model_config = ConfigDict(populate_by_name=True)
expandedCommunityList: Global[list] = Field(
expanded_community_list: Global[list] = Field(
default=as_global([]),
serialization_alias="expandedCommunityList",
validation_alias=AliasPath("data", "expandedCommunityList"),
)

def add_community(self, expanded_community: str):
self.expandedCommunityList.value.append(expanded_community)
self.expanded_community_list.value.append(expanded_community)

@field_validator("expandedCommunityList")
@field_validator("expanded_community_list")
@classmethod
def check_rate(cls, expanded_community_list: Global):
def check_list_str(cls, expanded_community_list: Global):
assert all([isinstance(ec, str) for ec in expanded_community_list.value])
return expanded_community_list
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from ipaddress import IPv6Address, IPv6Network
from ipaddress import IPv6Address, IPv6Interface
from typing import List, Literal

from pydantic import AliasPath, BaseModel, ConfigDict, Field
Expand All @@ -18,10 +18,10 @@ class IPv6DataPrefixParcel(_ParcelBase):
type_: Literal["data-ipv6-prefix"] = Field(default="data-ipv6-prefix", exclude=True)
entries: List[IPv6DataPrefixEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_prefix(self, ipv6_network: IPv6Network):
def add_prefix(self, ipv6_network: IPv6Interface):
self.entries.append(
IPv6DataPrefixEntry(
ipv6_address=as_global(ipv6_network.network_address),
ipv6_prefix_length=as_global(ipv6_network.prefixlen),
ipv6_address=as_global(ipv6_network.network.network_address),
ipv6_prefix_length=as_global(ipv6_network.network.prefixlen),
)
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from ipaddress import IPv6Address, IPv6Network
from typing import List, Literal
from ipaddress import IPv6Address, IPv6Interface
from typing import List, Literal, Optional

from pydantic import AliasPath, BaseModel, ConfigDict, Field

Expand All @@ -11,17 +11,27 @@
class IPv6PrefixListEntry(BaseModel):
model_config = ConfigDict(populate_by_name=True)
ipv6_address: Global[IPv6Address] = Field(serialization_alias="ipv6Address", validation_alias="ipv6Address")
ipv6_prefix_length: Global[int] = Field(serialization_alias="ipv6PrefixLength", validation_alias="ipv6PrefixLength")
ipv6_prefix_length: Global[int] = Field(
serialization_alias="ipv6PrefixLength", validation_alias="ipv6PrefixLength", ge=0, le=128
)
le_range_prefix_length: Optional[Global[int]] = Field(
serialization_alias="leRangePrefixLength", validation_alias="leRangePrefixLength"
)
ge_range_prefix_length: Optional[Global[int]] = Field(
serialization_alias="geRangePrefixLength", validation_alias="geRangePrefixLength"
)


class IPv6PrefixListParcel(_ParcelBase):
type_: Literal["ipv6-prefix"] = Field(default="ipv6-prefix", exclude=True)
entries: List[IPv6PrefixListEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_prefix(self, ipv6_network: IPv6Network):
def add_prefix(self, ipv6_network: IPv6Interface, ge: Optional[int] = None, le: Optional[int] = None):
self.entries.append(
IPv6PrefixListEntry(
ipv6_address=as_global(ipv6_network.network_address),
ipv6_prefix_length=as_global(ipv6_network.prefixlen),
ipv6_address=as_global(ipv6_network.network.network_address),
ipv6_prefix_length=as_global(ipv6_network.network.prefixlen),
le_range_prefix_length=as_global(le) if le is not None else None,
ge_range_prefix_length=as_global(ge) if ge is not None else None,
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
]


class PolicierEntry(BaseModel):
class PolicerEntry(BaseModel):
model_config = ConfigDict(populate_by_name=True)
burst: Global[int]
exceed: Global[PolicerExceedAction]
Expand All @@ -31,13 +31,13 @@ def check_rate(cls, rate_str: Global):
return rate_str


class PolicierParcel(_ParcelBase):
class PolicerParcel(_ParcelBase):
type_: Literal["policer"] = Field(default="policer", exclude=True)
entries: List[PolicierEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))
entries: List[PolicerEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_entry(self, burst: int, exceed: PolicerExceedAction, rate: int):
self.entries.append(
PolicierEntry(
PolicerEntry(
burst=as_global(burst),
exceed=as_global(exceed, PolicerExceedAction),
rate=as_global(rate),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ class SecurityPortParcel(_ParcelBase):
type_: Literal["security-port"] = Field(default="security-port", exclude=True)
entries: List[SecurityPortListEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_port(self, port: str):
def _add_port(self, port: str):
self.entries.append(SecurityPortListEntry(port=as_global(port)))

def add_port(self, port: int):
self._add_port(str(port))

def add_port_range(self, start_port: int, end_port: int):
self._add_port(f"{start_port}-{end_port}")
75 changes: 8 additions & 67 deletions catalystwan/models/policy/lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,6 @@
from pydantic import BaseModel, Field

from catalystwan.models.common import InterfaceType, TLOCColor, WellKnownBGPCommunities
from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.application_list import (
ApplicationListParcel,
)
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.data_prefix import (
DataPrefixEntry,
DataPrefixParcel,
)
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.tloc_list import TlocParcel
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.zone import SecurityZoneListParcel
from catalystwan.models.policy.lists_entries import (
AppListEntry,
AppProbeClassListEntry,
Expand Down Expand Up @@ -67,9 +57,6 @@ def _add_entry(self, entry: Any, single: bool = False) -> None:
else:
self.entries.append(entry)

def to_policy_object_parcel(self) -> Optional[AnyPolicyObjectParcel]:
return None


class DataPrefixList(PolicyListBase):
type: Literal["dataPrefix"] = "dataPrefix"
Expand All @@ -78,13 +65,6 @@ class DataPrefixList(PolicyListBase):
def add_prefix(self, ip_prefix: IPv4Network) -> None:
self._add_entry(DataPrefixListEntry(ip_prefix=ip_prefix))

def to_policy_object_parcel(self) -> DataPrefixParcel:
return DataPrefixParcel(
parcel_name=self.name,
parcel_description=self.description,
entries=[DataPrefixEntry.from_ipv4_network(i.ip_prefix) for i in self.entries],
)


class SiteList(PolicyListBase):
type: Literal["site"] = "site"
Expand All @@ -98,48 +78,29 @@ def add_site_range(self, site_range: Tuple[int, int]):
entry = SiteListEntry(site_id=f"{site_range[0]}-{site_range[1]}")
self._add_entry(entry)

def to_policy_object_parcel(self) -> None:
return None


class VPNList(PolicyListBase):
type: Literal["vpn"] = "vpn"
entries: List[VPNListEntry] = []

def add_vpns(self, vpns: Set[int]):
for vpn in vpns:
self._add_entry(VPNListEntry(vpn=str(vpn)))
self._add_entry(VPNListEntry(vpn=(vpn, None)))

def add_vpn_range(self, vpn_range: Tuple[int, int]):
entry = VPNListEntry(vpn=f"{vpn_range[0]}-{vpn_range[1]}")
self._add_entry(entry)

def to_policy_object_parcel(self) -> None:
return None
self._add_entry(VPNListEntry(vpn=vpn_range))


class ZoneList(PolicyListBase):
type: Literal["zone"] = "zone"
entries: List[ZoneListEntry] = []

def assign_vpns(self, vpns: Set[int]) -> None:
self.entries = [ZoneListEntry(vpn=str(vpn)) for vpn in vpns]
self.entries = [ZoneListEntry(vpn=(vpn, None)) for vpn in vpns]

def assign_interfaces(self, ifs: Set[InterfaceType]) -> None:
self.entries = [ZoneListEntry(interface=interface) for interface in ifs]

def to_policy_object_parcel(self) -> SecurityZoneListParcel:
parcel = SecurityZoneListParcel(
parcel_name=self.name,
parcel_description=self.description,
)
for e in self.entries:
if e.vpn is not None:
parcel.add_vpn(e.vpn)
if e.interface is not None:
parcel.add_interface(e.interface)
return parcel


class FQDNList(PolicyListBase):
type: Literal["fqdn"] = "fqdn"
Expand Down Expand Up @@ -176,18 +137,6 @@ def add_app(self, app: str) -> None:
def add_app_family(self, app_family: str) -> None:
self._add_entry(AppListEntry(app_family=app_family))

def to_policy_object_parcel(self) -> ApplicationListParcel:
parcel = ApplicationListParcel(
parcel_name=self.name,
parcel_description=self.description,
)
for entry in self.entries:
if entry.app is not None:
parcel.add_application(entry.app)
elif entry.app_family is not None:
parcel.add_application_family(entry.app_family)
return parcel


class ColorList(PolicyListBase):
type: Literal["color"] = "color"
Expand Down Expand Up @@ -249,7 +198,7 @@ class PolicerList(PolicyListBase):

def police(self, burst: int, rate: int, exceed: PolicerExceedAction = "drop") -> None:
# Policer list must have only single entry!
entry = PolicerListEntry(burst=str(burst), exceed=exceed, rate=str(rate))
entry = PolicerListEntry(burst=burst, exceed=exceed, rate=rate)
self._add_entry(entry, single=True)


Expand Down Expand Up @@ -324,15 +273,6 @@ def add_tloc(self, tloc: IPv4Address, color: TLOCColor, encap: EncapType, prefer
_preference = str(preference) if preference is not None else None
self.entries.append(TLOCListEntry(tloc=tloc, color=color, encap=encap, preference=_preference))

def to_policy_object_parcel(self) -> TlocParcel:
parcel = TlocParcel(
parcel_name=self.name,
parcel_description=self.description,
)
for i in self.entries:
parcel.add_entry(i.tloc, i.color, i.encap, i.preference)
return parcel


class PreferredColorGroupList(PolicyListBase):
type: Literal["preferredColorGroup"] = "preferredColorGroup"
Expand Down Expand Up @@ -366,15 +306,16 @@ class PrefixList(PolicyListBase):
entries: List[PrefixListEntry] = []

def add_prefix(self, prefix: IPv4Network, ge: Optional[int] = None, le: Optional[int] = None) -> None:
_ge = str(ge) if ge is not None else None
_le = str(le) if le is not None else None
self._add_entry(PrefixListEntry(ip_prefix=prefix, ge=_ge, le=_le))
self._add_entry(PrefixListEntry(ip_prefix=prefix, ge=ge, le=le))


class IPv6PrefixList(PolicyListBase):
type: Literal["ipv6prefix"] = "ipv6prefix"
entries: List[IPv6PrefixListEntry] = []

def add_prefix(self, prefix: IPv6Interface, ge: Optional[int] = None, le: Optional[int] = None) -> None:
self._add_entry(IPv6PrefixListEntry(ipv6_prefix=prefix, ge=ge, le=le))


class RegionList(PolicyListBase):
type: Literal["region"] = "region"
Expand Down
Loading

0 comments on commit 0b2aff9

Please sign in to comment.