From ce414610c338a8e424ade9f7b581905440507994 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Mon, 4 Mar 2024 08:19:25 +0100 Subject: [PATCH 1/8] Feature Template deserialization fixes --- catalystwan/api/templates/feature_template.py | 38 +++++++-- .../api/templates/models/cisco_aaa_model.py | 14 ++-- .../api/templates/models/cisco_bfd_model.py | 6 +- .../api/templates/models/cisco_bgp_model.py | 36 ++++---- .../templates/models/cisco_logging_model.py | 10 +-- .../api/templates/models/cisco_ntp_model.py | 8 +- .../api/templates/models/cisco_omp_model.py | 8 +- .../api/templates/models/cisco_ospf.py | 16 ++-- .../api/templates/models/cisco_ospfv3.py | 22 ++--- .../models/cisco_secure_internet_gateway.py | 12 +-- .../api/templates/models/cisco_snmp_model.py | 16 ++-- .../api/templates/models/cisco_system.py | 16 ++-- .../models/cisco_vpn_interface_model.py | 40 +++++---- .../api/templates/models/cisco_vpn_model.py | 56 ++++++------- catalystwan/utils/feature_template.py | 82 ++++++++++++++++--- 15 files changed, 237 insertions(+), 143 deletions(-) diff --git a/catalystwan/api/templates/feature_template.py b/catalystwan/api/templates/feature_template.py index f5069370..e9a8b3db 100644 --- a/catalystwan/api/templates/feature_template.py +++ b/catalystwan/api/templates/feature_template.py @@ -5,19 +5,47 @@ import json from abc import ABC, abstractmethod from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, cast +from typing import TYPE_CHECKING, Any, Dict, List, Union, cast from jinja2 import DebugUndefined, Environment, FileSystemLoader, meta # type: ignore from pydantic import BaseModel, model_validator from catalystwan.api.templates.device_variable import DeviceVariable from catalystwan.utils.device_model import DeviceModel +from catalystwan.utils.pydantic_field import get_extra_field if TYPE_CHECKING: from catalystwan.session import ManagerSession + from catalystwan.utils.feature_template import FlattenedTemplateValue -class FeatureTemplate(BaseModel, ABC): +class FeatureTemplateValidator(BaseModel, ABC): + @model_validator(mode="before") + @classmethod + def map_fields(cls, values: Union[Any, Dict[str, Union[List[FlattenedTemplateValue], Any]]]): + from catalystwan.utils.feature_template import FlattenedTemplateValue + + if not isinstance(values, dict): + return values + mapped_values = {} + for field_name, field_info in cls.model_fields.items(): + payload_name = get_extra_field(field_info, "vmanage_key") or field_info.alias or field_name + data_path = get_extra_field(field_info, "data_path", []) + if payload_name not in values: + continue + else: + value = values[payload_name] + if value and isinstance(value, list) and all([isinstance(v, FlattenedTemplateValue)] for v in value): + for template_value in value: + if template_value.data_path == data_path: + mapped_values[field_name] = template_value.value + break + else: + mapped_values[field_name] = value + return mapped_values + + +class FeatureTemplate(FeatureTemplateValidator, ABC): template_name: str template_description: str device_models: List[DeviceModel] = [] @@ -82,12 +110,11 @@ def get(cls, session: ManagerSession, name: str) -> FeatureTemplate: Returns: FeatureTemplate: filed out feature template model """ - from catalystwan.utils.feature_template import choose_model, find_template_values + from catalystwan.utils.feature_template import choose_model, find_template_values, flatten_template_definition template_info = ( session.api.templates._get_feature_templates(summary=False).filter(name=name).single_or_default() ) - template_definition_as_dict = json.loads(cast(str, template_info.template_definiton)) feature_template_model = choose_model(type_value=template_info.template_type) @@ -96,11 +123,12 @@ def get(cls, session: ManagerSession, name: str) -> FeatureTemplate: values_from_template_definition = find_template_values( template_definition_as_dict, device_specific_variables=device_specific_variables ) + flattened_values = flatten_template_definition(values_from_template_definition) return feature_template_model( template_name=template_info.name, template_description=template_info.description, device_models=[DeviceModel(model) for model in template_info.device_type], device_specific_variables=device_specific_variables, - **values_from_template_definition, + **flattened_values, ) diff --git a/catalystwan/api/templates/models/cisco_aaa_model.py b/catalystwan/api/templates/models/cisco_aaa_model.py index d0992ba0..0a794f24 100644 --- a/catalystwan/api/templates/models/cisco_aaa_model.py +++ b/catalystwan/api/templates/models/cisco_aaa_model.py @@ -4,12 +4,12 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator -class User(BaseModel): +class User(FeatureTemplateValidator): name: str password: Optional[str] = None secret: Optional[str] = None @@ -17,7 +17,7 @@ class User(BaseModel): pubkey_chain: List[str] = Field(default=[], json_schema_extra={"vmanage_key": "pubkey-chain", "vip_type": "ignore"}) -class RadiusServer(BaseModel): +class RadiusServer(FeatureTemplateValidator): model_config = ConfigDict(populate_by_name=True) address: str @@ -31,7 +31,7 @@ class RadiusServer(BaseModel): key_type: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "key-type"}) -class RadiusGroup(BaseModel): +class RadiusGroup(FeatureTemplateValidator): model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) group_name: str = Field(json_schema_extra={"vmanage_key": "group-name"}) @@ -46,7 +46,7 @@ class DomainStripping(str, Enum): RIGHT_TO_LEFT = "right-to-left" -class TacacsServer(BaseModel): +class TacacsServer(FeatureTemplateValidator): model_config = ConfigDict(populate_by_name=True) address: str @@ -57,7 +57,7 @@ class TacacsServer(BaseModel): key_enum: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "key-enum"}) -class TacacsGroup(BaseModel): +class TacacsGroup(FeatureTemplateValidator): model_config = ConfigDict(populate_by_name=True) group_name: str = Field(json_schema_extra={"vmanage_key": "group-name"}) diff --git a/catalystwan/api/templates/models/cisco_bfd_model.py b/catalystwan/api/templates/models/cisco_bfd_model.py index 053578db..f9779b27 100644 --- a/catalystwan/api/templates/models/cisco_bfd_model.py +++ b/catalystwan/api/templates/models/cisco_bfd_model.py @@ -4,10 +4,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator DEFAULT_BFD_COLOR_MULTIPLIER = 7 DEFAULT_BFD_DSCP = 48 @@ -41,7 +41,7 @@ class ColorType(str, Enum): PRIVATE6 = "private6" -class Color(BaseModel): +class Color(FeatureTemplateValidator): color: ColorType hello_interval: Optional[int] = Field( DEFAULT_BFD_HELLO_INTERVAL, json_schema_extra={"vmanage_key": "hello-interval"} diff --git a/catalystwan/api/templates/models/cisco_bgp_model.py b/catalystwan/api/templates/models/cisco_bgp_model.py index 85282459..df0bdb0e 100644 --- a/catalystwan/api/templates/models/cisco_bgp_model.py +++ b/catalystwan/api/templates/models/cisco_bgp_model.py @@ -4,37 +4,37 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator -class Export(BaseModel): +class Export(FeatureTemplateValidator): asn_ip: str = Field(json_schema_extra={"vmanage_key": "asn-ip"}) model_config = ConfigDict(populate_by_name=True) -class Import(BaseModel): +class Import(FeatureTemplateValidator): asn_ip: str = Field(json_schema_extra={"vmanage_key": "asn-ip"}) model_config = ConfigDict(populate_by_name=True) -class RouteTargetIpv4(BaseModel): +class RouteTargetIpv4(FeatureTemplateValidator): vpn_id: int = Field(json_schema_extra={"vmanage_key": "vpn-id"}) export: List[Export] import_: List[Import] = Field(json_schema_extra={"vmanage_key": "import"}) model_config = ConfigDict(populate_by_name=True) -class RouteTargetIpv6(BaseModel): +class RouteTargetIpv6(FeatureTemplateValidator): vpn_id: int = Field(json_schema_extra={"vmanage_key": "vpn-id"}) export: List[Export] import_: List[Import] = Field(json_schema_extra={"vmanage_key": "import"}) model_config = ConfigDict(populate_by_name=True) -class MplsInterface(BaseModel): +class MplsInterface(FeatureTemplateValidator): if_name: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "if-name"}) model_config = ConfigDict(populate_by_name=True) @@ -43,25 +43,25 @@ class AddressFamilyType(str, Enum): IPV4_UNICAST = "ipv4-unicast" -class AggregateAddress(BaseModel): +class AggregateAddress(FeatureTemplateValidator): prefix: str as_set: Optional[BoolStr] = Field(default=None, json_schema_extra={"vmanage_key": "as-set"}) summary_only: Optional[BoolStr] = Field(default=None, json_schema_extra={"vmanage_key": "summary-only"}) model_config = ConfigDict(populate_by_name=True) -class Ipv6AggregateAddress(BaseModel): +class Ipv6AggregateAddress(FeatureTemplateValidator): prefix: str as_set: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "as-set"}) summary_only: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "summary-only"}) model_config = ConfigDict(populate_by_name=True) -class Network(BaseModel): +class Network(FeatureTemplateValidator): prefix: str -class Ipv6Network(BaseModel): +class Ipv6Network(FeatureTemplateValidator): prefix: str @@ -75,13 +75,13 @@ class Protocol(str, Enum): NAT = "nat" -class Redistribute(BaseModel): +class Redistribute(FeatureTemplateValidator): protocol: Protocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) model_config = ConfigDict(populate_by_name=True) -class AddressFamily(BaseModel): +class AddressFamily(FeatureTemplateValidator): family_type: AddressFamilyType = Field(json_schema_extra={"vmanage_key": "family-type"}) aggregate_address: Optional[List[AggregateAddress]] = Field( default=None, json_schema_extra={"vmanage_key": "aggregate-address"} @@ -112,13 +112,13 @@ class Direction(str, Enum): OUT = "out" -class RoutePolicy(BaseModel): +class RoutePolicy(FeatureTemplateValidator): direction: Direction pol_name: str = Field(json_schema_extra={"vmanage_key": "pol-name"}) model_config = ConfigDict(populate_by_name=True) -class NeighborAddressFamily(BaseModel): +class NeighborAddressFamily(FeatureTemplateValidator): family_type: NeighborFamilyType = Field(json_schema_extra={"vmanage_key": "family-type"}) prefix_num: Optional[int] = Field( default=None, json_schema_extra={"data_path": ["maximum-prefixes"], "vmanage_key": "prefix-num"} @@ -132,7 +132,7 @@ class NeighborAddressFamily(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Neighbor(BaseModel): +class Neighbor(FeatureTemplateValidator): address: str description: Optional[str] = None shutdown: Optional[BoolStr] = None @@ -165,7 +165,7 @@ class IPv6NeighborFamilyType(str, Enum): IPV6_UNICAST = "ipv6-unicast" -class IPv6NeighborAddressFamily(BaseModel): +class IPv6NeighborAddressFamily(FeatureTemplateValidator): family_type: IPv6NeighborFamilyType = Field(json_schema_extra={"vmanage_key": "family-type"}) prefix_num: Optional[int] = Field( 0, json_schema_extra={"data_path": ["maximum-prefixes"], "vmanage_key": "prefix-num"} @@ -179,7 +179,7 @@ class IPv6NeighborAddressFamily(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Ipv6Neighbor(BaseModel): +class Ipv6Neighbor(FeatureTemplateValidator): address: str description: Optional[str] = None shutdown: Optional[BoolStr] = None diff --git a/catalystwan/api/templates/models/cisco_logging_model.py b/catalystwan/api/templates/models/cisco_logging_model.py index 95c04ded..275282c7 100644 --- a/catalystwan/api/templates/models/cisco_logging_model.py +++ b/catalystwan/api/templates/models/cisco_logging_model.py @@ -4,10 +4,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator class Version(str, Enum): @@ -20,7 +20,7 @@ class AuthType(str, Enum): MUTUAL = "Mutual" -class TlsProfile(BaseModel): +class TlsProfile(FeatureTemplateValidator): profile: str version: Optional[Version] = Field(Version.TLSV11, json_schema_extra={"data_path": ["tls-version"]}) auth_type: AuthType = Field(json_schema_extra={"vmanage_key": "auth-type"}) @@ -41,7 +41,7 @@ class Priority(str, Enum): EMERGENCY = "emergency" -class Server(BaseModel): +class Server(FeatureTemplateValidator): name: str vpn: Optional[int] = None source_interface: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "source-interface"}) @@ -56,7 +56,7 @@ class Server(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Ipv6Server(BaseModel): +class Ipv6Server(FeatureTemplateValidator): name: str vpn: Optional[int] = None source_interface: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "source-interface"}) diff --git a/catalystwan/api/templates/models/cisco_ntp_model.py b/catalystwan/api/templates/models/cisco_ntp_model.py index c4470015..2116fb1f 100644 --- a/catalystwan/api/templates/models/cisco_ntp_model.py +++ b/catalystwan/api/templates/models/cisco_ntp_model.py @@ -3,13 +3,13 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator -class Server(BaseModel): +class Server(FeatureTemplateValidator): model_config = ConfigDict(populate_by_name=True) name: str @@ -20,7 +20,7 @@ class Server(BaseModel): prefer: Optional[BoolStr] = None -class Authentication(BaseModel): +class Authentication(FeatureTemplateValidator): model_config = ConfigDict(populate_by_name=True) number: int diff --git a/catalystwan/api/templates/models/cisco_omp_model.py b/catalystwan/api/templates/models/cisco_omp_model.py index 3bf851dc..2a65412a 100644 --- a/catalystwan/api/templates/models/cisco_omp_model.py +++ b/catalystwan/api/templates/models/cisco_omp_model.py @@ -4,10 +4,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator DEFAULT_OMP_HOLDTIME = 60 DEFAULT_OMP_EOR_TIMER = 300 @@ -32,7 +32,7 @@ class Route(str, Enum): EXTERNAL = "external" -class IPv4Advertise(BaseModel): +class IPv4Advertise(FeatureTemplateValidator): protocol: IPv4AdvertiseProtocol route: Route @@ -47,7 +47,7 @@ class IPv6AdvertiseProtocol(str, Enum): ISIS = "isis" -class IPv6Advertise(BaseModel): +class IPv6Advertise(FeatureTemplateValidator): protocol: IPv6AdvertiseProtocol diff --git a/catalystwan/api/templates/models/cisco_ospf.py b/catalystwan/api/templates/models/cisco_ospf.py index 9a5262e6..3df7ca66 100644 --- a/catalystwan/api/templates/models/cisco_ospf.py +++ b/catalystwan/api/templates/models/cisco_ospf.py @@ -5,10 +5,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator DEFAULT_OSPF_HELLO_INTERVAL = 10 DEFAULT_OSPF_DEAD_INTERVAL = 40 @@ -37,7 +37,7 @@ class Protocol(str, Enum): EIGRP = "eigrp" -class Redistribute(BaseModel): +class Redistribute(FeatureTemplateValidator): protocol: Protocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) dia: Optional[BoolStr] = True @@ -49,7 +49,7 @@ class AdType(str, Enum): ON_STARTUP = "on-startup" -class RouterLsa(BaseModel): +class RouterLsa(FeatureTemplateValidator): ad_type: AdType = Field(json_schema_extra={"vmanage_key": "ad-type"}) time: int model_config = ConfigDict(populate_by_name=True) @@ -59,7 +59,7 @@ class Direction(str, Enum): IN = "in" -class RoutePolicy(BaseModel): +class RoutePolicy(FeatureTemplateValidator): direction: Direction pol_name: str = Field(json_schema_extra={"vmanage_key": "pol-name"}) model_config = ConfigDict(populate_by_name=True) @@ -78,7 +78,7 @@ class Type(str, Enum): NULL = "null" -class Interface(BaseModel): +class Interface(FeatureTemplateValidator): name: str hello_interval: Optional[int] = Field( DEFAULT_OSPF_DEAD_INTERVAL, json_schema_extra={"vmanage_key": "hello-interval"} @@ -100,14 +100,14 @@ class Interface(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Range(BaseModel): +class Range(FeatureTemplateValidator): address: ipaddress.IPv4Interface cost: Optional[int] = None no_advertise: Optional[BoolStr] = Field(default=False, json_schema_extra={"vmanage_key": "no-advertise"}) model_config = ConfigDict(populate_by_name=True) -class Area(BaseModel): +class Area(FeatureTemplateValidator): a_num: int = Field(json_schema_extra={"vmanage_key": "a-num"}) stub: Optional[BoolStr] = Field( default=None, json_schema_extra={"vmanage_key": "no-summary", "data_path": ["stub"]} diff --git a/catalystwan/api/templates/models/cisco_ospfv3.py b/catalystwan/api/templates/models/cisco_ospfv3.py index 05a75e53..244b4cf3 100644 --- a/catalystwan/api/templates/models/cisco_ospfv3.py +++ b/catalystwan/api/templates/models/cisco_ospfv3.py @@ -5,10 +5,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator class MetricType(str, Enum): @@ -27,7 +27,7 @@ class Protocol(str, Enum): STATIC = "static" -class Redistribute(BaseModel): +class Redistribute(FeatureTemplateValidator): protocol: Protocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) dia: Optional[BoolStr] = True @@ -38,7 +38,7 @@ class AdType(str, Enum): ON_STARTUP = "on-startup" -class RouterLsa(BaseModel): +class RouterLsa(FeatureTemplateValidator): ad_type: AdType = Field(json_schema_extra={"vmanage_key": "ad-type"}) time: int model_config = ConfigDict(populate_by_name=True) @@ -60,7 +60,7 @@ class Type(str, Enum): SHA1 = "sha1" -class Interface(BaseModel): +class Interface(FeatureTemplateValidator): name: str hello_interval: Optional[int] = Field(10, json_schema_extra={"vmanage_key": "hello-interval"}) dead_interval: Optional[int] = Field(40, json_schema_extra={"vmanage_key": "dead-interval"}) @@ -76,14 +76,14 @@ class Interface(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Range(BaseModel): +class Range(FeatureTemplateValidator): address: ipaddress.IPv4Interface cost: Optional[int] = None no_advertise: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "no-advertise"}) model_config = ConfigDict(populate_by_name=True) -class Area(BaseModel): +class Area(FeatureTemplateValidator): a_num: int = Field(json_schema_extra={"vmanage_key": "a-num"}) stub: Optional[BoolStr] = Field( default=None, json_schema_extra={"vmanage_key": "no-summary", "data_path": ["stub"]} @@ -98,13 +98,13 @@ class Area(BaseModel): model_config = ConfigDict(populate_by_name=True) -class RedistributeV6(BaseModel): +class RedistributeV6(FeatureTemplateValidator): protocol: Protocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) model_config = ConfigDict(populate_by_name=True) -class InterfaceV6(BaseModel): +class InterfaceV6(FeatureTemplateValidator): name: str hello_interval: Optional[int] = Field(10, json_schema_extra={"vmanage_key": "hello-interval"}) dead_interval: Optional[int] = Field(40, json_schema_extra={"vmanage_key": "dead-interval"}) @@ -120,14 +120,14 @@ class InterfaceV6(BaseModel): model_config = ConfigDict(populate_by_name=True) -class RangeV6(BaseModel): +class RangeV6(FeatureTemplateValidator): address: ipaddress.IPv6Interface cost: Optional[int] = None no_advertise: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "no-advertise"}) model_config = ConfigDict(populate_by_name=True) -class AreaV6(BaseModel): +class AreaV6(FeatureTemplateValidator): a_num: int = Field(json_schema_extra={"vmanage_key": "a-num"}) stub: Optional[BoolStr] = Field( default=None, json_schema_extra={"vmanage_key": "no-summary", "data_path": ["stub"]} diff --git a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py index 6e5355c2..4384148f 100644 --- a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py +++ b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py @@ -5,9 +5,9 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator DEFAULT_TRACKER_THRESHOLD = 300 DEFAULT_TRACKER_INTERVAL = 60 @@ -74,7 +74,7 @@ class PerfectForwardSecrecy(str, Enum): NONE = "none" -class Interface(BaseModel): +class Interface(FeatureTemplateValidator): if_name: str = Field(json_schema_extra={"vmanage_key": "if-name"}) auto: bool shutdown: bool @@ -131,7 +131,7 @@ class SvcType(str, Enum): SIG = "sig" -class InterfacePair(BaseModel): +class InterfacePair(FeatureTemplateValidator): active_interface: str = Field(json_schema_extra={"vmanage_key": "active-interface"}) active_interface_weight: int = Field( DEFAULT_INTERFACE_PAIR_ACTIVE_INTERFACE_WEIGHT, json_schema_extra={"vmanage_key": "active-interface-weight"} @@ -155,7 +155,7 @@ class RefreshTimeUnit(str, Enum): DAY = "DAY" -class Service(BaseModel): +class Service(FeatureTemplateValidator): svc_type: SvcType = Field(SvcType.SIG, json_schema_extra={"vmanage_key": "svc-type"}) interface_pair: List[InterfacePair] = Field(json_schema_extra={"vmanage_key": "interface-pair"}) auth_required: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "auth-required"}) @@ -192,7 +192,7 @@ class TrackerType(str, Enum): SIG = "SIG" -class Tracker(BaseModel): +class Tracker(FeatureTemplateValidator): name: str endpoint_api_url: str = Field(json_schema_extra={"vmanage_key": "endpoint-api-url"}) threshold: Optional[int] = DEFAULT_TRACKER_THRESHOLD diff --git a/catalystwan/api/templates/models/cisco_snmp_model.py b/catalystwan/api/templates/models/cisco_snmp_model.py index 54e5c888..ca6c1fe8 100644 --- a/catalystwan/api/templates/models/cisco_snmp_model.py +++ b/catalystwan/api/templates/models/cisco_snmp_model.py @@ -4,18 +4,18 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator -class Oid(BaseModel): +class Oid(FeatureTemplateValidator): id: str exclude: Optional[BoolStr] = None -class View(BaseModel): +class View(FeatureTemplateValidator): name: str oid: Optional[List[Oid]] = None @@ -24,7 +24,7 @@ class Authorization(str, Enum): READ_ONLY = "read-only" -class Community(BaseModel): +class Community(FeatureTemplateValidator): name: str view: str authorization: Authorization @@ -36,7 +36,7 @@ class SecurityLevel(str, Enum): AUTHPRIV = "auth-priv" -class Group(BaseModel): +class Group(FeatureTemplateValidator): name: str security_level: SecurityLevel = Field(json_schema_extra={"vmanage_key": "security-level"}) view: str @@ -52,7 +52,7 @@ class Priv(str, Enum): AES_CFB_128 = "aes-cfb-128" -class User(BaseModel): +class User(FeatureTemplateValidator): name: str auth: Optional[Auth] = None auth_password: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "auth-password"}) @@ -62,7 +62,7 @@ class User(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Target(BaseModel): +class Target(FeatureTemplateValidator): vpn_id: int = Field(json_schema_extra={"vmanage_key": "vpn-id"}) ip: str port: int diff --git a/catalystwan/api/templates/models/cisco_system.py b/catalystwan/api/templates/models/cisco_system.py index b3affb91..0d2dd628 100644 --- a/catalystwan/api/templates/models/cisco_system.py +++ b/catalystwan/api/templates/models/cisco_system.py @@ -4,15 +4,15 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr from catalystwan.api.templates.device_variable import DeviceVariable -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator from catalystwan.utils.timezone import Timezone -class MobileNumber(BaseModel): +class MobileNumber(FeatureTemplateValidator): number: str @@ -52,7 +52,7 @@ class Type(str, Enum): STATIC_ROUTE = "static-route" -class Tracker(BaseModel): +class Tracker(FeatureTemplateValidator): name: str endpoint_ip: str = Field(json_schema_extra={"vmanage_key": "endpoint-ip"}) endpoint_ip_transport_port: str = Field( @@ -71,11 +71,11 @@ class Tracker(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Object(BaseModel): +class Object(FeatureTemplateValidator): number: int -class ObjectTrack(BaseModel): +class ObjectTrack(FeatureTemplateValidator): object_number: int = Field(json_schema_extra={"vmanage_key": "object-number"}) interface: str sig: str @@ -92,7 +92,7 @@ class Role(str, Enum): BORDER_ROUTER = "border-router" -class AffinityPerVrf(BaseModel): +class AffinityPerVrf(FeatureTemplateValidator): affinity_group_number: Optional[int] = Field( default=None, json_schema_extra={"vmanage_key": "affinity-group-number"} ) @@ -105,7 +105,7 @@ class EnableMrfMigration(str, Enum): ENABLE_FROM_BGP_CORE = "enabled-from-bgp-core" -class Vrf(BaseModel): +class Vrf(FeatureTemplateValidator): vrf_id: int = Field(json_schema_extra={"vmanage_key": "vrf-id"}) gateway_preference: Optional[List[int]] = Field( default=None, json_schema_extra={"vmanage_key": "gateway-preference"} diff --git a/catalystwan/api/templates/models/cisco_vpn_interface_model.py b/catalystwan/api/templates/models/cisco_vpn_interface_model.py index 84731d6b..09af6a56 100644 --- a/catalystwan/api/templates/models/cisco_vpn_interface_model.py +++ b/catalystwan/api/templates/models/cisco_vpn_interface_model.py @@ -5,10 +5,10 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field from catalystwan.api.templates.bool_str import BoolStr -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator DEFAULT_STATIC_NAT64_SOURCE_VPN_ID = 0 DEFAULT_STATIC_NAT_SOURCE_VPN_ID = 0 @@ -22,11 +22,11 @@ DEFAULT_IPV6_VRRP_TIMER = 1000 -class SecondaryIPv4Address(BaseModel): +class SecondaryIPv4Address(FeatureTemplateValidator): address: Optional[ipaddress.IPv4Interface] = None -class SecondaryIPv6Address(BaseModel): +class SecondaryIPv6Address(FeatureTemplateValidator): address: Optional[ipaddress.IPv6Interface] = None @@ -35,13 +35,13 @@ class Direction(str, Enum): OUT = "out" -class AccessList(BaseModel): +class AccessList(FeatureTemplateValidator): direction: Direction acl_name: str = Field(json_schema_extra={"vmanage_key": "acl-name"}) model_config = ConfigDict(populate_by_name=True) -class DhcpHelperV6(BaseModel): +class DhcpHelperV6(FeatureTemplateValidator): address: ipaddress.IPv6Address vpn: Optional[int] = None @@ -52,7 +52,7 @@ class NatChoice(str, Enum): LOOPBACK = "Loopback" -class StaticNat66(BaseModel): +class StaticNat66(FeatureTemplateValidator): source_prefix: ipaddress.IPv6Interface = Field(json_schema_extra={"vmanage_key": "source-prefix"}) translated_source_prefix: str = Field(json_schema_extra={"vmanage_key": "translated-source-prefix"}) source_vpn_id: int = Field(DEFAULT_STATIC_NAT64_SOURCE_VPN_ID, json_schema_extra={"vmanage_key": "source-vpn-id"}) @@ -64,7 +64,7 @@ class StaticNatDirection(str, Enum): OUTSIDE = "outside" -class Static(BaseModel): +class Static(FeatureTemplateValidator): source_ip: ipaddress.IPv4Address = Field(json_schema_extra={"vmanage_key": "source-ip"}) translate_ip: ipaddress.IPv4Address = Field(json_schema_extra={"vmanage_key": "translate-ip"}) static_nat_direction: StaticNatDirection = Field( @@ -79,7 +79,7 @@ class Proto(str, Enum): UDP = "udp" -class StaticPortForward(BaseModel): +class StaticPortForward(FeatureTemplateValidator): source_ip: ipaddress.IPv4Address = Field(json_schema_extra={"vmanage_key": "source-ip"}) translate_ip: ipaddress.IPv4Address = Field(json_schema_extra={"vmanage_key": "translate-ip"}) static_nat_direction: StaticNatDirection = Field( @@ -110,7 +110,7 @@ class Encap(str, Enum): IPSEC = "ipsec" -class Encapsulation(BaseModel): +class Encapsulation(FeatureTemplateValidator): encap: Encap preference: Optional[int] = None weight: int = DEFAULT_ENCAPSULATION_WEIGHT @@ -178,12 +178,12 @@ class Duplex(str, Enum): AUTO = "auto" -class Ip(BaseModel): +class Ip(FeatureTemplateValidator): addr: ipaddress.IPv4Address mac: str -class Ipv4Secondary(BaseModel): +class Ipv4Secondary(FeatureTemplateValidator): address: ipaddress.IPv4Address @@ -192,14 +192,14 @@ class TrackAction(str, Enum): SHUTDOWN = "Shutdown" -class TrackingObject(BaseModel): +class TrackingObject(FeatureTemplateValidator): name: int track_action: TrackAction = Field(TrackAction.DECREMENT, json_schema_extra={"vmanage_key": "track-action"}) decrement: int model_config = ConfigDict(populate_by_name=True) -class Vrrp(BaseModel): +class Vrrp(FeatureTemplateValidator): grp_id: int = Field(json_schema_extra={"vmanage_key": "grp-id"}) priority: int = DEFAULT_VRRP_PRIORITY timer: int = DEFAULT_VRRP_TIMER @@ -219,13 +219,13 @@ class Vrrp(BaseModel): model_config = ConfigDict(populate_by_name=True) -class Ipv6(BaseModel): +class Ipv6(FeatureTemplateValidator): ipv6_link_local: ipaddress.IPv6Address = Field(json_schema_extra={"vmanage_key": "ipv6-link-local"}) prefix: Optional[ipaddress.IPv6Interface] = None model_config = ConfigDict(populate_by_name=True) -class Ipv6Vrrp(BaseModel): +class Ipv6Vrrp(FeatureTemplateValidator): grp_id: int = Field(json_schema_extra={"vmanage_key": "grp-id"}) priority: int = DEFAULT_IPV6_VRRP_PRIORITY timer: int = DEFAULT_IPV6_VRRP_TIMER @@ -245,12 +245,16 @@ class CiscoVpnInterfaceModel(FeatureTemplate): secondary_ipv4_address: Optional[List[SecondaryIPv4Address]] = Field( default=None, json_schema_extra={"data_path": ["ip"], "vmanage_key": "secondary-address"} ) - dhcp_ipv4_client: Optional[BoolStr] = Field(default=None, json_schema_extra={"vmanage_key": "dhcp-client"}) + dhcp_ipv4_client: Optional[BoolStr] = Field( + default=None, json_schema_extra={"data_path": ["ip"], "vmanage_key": "dhcp-client"} + ) dhcp_distance: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "dhcp-distance"}) ipv6_address: Optional[ipaddress.IPv6Interface] = Field( default=None, json_schema_extra={"data_path": ["ipv6"], "vmanage_key": "address"} ) - dhcp_ipv6_client: Optional[BoolStr] = Field(default=None, json_schema_extra={"vmanage_key": "dhcp-client"}) + dhcp_ipv6_client: Optional[BoolStr] = Field( + default=None, json_schema_extra={"data_path": ["ipv6"], "vmanage_key": "dhcp-client"} + ) secondary_ipv6_address: Optional[List[SecondaryIPv6Address]] = Field( default=None, json_schema_extra={"data_path": ["ipv6"], "vmanage_key": "secondary-address"} ) diff --git a/catalystwan/api/templates/models/cisco_vpn_model.py b/catalystwan/api/templates/models/cisco_vpn_model.py index 641dde7d..04c5d66b 100644 --- a/catalystwan/api/templates/models/cisco_vpn_model.py +++ b/catalystwan/api/templates/models/cisco_vpn_model.py @@ -4,9 +4,9 @@ from pathlib import Path from typing import ClassVar, List, Optional -from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic import ConfigDict, Field, field_validator -from catalystwan.api.templates.feature_template import FeatureTemplate +from catalystwan.api.templates.feature_template import FeatureTemplate, FeatureTemplateValidator class Role(str, Enum): @@ -14,19 +14,19 @@ class Role(str, Enum): SECONDARY = "secondary" -class Dns(BaseModel): +class Dns(FeatureTemplateValidator): dns_addr: str = Field(json_schema_extra={"vmanage_key": "dns-addr"}) role: Role = Role.PRIMARY model_config = ConfigDict(populate_by_name=True) -class DnsIpv6(BaseModel): +class DnsIpv6(FeatureTemplateValidator): dns_addr: str = Field(json_schema_extra={"vmanage_key": "dns-addr"}) role: Optional[Role] = Role.PRIMARY model_config = ConfigDict(populate_by_name=True) -class Host(BaseModel): +class Host(FeatureTemplateValidator): hostname: str ip: List[str] @@ -43,7 +43,7 @@ class SvcType(str, Enum): APPQOE = "appqoe" -class Service(BaseModel): +class Service(FeatureTemplateValidator): svc_type: SvcType = Field(json_schema_extra={"vmanage_key": "svc-type"}) address: List[str] interface: str @@ -60,24 +60,24 @@ class ServiceRouteService(str, Enum): SIG = "sig" -class ServiceRoute(BaseModel): +class ServiceRoute(FeatureTemplateValidator): prefix: str vpn: int service: ServiceRouteService = ServiceRouteService.SIG -class NextHop(BaseModel): +class NextHop(FeatureTemplateValidator): address: str distance: Optional[int] = 1 -class NextHopWithTrack(BaseModel): +class NextHopWithTrack(FeatureTemplateValidator): address: str distance: Optional[int] = 1 tracker: str -class Routev4(BaseModel): +class Routev4(FeatureTemplateValidator): prefix: str next_hop: Optional[List[NextHop]] = Field( default=None, json_schema_extra={"vmanage_key": "next-hop", "priority_order": ["address", "distance"]} @@ -92,7 +92,7 @@ class Routev4(BaseModel): model_config = ConfigDict(populate_by_name=True) -class NextHopv6(BaseModel): +class NextHopv6(FeatureTemplateValidator): address: str distance: Optional[int] = 1 @@ -102,7 +102,7 @@ class Nat(str, Enum): NAT66 = "NAT66" -class Routev6(BaseModel): +class Routev6(FeatureTemplateValidator): prefix: str next_hop: Optional[List[NextHopv6]] = Field(default=None, json_schema_extra={"vmanage_key": "next-hop"}) null0: Optional[bool] = None @@ -111,13 +111,13 @@ class Routev6(BaseModel): model_config = ConfigDict(populate_by_name=True) -class GreRoute(BaseModel): +class GreRoute(FeatureTemplateValidator): prefix: str vpn: int interface: Optional[List[str]] = None -class IpsecRoute(BaseModel): +class IpsecRoute(FeatureTemplateValidator): prefix: str vpn: int interface: Optional[List[str]] = None @@ -145,14 +145,14 @@ class Region(str, Enum): ACCESS = "access" -class PrefixList(BaseModel): +class PrefixList(FeatureTemplateValidator): prefix_entry: str = Field(json_schema_extra={"vmanage_key": "prefix-entry"}) aggregate_only: Optional[bool] = Field(default=None, json_schema_extra={"vmanage_key": "aggregate-only"}) region: Optional[Region] model_config = ConfigDict(populate_by_name=True) -class Advertise(BaseModel): +class Advertise(FeatureTemplateValidator): protocol: AdvertiseProtocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) protocol_sub_type: Optional[List[AdvertiseProtocolSubType]] = Field( @@ -175,7 +175,7 @@ class Ipv6AdvertiseProtocolSubType(str, Enum): EXTERNAL = "external" -class Ipv6Advertise(BaseModel): +class Ipv6Advertise(FeatureTemplateValidator): protocol: Ipv6AdvertiseProtocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) protocol_sub_type: Optional[List[Ipv6AdvertiseProtocolSubType]] = Field( @@ -194,7 +194,7 @@ class LeakFromGlobalProtocol(str, Enum): ODR = "odr" -class Pool(BaseModel): +class Pool(FeatureTemplateValidator): name: str start_address: str = Field(json_schema_extra={"vmanage_key": "start-address"}) end_address: str = Field(json_schema_extra={"vmanage_key": "end-address"}) @@ -215,7 +215,7 @@ class Overload(str, Enum): FALSE = "false" -class Natpool(BaseModel): +class Natpool(FeatureTemplateValidator): name: int prefix_length: int = Field(json_schema_extra={"vmanage_key": "prefix-length"}) range_start: str = Field(json_schema_extra={"vmanage_key": "range-start"}) @@ -231,7 +231,7 @@ class StaticNatDirection(str, Enum): OUTSIDE = "outside" -class Static(BaseModel): +class Static(FeatureTemplateValidator): pool_name: Optional[int] = Field(json_schema_extra={"vmanage_key": "pool-name"}) source_ip: str = Field(json_schema_extra={"vmanage_key": "source-ip"}) translate_ip: str = Field(json_schema_extra={"vmanage_key": "translate-ip"}) @@ -240,7 +240,7 @@ class Static(BaseModel): model_config = ConfigDict(populate_by_name=True) -class SubnetStatic(BaseModel): +class SubnetStatic(FeatureTemplateValidator): source_ip_subnet: str = Field(json_schema_extra={"vmanage_key": "source-ip-subnet"}) translate_ip_subnet: str = Field(json_schema_extra={"vmanage_key": "translate-ip-subnet"}) prefix_length: int = Field(json_schema_extra={"vmanage_key": "prefix-length"}) @@ -254,7 +254,7 @@ class Proto(str, Enum): UDP = "udp" -class PortForward(BaseModel): +class PortForward(FeatureTemplateValidator): pool_name: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "pool-name"}) source_port: int = Field(json_schema_extra={"vmanage_key": "source-port"}) translate_port: int = Field(json_schema_extra={"vmanage_key": "translate-port"}) @@ -281,13 +281,13 @@ class RouteImportRedistributeProtocol(str, Enum): OSPF = "ospf" -class RouteImportRedistribute(BaseModel): +class RouteImportRedistribute(FeatureTemplateValidator): protocol: RouteImportRedistributeProtocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) model_config = ConfigDict(populate_by_name=True) -class RouteImport(BaseModel): +class RouteImport(FeatureTemplateValidator): protocol: RouteImportProtocol protocol_sub_type: List[RouteImportProtocolSubType] = Field(json_schema_extra={"vmanage_key": "protocol-sub-type"}) route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) @@ -313,13 +313,13 @@ class RouteImportFromRedistributeProtocol(str, Enum): OSPF = "ospf" -class RouteImportFromRedistribute(BaseModel): +class RouteImportFromRedistribute(FeatureTemplateValidator): protocol: RouteImportFromRedistributeProtocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) model_config = ConfigDict(populate_by_name=True) -class RouteImportFrom(BaseModel): +class RouteImportFrom(FeatureTemplateValidator): source_vpn: int = Field(json_schema_extra={"vmanage_key": "source-vpn"}) protocol: RouteImportFromProtocol protocol_sub_type: List[RouteImportFromProtocolSubType] = Field( @@ -347,13 +347,13 @@ class RouteExportRedistributeProtocol(str, Enum): OSPF = "ospf" -class RouteExportRedistribute(BaseModel): +class RouteExportRedistribute(FeatureTemplateValidator): protocol: RouteExportRedistributeProtocol route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) model_config = ConfigDict(populate_by_name=True) -class RouteExport(BaseModel): +class RouteExport(FeatureTemplateValidator): protocol: RouteExportProtocol protocol_sub_type: List[RouteExportProtocolSubType] = Field(json_schema_extra={"vmanage_key": "protocol-sub-type"}) route_policy: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "route-policy"}) diff --git a/catalystwan/utils/feature_template.py b/catalystwan/utils/feature_template.py index bbab96c0..b4a3cfb1 100644 --- a/catalystwan/utils/feature_template.py +++ b/catalystwan/utils/feature_template.py @@ -1,6 +1,8 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel from catalystwan.api.templates.device_variable import DeviceVariable from catalystwan.api.templates.models.supported import available_models @@ -31,6 +33,37 @@ def choose_model(type_value: str) -> Any: return available_models[type_value] +class FlattenedTemplateValue(BaseModel): + value: Any + data_path: List[str] + + +def flatten_template_definition(template_definition: Dict[str, Any]) -> Dict[str, List[FlattenedTemplateValue]]: + def get_flattened_dict( + template_definition: Dict[str, Any], + flattened_dict: Dict[str, List[FlattenedTemplateValue]] = {}, + path: List[str] = [], + ): + for key, value in template_definition.items(): + if isinstance(value, dict): + get_flattened_dict(value, flattened_dict, path=path + [key]) + else: + if key not in flattened_dict: + flattened_dict[key] = [] + if isinstance(value, list) and all([isinstance(v, dict) for v in value]): + flattened_value = FlattenedTemplateValue( + value=[get_flattened_dict(v, {}) for v in value], data_path=path + ) + flattened_dict[key].append(flattened_value) + else: + flattened_dict[key].append(FlattenedTemplateValue(value=value, data_path=path)) + return flattened_dict + + flattened_dict: Dict[str, List[FlattenedTemplateValue]] = {} + get_flattened_dict(template_definition, flattened_dict) + return flattened_dict + + def find_template_values( template_definition: dict, templated_values: dict = {}, @@ -39,7 +72,8 @@ def find_template_values( target_key_value_to_ignore: str = "ignore", target_key_for_template_value: str = "vipValue", device_specific_variables: Optional[Dict[str, DeviceVariable]] = None, -) -> Dict[str, Union[str, list]]: + path: List[str] = [], +) -> Dict[str, Union[str, list, dict]]: """Based on provided template definition generates a dictionary with template fields and values Args: @@ -56,14 +90,40 @@ def find_template_values( Returns: templated_values: dictionary containing template fields as key and values assigned to those fields as values """ + # if value object is reached, try to extract the value + if target_key in template_definition and template_definition[target_key] != target_key_value_to_ignore: + value = template_definition[target_key] + template_value = template_definition[target_key_for_template_value] + current_dict = templated_values + for path_key in path[:-1]: + if path_key not in current_dict: + current_dict[path_key] = {} + current_dict = current_dict[path_key] + current_dict[path[-1]] = template_value + + if value == "variableName" and (device_specific_variables is not None) and parent_key: + device_specific_variables[parent_key] = DeviceVariable(name=template_definition["vipVariableName"]) + elif template_definition["vipObjectType"] != "tree": + current_dict[path[-1]] = template_value + elif isinstance(template_value, dict): + find_template_values( + value, templated_values, parent_key, device_specific_variables=device_specific_variables, path=path + ) + elif isinstance(template_value, list): + current_dict[path[-1]] = [] + for item in template_value: + current_dict[path[-1]].append( + find_template_values(item, {}, device_specific_variables=device_specific_variables) + ) + + return templated_values + + # iterate the dict to extract values and assign them to their fields for key, value in template_definition.items(): - if key == target_key and value != target_key_value_to_ignore: - if value == "variableName" and (device_specific_variables is not None) and parent_key: - device_specific_variables[parent_key] = DeviceVariable(name=template_definition["vipVariableName"]) - else: - templated_values[parent_key] = template_definition[target_key_for_template_value] - elif isinstance(value, dict) and value != target_key_value_to_ignore: - find_template_values(value, templated_values, key, device_specific_variables=device_specific_variables) + if isinstance(value, dict) and value != target_key_value_to_ignore: + find_template_values( + value, templated_values, key, device_specific_variables=device_specific_variables, path=path + [key] + ) elif ( isinstance(value, list) and key == target_key_for_template_value @@ -73,6 +133,8 @@ def find_template_values( templated_values[parent_key] = [] for item in value: templated_values[parent_key].append( - find_template_values(item, {}, device_specific_variables=device_specific_variables) + find_template_values( + item, {}, device_specific_variables=device_specific_variables, path=path + [key] + ) ) return templated_values From 1e45dda28a7e67a5820cdd8ab15fd51f3b5e23e0 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Mon, 4 Mar 2024 09:04:28 +0100 Subject: [PATCH 2/8] Small fixes --- catalystwan/api/templates/feature_template.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/catalystwan/api/templates/feature_template.py b/catalystwan/api/templates/feature_template.py index e9a8b3db..5bae8d12 100644 --- a/catalystwan/api/templates/feature_template.py +++ b/catalystwan/api/templates/feature_template.py @@ -27,22 +27,26 @@ def map_fields(cls, values: Union[Any, Dict[str, Union[List[FlattenedTemplateVal if not isinstance(values, dict): return values - mapped_values = {} for field_name, field_info in cls.model_fields.items(): - payload_name = get_extra_field(field_info, "vmanage_key") or field_info.alias or field_name - data_path = get_extra_field(field_info, "data_path", []) - if payload_name not in values: - continue + vmanage_key = get_extra_field(field_info, "vmanage_key") + if vmanage_key in values: + payload_name = vmanage_key + elif field_info.alias in values: + payload_name = field_info.alias + elif field_name in values: + payload_name = field_name else: - value = values[payload_name] - if value and isinstance(value, list) and all([isinstance(v, FlattenedTemplateValue)] for v in value): + continue + data_path = get_extra_field(field_info, "data_path", []) + value = values.pop(payload_name) + if value and isinstance(value, list) and all([isinstance(v, FlattenedTemplateValue) for v in value]): for template_value in value: if template_value.data_path == data_path: - mapped_values[field_name] = template_value.value + values[field_name] = template_value.value break else: - mapped_values[field_name] = value - return mapped_values + values[field_name] = value + return values class FeatureTemplate(FeatureTemplateValidator, ABC): From 6bdcb5a319c8aba058e9ed9f3b3609b405266e1a Mon Sep 17 00:00:00 2001 From: PrzeG Date: Tue, 5 Mar 2024 07:52:11 +0100 Subject: [PATCH 3/8] Code cleanup --- catalystwan/api/templates/feature_template.py | 13 ++-- .../api/templates/models/cisco_omp_model.py | 2 +- catalystwan/api/templates/models/supported.py | 2 + .../tests/templates/test_chose_model.py | 2 +- catalystwan/utils/dict.py | 40 ++++++++++ .../utils/feature_template/choose_model.py | 28 +++++++ .../feature_template/find_template_values.py | 77 +++++++++++++++++++ 7 files changed, 155 insertions(+), 9 deletions(-) create mode 100644 catalystwan/utils/feature_template/choose_model.py create mode 100644 catalystwan/utils/feature_template/find_template_values.py diff --git a/catalystwan/api/templates/feature_template.py b/catalystwan/api/templates/feature_template.py index 5bae8d12..c6d851c5 100644 --- a/catalystwan/api/templates/feature_template.py +++ b/catalystwan/api/templates/feature_template.py @@ -12,19 +12,18 @@ from catalystwan.api.templates.device_variable import DeviceVariable from catalystwan.utils.device_model import DeviceModel +from catalystwan.utils.dict import FlattenedDictValue, flatten_dict +from catalystwan.utils.feature_template.find_template_values import find_template_values from catalystwan.utils.pydantic_field import get_extra_field if TYPE_CHECKING: from catalystwan.session import ManagerSession - from catalystwan.utils.feature_template import FlattenedTemplateValue class FeatureTemplateValidator(BaseModel, ABC): @model_validator(mode="before") @classmethod - def map_fields(cls, values: Union[Any, Dict[str, Union[List[FlattenedTemplateValue], Any]]]): - from catalystwan.utils.feature_template import FlattenedTemplateValue - + def map_fields(cls, values: Union[Any, Dict[str, Union[List[FlattenedDictValue], Any]]]): if not isinstance(values, dict): return values for field_name, field_info in cls.model_fields.items(): @@ -39,7 +38,7 @@ def map_fields(cls, values: Union[Any, Dict[str, Union[List[FlattenedTemplateVal continue data_path = get_extra_field(field_info, "data_path", []) value = values.pop(payload_name) - if value and isinstance(value, list) and all([isinstance(v, FlattenedTemplateValue) for v in value]): + if value and isinstance(value, list) and all([isinstance(v, FlattenedDictValue) for v in value]): for template_value in value: if template_value.data_path == data_path: values[field_name] = template_value.value @@ -114,7 +113,7 @@ def get(cls, session: ManagerSession, name: str) -> FeatureTemplate: Returns: FeatureTemplate: filed out feature template model """ - from catalystwan.utils.feature_template import choose_model, find_template_values, flatten_template_definition + from catalystwan.utils.feature_template.choose_model import choose_model template_info = ( session.api.templates._get_feature_templates(summary=False).filter(name=name).single_or_default() @@ -127,7 +126,7 @@ def get(cls, session: ManagerSession, name: str) -> FeatureTemplate: values_from_template_definition = find_template_values( template_definition_as_dict, device_specific_variables=device_specific_variables ) - flattened_values = flatten_template_definition(values_from_template_definition) + flattened_values = flatten_dict(values_from_template_definition) return feature_template_model( template_name=template_info.name, diff --git a/catalystwan/api/templates/models/cisco_omp_model.py b/catalystwan/api/templates/models/cisco_omp_model.py index 2a65412a..baebfc8f 100644 --- a/catalystwan/api/templates/models/cisco_omp_model.py +++ b/catalystwan/api/templates/models/cisco_omp_model.py @@ -34,7 +34,7 @@ class Route(str, Enum): class IPv4Advertise(FeatureTemplateValidator): protocol: IPv4AdvertiseProtocol - route: Route + route: Optional[Route] = None class IPv6AdvertiseProtocol(str, Enum): diff --git a/catalystwan/api/templates/models/supported.py b/catalystwan/api/templates/models/supported.py index 43985bed..fc0ad1f2 100644 --- a/catalystwan/api/templates/models/supported.py +++ b/catalystwan/api/templates/models/supported.py @@ -5,6 +5,7 @@ from catalystwan.api.templates.models.cisco_bfd_model import CiscoBFDModel from catalystwan.api.templates.models.cisco_logging_model import CiscoLoggingModel from catalystwan.api.templates.models.cisco_ntp_model import CiscoNTPModel +from catalystwan.api.templates.models.cisco_omp_model import CiscoOMPModel from catalystwan.api.templates.models.cisco_ospf import CiscoOSPFModel from catalystwan.api.templates.models.cisco_secure_internet_gateway import CiscoSecureInternetGatewayModel from catalystwan.api.templates.models.cisco_snmp_model import CiscoSNMPModel @@ -31,4 +32,5 @@ "cisco_snmp": CiscoSNMPModel, "cisco_system": CiscoSystemModel, "cisco_secure_internet_gateway": CiscoSecureInternetGatewayModel, + "cisco_omp": CiscoOMPModel, } diff --git a/catalystwan/tests/templates/test_chose_model.py b/catalystwan/tests/templates/test_chose_model.py index af24a47c..fdf220a7 100644 --- a/catalystwan/tests/templates/test_chose_model.py +++ b/catalystwan/tests/templates/test_chose_model.py @@ -9,7 +9,7 @@ from catalystwan.api.templates.models.omp_vsmart_model import OMPvSmart from catalystwan.api.templates.models.security_vsmart_model import SecurityvSmart from catalystwan.api.templates.models.system_vsmart_model import SystemVsmart -from catalystwan.utils.feature_template import choose_model +from catalystwan.utils.feature_template.choose_model import choose_model class TestChooseModel(unittest.TestCase): diff --git a/catalystwan/utils/dict.py b/catalystwan/utils/dict.py index 393f7431..4bcb3e9f 100644 --- a/catalystwan/utils/dict.py +++ b/catalystwan/utils/dict.py @@ -1,5 +1,9 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates +from typing import Any, Dict, List + +from pydantic import BaseModel + def merge(a, b, path=None): if path is None: @@ -15,3 +19,39 @@ def merge(a, b, path=None): else: a[key] = b[key] return a + + +class FlattenedDictValue(BaseModel): + value: Any + data_path: List[str] + + +def flatten_dict(original_dict: Dict[str, Any]) -> Dict[str, List[FlattenedDictValue]]: + """ + Flattens a dictionary. + Each key corresponds to a list of FlattenedDictValue, allowing us to handle repeated keys in nesting. + """ + + def get_flattened_dict( + original_dict: Dict[str, Any], + flattened_dict: Dict[str, List[FlattenedDictValue]] = {}, + path: List[str] = [], + ): + for key, value in original_dict.items(): + if isinstance(value, dict): + get_flattened_dict(value, flattened_dict, path=path + [key]) + else: + if key not in flattened_dict: + flattened_dict[key] = [] + if isinstance(value, list) and all([isinstance(v, dict) for v in value]): + flattened_value = FlattenedDictValue( + value=[get_flattened_dict(v, {}) for v in value], data_path=path + ) + flattened_dict[key].append(flattened_value) + else: + flattened_dict[key].append(FlattenedDictValue(value=value, data_path=path)) + return flattened_dict + + flattened_dict: Dict[str, List[FlattenedDictValue]] = {} + get_flattened_dict(original_dict, flattened_dict) + return flattened_dict diff --git a/catalystwan/utils/feature_template/choose_model.py b/catalystwan/utils/feature_template/choose_model.py new file mode 100644 index 00000000..05bb8648 --- /dev/null +++ b/catalystwan/utils/feature_template/choose_model.py @@ -0,0 +1,28 @@ +from typing import Any + +from catalystwan.api.templates.models.supported import available_models +from catalystwan.exceptions import TemplateTypeError + + +def choose_model(type_value: str) -> Any: + """Chooses correct model based on provided type + + With provided type of feature template searches supported by catalystwan models + and returns correct for given type of feature template class. + + Args: + type_value: type of feature template + + Returns: + model + + Raises: + TemplateTypeError: Raises when the model is not supported by catalystwan. + """ + if type_value not in available_models: + for model in available_models.values(): + if model.type == type_value: # type: ignore + return model + raise TemplateTypeError(f"Feature template type '{type_value}' is not supported.") + + return available_models[type_value] diff --git a/catalystwan/utils/feature_template/find_template_values.py b/catalystwan/utils/feature_template/find_template_values.py new file mode 100644 index 00000000..b88667ca --- /dev/null +++ b/catalystwan/utils/feature_template/find_template_values.py @@ -0,0 +1,77 @@ +from typing import Dict, List, Optional, Union + +from catalystwan.api.templates.device_variable import DeviceVariable + + +def find_template_values( + template_definition: dict, + templated_values: dict = {}, + target_key: str = "vipType", + target_key_value_to_ignore: str = "ignore", + target_key_for_template_value: str = "vipValue", + device_specific_variables: Optional[Dict[str, DeviceVariable]] = None, + path: Optional[List[str]] = None, +) -> Dict[str, Union[str, list, dict]]: + """Based on provided template definition generates a dictionary with template fields and values + + Args: + template_definition: template definition provided as dict + templated_values: dictionary, empty at the beginning and filed out with names of fields as keys + and values of those fields as values + target_key: name of the key specifying if field is used in template, defaults to 'vipType' + target_key_value_to_ignore: value of the target key indicating + that field is not used in template, defaults to 'ignore' + target_key_for_template_value: name of the key specifying value of field used in template, + defaults to 'vipValue' + path: a list of keys indicating current path, defaults to None + Returns: + templated_values: dictionary containing template fields as key and values assigned to those fields as values + """ + if path is None: + path = [] + + # if value object is reached, try to extract the value + if target_key in template_definition: + if template_definition[target_key] == target_key_value_to_ignore: + return templated_values + + value = template_definition[target_key] + template_value = template_definition[target_key_for_template_value] + + field_key = path[-1] + # TODO: Handle nested DeviceVariable + if value == "variableName" and (device_specific_variables is not None): + device_specific_variables[field_key] = DeviceVariable(name=template_definition["vipVariableName"]) + elif template_definition["vipObjectType"] != "tree": + current_nesting = get_nested_dict(templated_values, path[:-1]) + current_nesting[field_key] = template_value + elif isinstance(template_value, dict): + find_template_values( + value, templated_values, device_specific_variables=device_specific_variables, path=path + ) + elif isinstance(template_value, list): + current_nesting = get_nested_dict(templated_values, path[:-1]) + current_nesting[field_key] = [] + for item in template_value: + current_nesting[field_key].append( + find_template_values(item, {}, device_specific_variables=device_specific_variables) + ) + + return templated_values + + # iterate the dict to extract values and assign them to their fields + for key, value in template_definition.items(): + if isinstance(value, dict) and value != target_key_value_to_ignore: + find_template_values( + value, templated_values, device_specific_variables=device_specific_variables, path=path + [key] + ) + return templated_values + + +def get_nested_dict(d: dict, path: List[str], populate: bool = True): + current_dict = d + for path_key in path: + if path_key not in current_dict and populate: + current_dict[path_key] = {} + current_dict = current_dict[path_key] + return current_dict From e0756b66f6fc26a053de26c3f8e9afa5cbcb840c Mon Sep 17 00:00:00 2001 From: PrzeG Date: Tue, 5 Mar 2024 07:57:31 +0100 Subject: [PATCH 4/8] Remove unused feature_template utils --- catalystwan/utils/feature_template.py | 140 -------------------------- 1 file changed, 140 deletions(-) delete mode 100644 catalystwan/utils/feature_template.py diff --git a/catalystwan/utils/feature_template.py b/catalystwan/utils/feature_template.py deleted file mode 100644 index b4a3cfb1..00000000 --- a/catalystwan/utils/feature_template.py +++ /dev/null @@ -1,140 +0,0 @@ -# Copyright 2023 Cisco Systems, Inc. and its affiliates - -from typing import Any, Dict, List, Optional, Union - -from pydantic import BaseModel - -from catalystwan.api.templates.device_variable import DeviceVariable -from catalystwan.api.templates.models.supported import available_models -from catalystwan.exceptions import TemplateTypeError - - -def choose_model(type_value: str) -> Any: - """Chooses correct model based on provided type - - With provided type of feature template searches supported by catalystwan models - and returns correct for given type of feature template class. - - Args: - type_value: type of feature template - - Returns: - model - - Raises: - TemplateTypeError: Raises when the model is not supported by catalystwan. - """ - if type_value not in available_models: - for model in available_models.values(): - if model.type == type_value: # type: ignore - return model - raise TemplateTypeError(f"Feature template type '{type_value}' is not supported.") - - return available_models[type_value] - - -class FlattenedTemplateValue(BaseModel): - value: Any - data_path: List[str] - - -def flatten_template_definition(template_definition: Dict[str, Any]) -> Dict[str, List[FlattenedTemplateValue]]: - def get_flattened_dict( - template_definition: Dict[str, Any], - flattened_dict: Dict[str, List[FlattenedTemplateValue]] = {}, - path: List[str] = [], - ): - for key, value in template_definition.items(): - if isinstance(value, dict): - get_flattened_dict(value, flattened_dict, path=path + [key]) - else: - if key not in flattened_dict: - flattened_dict[key] = [] - if isinstance(value, list) and all([isinstance(v, dict) for v in value]): - flattened_value = FlattenedTemplateValue( - value=[get_flattened_dict(v, {}) for v in value], data_path=path - ) - flattened_dict[key].append(flattened_value) - else: - flattened_dict[key].append(FlattenedTemplateValue(value=value, data_path=path)) - return flattened_dict - - flattened_dict: Dict[str, List[FlattenedTemplateValue]] = {} - get_flattened_dict(template_definition, flattened_dict) - return flattened_dict - - -def find_template_values( - template_definition: dict, - templated_values: dict = {}, - parent_key: Optional[str] = None, - target_key: str = "vipType", - target_key_value_to_ignore: str = "ignore", - target_key_for_template_value: str = "vipValue", - device_specific_variables: Optional[Dict[str, DeviceVariable]] = None, - path: List[str] = [], -) -> Dict[str, Union[str, list, dict]]: - """Based on provided template definition generates a dictionary with template fields and values - - Args: - template_definition: template definition provided as dict - templated_values: dictionary, empty at the beginning and filed out with names of fields as keys - and values of those fields as values - parent_key: parent key provided to keep track of fields, defaults to None - target_key: name of the key specifying if field is used in template, defaults to 'vipType' - target_key_value_to_ignore: value of the target key indicating - that field is not used in template, defaults to 'ignore' - target_key_for_template_value: name of the key specifying value of field used in template, - defaults to 'vipValue' - - Returns: - templated_values: dictionary containing template fields as key and values assigned to those fields as values - """ - # if value object is reached, try to extract the value - if target_key in template_definition and template_definition[target_key] != target_key_value_to_ignore: - value = template_definition[target_key] - template_value = template_definition[target_key_for_template_value] - current_dict = templated_values - for path_key in path[:-1]: - if path_key not in current_dict: - current_dict[path_key] = {} - current_dict = current_dict[path_key] - current_dict[path[-1]] = template_value - - if value == "variableName" and (device_specific_variables is not None) and parent_key: - device_specific_variables[parent_key] = DeviceVariable(name=template_definition["vipVariableName"]) - elif template_definition["vipObjectType"] != "tree": - current_dict[path[-1]] = template_value - elif isinstance(template_value, dict): - find_template_values( - value, templated_values, parent_key, device_specific_variables=device_specific_variables, path=path - ) - elif isinstance(template_value, list): - current_dict[path[-1]] = [] - for item in template_value: - current_dict[path[-1]].append( - find_template_values(item, {}, device_specific_variables=device_specific_variables) - ) - - return templated_values - - # iterate the dict to extract values and assign them to their fields - for key, value in template_definition.items(): - if isinstance(value, dict) and value != target_key_value_to_ignore: - find_template_values( - value, templated_values, key, device_specific_variables=device_specific_variables, path=path + [key] - ) - elif ( - isinstance(value, list) - and key == target_key_for_template_value - and template_definition.get(target_key) != target_key_value_to_ignore - and all([isinstance(v, dict) for v in value]) - ): - templated_values[parent_key] = [] - for item in value: - templated_values[parent_key].append( - find_template_values( - item, {}, device_specific_variables=device_specific_variables, path=path + [key] - ) - ) - return templated_values From 9dffe6327c3cdd1e24973d25bedd03b7606b9921 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Tue, 5 Mar 2024 13:11:07 +0100 Subject: [PATCH 5/8] Add integration test --- .../test_find_template_values.py | 45 +++++++++++++++++++ catalystwan/utils/dict.py | 10 +++-- .../feature_template/find_template_values.py | 4 +- 3 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 catalystwan/integration_tests/test_find_template_values.py diff --git a/catalystwan/integration_tests/test_find_template_values.py b/catalystwan/integration_tests/test_find_template_values.py new file mode 100644 index 00000000..c69e7600 --- /dev/null +++ b/catalystwan/integration_tests/test_find_template_values.py @@ -0,0 +1,45 @@ +import os +import unittest +from typing import Any, List, cast + +from catalystwan.session import create_manager_session +from catalystwan.utils.feature_template.find_template_values import find_template_values + + +class TestFindTemplateValues(unittest.TestCase): + def setUp(self) -> None: + self.session = create_manager_session( + url=cast(str, os.environ.get("TEST_VMANAGE_URL")), + port=cast(int, int(os.environ.get("TEST_VMANAGE_PORT"))), # type: ignore + username=cast(str, os.environ.get("TEST_VMANAGE_USERNAME")), + password=cast(str, os.environ.get("TEST_VMANAGE_PASSWORD")), + ) + self.templates = self.session.api.templates._get_feature_templates() + + def test_find_template_value(self): + for template in self.templates: + definition = template.template_definiton + with self.subTest(template_name=template.name): + parsed_values = find_template_values(definition) + self.assertFalse( + self.is_key_present(parsed_values, ["vipType", "vipValue", "vipVariableName", "vipObjectType"]) + ) + + def is_key_present(self, d: dict, keys: List[Any]): + """ + Checks if any key from keys is present within the dictionary d + """ + for key, value in d.items(): + if key in keys: + return True + if isinstance(value, dict): + if self.is_key_present(value, keys): + return True + if isinstance(value, list): + for v in value: + if self.is_key_present(v, keys): + return True + return False + + def tearDown(self) -> None: + self.session.close() diff --git a/catalystwan/utils/dict.py b/catalystwan/utils/dict.py index 4bcb3e9f..6b2a812b 100644 --- a/catalystwan/utils/dict.py +++ b/catalystwan/utils/dict.py @@ -1,6 +1,6 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from pydantic import BaseModel @@ -34,9 +34,13 @@ def flatten_dict(original_dict: Dict[str, Any]) -> Dict[str, List[FlattenedDictV def get_flattened_dict( original_dict: Dict[str, Any], - flattened_dict: Dict[str, List[FlattenedDictValue]] = {}, - path: List[str] = [], + flattened_dict: Optional[Dict[str, List[FlattenedDictValue]]] = None, + path: Optional[List[str]] = None, ): + if flattened_dict is None: + flattened_dict = {} + if path is None: + path = [] for key, value in original_dict.items(): if isinstance(value, dict): get_flattened_dict(value, flattened_dict, path=path + [key]) diff --git a/catalystwan/utils/feature_template/find_template_values.py b/catalystwan/utils/feature_template/find_template_values.py index b88667ca..16514a82 100644 --- a/catalystwan/utils/feature_template/find_template_values.py +++ b/catalystwan/utils/feature_template/find_template_values.py @@ -5,7 +5,7 @@ def find_template_values( template_definition: dict, - templated_values: dict = {}, + templated_values: Optional[dict] = None, target_key: str = "vipType", target_key_value_to_ignore: str = "ignore", target_key_for_template_value: str = "vipValue", @@ -29,6 +29,8 @@ def find_template_values( """ if path is None: path = [] + if templated_values is None: + templated_values = {} # if value object is reached, try to extract the value if target_key in template_definition: From 5b2a99e9b839b95a5ea02998507c437d7bb13468 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Wed, 6 Mar 2024 11:09:47 +0100 Subject: [PATCH 6/8] find_template_values and Feature Template model adjustements --- .../api/templates/models/cisco_aaa_model.py | 2 +- .../models/cisco_secure_internet_gateway.py | 14 +++++----- .../api/templates/models/cisco_system.py | 18 ++++++------- .../models/cisco_vpn_interface_model.py | 2 +- .../api/templates/models/cisco_vpn_model.py | 26 +++++++++--------- .../test_find_template_values.py | 15 +++++++---- .../feature_template/find_template_values.py | 27 ++++++++++++++----- pyproject.toml | 1 + 8 files changed, 62 insertions(+), 43 deletions(-) diff --git a/catalystwan/api/templates/models/cisco_aaa_model.py b/catalystwan/api/templates/models/cisco_aaa_model.py index 0a794f24..18e73182 100644 --- a/catalystwan/api/templates/models/cisco_aaa_model.py +++ b/catalystwan/api/templates/models/cisco_aaa_model.py @@ -18,7 +18,7 @@ class User(FeatureTemplateValidator): class RadiusServer(FeatureTemplateValidator): - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, coerce_numbers_to_str=True) address: str auth_port: int = Field(default=1812, json_schema_extra={"vmanage_key": "auth-port"}) diff --git a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py index 4384148f..71b01362 100644 --- a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py +++ b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py @@ -81,9 +81,9 @@ class Interface(FeatureTemplateValidator): description: Optional[str] = None unnumbered: bool = True address: Optional[ipaddress.IPv4Interface] = None - tunnel_source: ipaddress.IPv4Address = Field(json_schema_extra={"vmanage_key": "tunnel-source"}) - tunnel_source_interface: str = Field(json_schema_extra={"vmanage_key": "tunnel-source-interface"}) - tunnel_route_via: str = Field(json_schema_extra={"vmanage_key": "tunnel-route-via"}) + tunnel_source: Optional[ipaddress.IPv4Address] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-source"}) + tunnel_source_interface: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-source-interface"}) + tunnel_route_via: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-route-via"}) tunnel_destination: str = Field(json_schema_extra={"vmanage_key": "tunnel-destination"}) application: Application = Application.SIG tunnel_set: TunnelSet = Field( @@ -157,7 +157,7 @@ class RefreshTimeUnit(str, Enum): class Service(FeatureTemplateValidator): svc_type: SvcType = Field(SvcType.SIG, json_schema_extra={"vmanage_key": "svc-type"}) - interface_pair: List[InterfacePair] = Field(json_schema_extra={"vmanage_key": "interface-pair"}) + interface_pair: List[InterfacePair] = Field(json_schema_extra={"data_path": ["ha-pairs"], "vmanage_key": "interface-pair"}) auth_required: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "auth-required"}) xff_forward_enabled: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "xff-forward-enabled"}) ofw_enabled: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "ofw-enabled"}) @@ -177,12 +177,12 @@ class Service(FeatureTemplateValidator): refresh_time_unit: Optional[RefreshTimeUnit] = Field( RefreshTimeUnit.MINUTE, json_schema_extra={"vmanage_key": "refresh-time-unit"} ) - enabled: Optional[bool] + enabled: Optional[bool] = None block_internet_until_accepted: Optional[bool] = Field( False, json_schema_extra={"vmanage_key": "block-internet-until-accepted"} ) force_ssl_inspection: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "force-ssl-inspection"}) - timeout: Optional[int] + timeout: Optional[int] = None data_center_primary: Optional[str] = Field("Auto", json_schema_extra={"vmanage_key": "data-center-primary"}) data_center_secondary: Optional[str] = Field("Auto", json_schema_extra={"vmanage_key": "data-center-secondary"}) model_config = ConfigDict(populate_by_name=True) @@ -208,7 +208,7 @@ class CiscoSecureInternetGatewayModel(FeatureTemplate): vpn_id: int = Field(DEFAULT_SIG_VPN_ID, json_schema_extra={"vmanage_key": "vpn-id"}) interface: List[Interface] service: List[Service] - tracker_src_ip: ipaddress.IPv4Interface = Field(json_schema_extra={"vmanage_key": "tracker-src-ip"}) + tracker_src_ip: Optional[ipaddress.IPv4Interface] = Field(default=None, json_schema_extra={"vmanage_key": "tracker-src-ip"}) tracker: Optional[List[Tracker]] = None payload_path: ClassVar[Path] = Path(__file__).parent / "DEPRECATED" diff --git a/catalystwan/api/templates/models/cisco_system.py b/catalystwan/api/templates/models/cisco_system.py index 0d2dd628..6532cc5c 100644 --- a/catalystwan/api/templates/models/cisco_system.py +++ b/catalystwan/api/templates/models/cisco_system.py @@ -54,15 +54,15 @@ class Type(str, Enum): class Tracker(FeatureTemplateValidator): name: str - endpoint_ip: str = Field(json_schema_extra={"vmanage_key": "endpoint-ip"}) - endpoint_ip_transport_port: str = Field( - json_schema_extra={"vmanage_key": "endpoint-ip", "data_path": ["endpoint-ip-transport-port"]} - ) - protocol: Protocol = Field(json_schema_extra={"data_path": ["endpoint-ip-transport-port"]}) - port: int = Field(json_schema_extra={"data_path": ["endpoint-ip-transport-port"]}) - endpoint_dns_name: str = Field(json_schema_extra={"vmanage_key": "endpoint-dns-name"}) - endpoint_api_url: str = Field(json_schema_extra={"vmanage_key": "endpoint-api-url"}) - elements: List[str] + endpoint_ip: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "endpoint-ip"}) + endpoint_ip_transport_port: Optional[str] = Field( + default=None, json_schema_extra={"vmanage_key": "endpoint-ip", "data_path": ["endpoint-ip-transport-port"]} + ) + protocol: Optional[Protocol] = Field(default=None, json_schema_extra={"data_path": ["endpoint-ip-transport-port"]}) + port: Optional[int] = Field(default=None, json_schema_extra={"data_path": ["endpoint-ip-transport-port"]}) + endpoint_dns_name: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "endpoint-dns-name"}) + endpoint_api_url: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "endpoint-api-url"}) + elements: Optional[List[str]] = None boolean: Optional[Boolean] = Boolean.OR threshold: Optional[int] = 300 interval: Optional[int] = 60 diff --git a/catalystwan/api/templates/models/cisco_vpn_interface_model.py b/catalystwan/api/templates/models/cisco_vpn_interface_model.py index 09af6a56..66b82d12 100644 --- a/catalystwan/api/templates/models/cisco_vpn_interface_model.py +++ b/catalystwan/api/templates/models/cisco_vpn_interface_model.py @@ -238,7 +238,7 @@ class Ipv6Vrrp(FeatureTemplateValidator): class CiscoVpnInterfaceModel(FeatureTemplate): model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - if_name: str = Field(json_schema_extra={"vmanage_key": "if-name"}) + if_name: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "if-name"}) interface_description: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "description"}) poe: Optional[BoolStr] = None ipv4_address: Optional[str] = Field(default=None, json_schema_extra={"data_path": ["ip"], "vmanage_key": "address"}) diff --git a/catalystwan/api/templates/models/cisco_vpn_model.py b/catalystwan/api/templates/models/cisco_vpn_model.py index 04c5d66b..a07741d5 100644 --- a/catalystwan/api/templates/models/cisco_vpn_model.py +++ b/catalystwan/api/templates/models/cisco_vpn_model.py @@ -15,13 +15,13 @@ class Role(str, Enum): class Dns(FeatureTemplateValidator): - dns_addr: str = Field(json_schema_extra={"vmanage_key": "dns-addr"}) + dns_addr: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "dns-addr"}) role: Role = Role.PRIMARY model_config = ConfigDict(populate_by_name=True) class DnsIpv6(FeatureTemplateValidator): - dns_addr: str = Field(json_schema_extra={"vmanage_key": "dns-addr"}) + dns_addr: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "dns-addr"}) role: Optional[Role] = Role.PRIMARY model_config = ConfigDict(populate_by_name=True) @@ -45,8 +45,8 @@ class SvcType(str, Enum): class Service(FeatureTemplateValidator): svc_type: SvcType = Field(json_schema_extra={"vmanage_key": "svc-type"}) - address: List[str] - interface: str + address: Optional[List[str]] = None + interface: Optional[str] = None track_enable: bool = Field(True, json_schema_extra={"vmanage_key": "track-enable"}) model_config = ConfigDict(populate_by_name=True) @@ -67,18 +67,18 @@ class ServiceRoute(FeatureTemplateValidator): class NextHop(FeatureTemplateValidator): - address: str + address: Optional[str] = None distance: Optional[int] = 1 class NextHopWithTrack(FeatureTemplateValidator): - address: str + address: Optional[str] = None distance: Optional[int] = 1 tracker: str class Routev4(FeatureTemplateValidator): - prefix: str + prefix: Optional[str] = None next_hop: Optional[List[NextHop]] = Field( default=None, json_schema_extra={"vmanage_key": "next-hop", "priority_order": ["address", "distance"]} ) @@ -217,9 +217,9 @@ class Overload(str, Enum): class Natpool(FeatureTemplateValidator): name: int - prefix_length: int = Field(json_schema_extra={"vmanage_key": "prefix-length"}) - range_start: str = Field(json_schema_extra={"vmanage_key": "range-start"}) - range_end: str = Field(json_schema_extra={"vmanage_key": "range-end"}) + prefix_length: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "prefix-length"}) + range_start: str = Field(default=None, json_schema_extra={"vmanage_key": "range-start"}) + range_end: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "range-end"}) overload: Overload = Overload.TRUE direction: Direction tracker_id: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "tracker-id"}) @@ -232,9 +232,9 @@ class StaticNatDirection(str, Enum): class Static(FeatureTemplateValidator): - pool_name: Optional[int] = Field(json_schema_extra={"vmanage_key": "pool-name"}) - source_ip: str = Field(json_schema_extra={"vmanage_key": "source-ip"}) - translate_ip: str = Field(json_schema_extra={"vmanage_key": "translate-ip"}) + pool_name: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "pool-name"}) + source_ip: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "source-ip"}) + translate_ip: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "translate-ip"}) static_nat_direction: StaticNatDirection = Field(json_schema_extra={"vmanage_key": "static-nat-direction"}) tracker_id: Optional[int] = Field(default=None, json_schema_extra={"vmanage_key": "tracker-id"}) model_config = ConfigDict(populate_by_name=True) diff --git a/catalystwan/integration_tests/test_find_template_values.py b/catalystwan/integration_tests/test_find_template_values.py index c69e7600..ead2692d 100644 --- a/catalystwan/integration_tests/test_find_template_values.py +++ b/catalystwan/integration_tests/test_find_template_values.py @@ -1,6 +1,10 @@ import os import unittest from typing import Any, List, cast +import json + +from pydantic import ValidationError +from catalystwan.exceptions import TemplateTypeError from catalystwan.session import create_manager_session from catalystwan.utils.feature_template.find_template_values import find_template_values @@ -14,11 +18,11 @@ def setUp(self) -> None: username=cast(str, os.environ.get("TEST_VMANAGE_USERNAME")), password=cast(str, os.environ.get("TEST_VMANAGE_PASSWORD")), ) - self.templates = self.session.api.templates._get_feature_templates() - + self.templates = self.session.api.templates._get_feature_templates(summary=False) + def test_find_template_value(self): for template in self.templates: - definition = template.template_definiton + definition = json.loads(template.template_definiton) with self.subTest(template_name=template.name): parsed_values = find_template_values(definition) self.assertFalse( @@ -37,8 +41,9 @@ def is_key_present(self, d: dict, keys: List[Any]): return True if isinstance(value, list): for v in value: - if self.is_key_present(v, keys): - return True + if isinstance(v, dict): + if self.is_key_present(v, keys): + return True return False def tearDown(self) -> None: diff --git a/catalystwan/utils/feature_template/find_template_values.py b/catalystwan/utils/feature_template/find_template_values.py index 16514a82..d281859e 100644 --- a/catalystwan/utils/feature_template/find_template_values.py +++ b/catalystwan/utils/feature_template/find_template_values.py @@ -31,7 +31,6 @@ def find_template_values( path = [] if templated_values is None: templated_values = {} - # if value object is reached, try to extract the value if target_key in template_definition: if template_definition[target_key] == target_key_value_to_ignore: @@ -42,8 +41,23 @@ def find_template_values( field_key = path[-1] # TODO: Handle nested DeviceVariable - if value == "variableName" and (device_specific_variables is not None): - device_specific_variables[field_key] = DeviceVariable(name=template_definition["vipVariableName"]) + if value == "variableName": + if device_specific_variables is not None: + device_specific_variables[field_key] = DeviceVariable(name=template_definition["vipVariableName"]) + elif template_definition["vipType"] == "variable": + if device_specific_variables is not None and template_value: + device_specific_variables[field_key] = DeviceVariable(name=template_value) + elif template_definition["vipObjectType"] == "list": + current_nesting = get_nested_dict(templated_values, path[:-1]) + current_nesting[field_key] = [] + for item in template_value: + if isinstance(item, dict): + if target_key in item: + current_nesting[field_key].append(item[target_key_for_template_value]) + else: + current_nesting[field_key].append(find_template_values(item, device_specific_variables)) + else: + current_nesting[field_key].append(item) elif template_definition["vipObjectType"] != "tree": current_nesting = get_nested_dict(templated_values, path[:-1]) current_nesting[field_key] = template_value @@ -55,10 +69,9 @@ def find_template_values( current_nesting = get_nested_dict(templated_values, path[:-1]) current_nesting[field_key] = [] for item in template_value: - current_nesting[field_key].append( - find_template_values(item, {}, device_specific_variables=device_specific_variables) - ) - + item_value = find_template_values(item, {}, device_specific_variables=device_specific_variables) + if item_value: + current_nesting[field_key].append(item_value) return templated_values # iterate the dict to extract values and assign them to their fields diff --git a/pyproject.toml b/pyproject.toml index 9e4da60f..adb2341e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ typing-extensions = "^4.6.1" parameterized = "^0.8.1" pytest = "^7.1.2" pytest-mock = "^3.7.0" +pytest-subtests = "^0.11.0" isort = "^5.10.1" pre-commit = "^2.19.0" mypy = "^1.0.0" From 780fd8e012a69eb215200817e9d649fb2e97a808 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Wed, 6 Mar 2024 12:51:06 +0100 Subject: [PATCH 7/8] Pre-commit changes --- .../models/cisco_secure_internet_gateway.py | 16 ++++++++++++---- .../test_find_template_values.py | 7 ++----- .../feature_template/find_template_values.py | 2 +- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py index 71b01362..b7bf6e20 100644 --- a/catalystwan/api/templates/models/cisco_secure_internet_gateway.py +++ b/catalystwan/api/templates/models/cisco_secure_internet_gateway.py @@ -81,8 +81,12 @@ class Interface(FeatureTemplateValidator): description: Optional[str] = None unnumbered: bool = True address: Optional[ipaddress.IPv4Interface] = None - tunnel_source: Optional[ipaddress.IPv4Address] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-source"}) - tunnel_source_interface: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-source-interface"}) + tunnel_source: Optional[ipaddress.IPv4Address] = Field( + default=None, json_schema_extra={"vmanage_key": "tunnel-source"} + ) + tunnel_source_interface: Optional[str] = Field( + default=None, json_schema_extra={"vmanage_key": "tunnel-source-interface"} + ) tunnel_route_via: Optional[str] = Field(default=None, json_schema_extra={"vmanage_key": "tunnel-route-via"}) tunnel_destination: str = Field(json_schema_extra={"vmanage_key": "tunnel-destination"}) application: Application = Application.SIG @@ -157,7 +161,9 @@ class RefreshTimeUnit(str, Enum): class Service(FeatureTemplateValidator): svc_type: SvcType = Field(SvcType.SIG, json_schema_extra={"vmanage_key": "svc-type"}) - interface_pair: List[InterfacePair] = Field(json_schema_extra={"data_path": ["ha-pairs"], "vmanage_key": "interface-pair"}) + interface_pair: List[InterfacePair] = Field( + json_schema_extra={"data_path": ["ha-pairs"], "vmanage_key": "interface-pair"} + ) auth_required: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "auth-required"}) xff_forward_enabled: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "xff-forward-enabled"}) ofw_enabled: Optional[bool] = Field(False, json_schema_extra={"vmanage_key": "ofw-enabled"}) @@ -208,7 +214,9 @@ class CiscoSecureInternetGatewayModel(FeatureTemplate): vpn_id: int = Field(DEFAULT_SIG_VPN_ID, json_schema_extra={"vmanage_key": "vpn-id"}) interface: List[Interface] service: List[Service] - tracker_src_ip: Optional[ipaddress.IPv4Interface] = Field(default=None, json_schema_extra={"vmanage_key": "tracker-src-ip"}) + tracker_src_ip: Optional[ipaddress.IPv4Interface] = Field( + default=None, json_schema_extra={"vmanage_key": "tracker-src-ip"} + ) tracker: Optional[List[Tracker]] = None payload_path: ClassVar[Path] = Path(__file__).parent / "DEPRECATED" diff --git a/catalystwan/integration_tests/test_find_template_values.py b/catalystwan/integration_tests/test_find_template_values.py index ead2692d..b650ecd1 100644 --- a/catalystwan/integration_tests/test_find_template_values.py +++ b/catalystwan/integration_tests/test_find_template_values.py @@ -1,10 +1,7 @@ +import json import os import unittest from typing import Any, List, cast -import json - -from pydantic import ValidationError -from catalystwan.exceptions import TemplateTypeError from catalystwan.session import create_manager_session from catalystwan.utils.feature_template.find_template_values import find_template_values @@ -19,7 +16,7 @@ def setUp(self) -> None: password=cast(str, os.environ.get("TEST_VMANAGE_PASSWORD")), ) self.templates = self.session.api.templates._get_feature_templates(summary=False) - + def test_find_template_value(self): for template in self.templates: definition = json.loads(template.template_definiton) diff --git a/catalystwan/utils/feature_template/find_template_values.py b/catalystwan/utils/feature_template/find_template_values.py index d281859e..7df7bea6 100644 --- a/catalystwan/utils/feature_template/find_template_values.py +++ b/catalystwan/utils/feature_template/find_template_values.py @@ -57,7 +57,7 @@ def find_template_values( else: current_nesting[field_key].append(find_template_values(item, device_specific_variables)) else: - current_nesting[field_key].append(item) + current_nesting[field_key].append(item) elif template_definition["vipObjectType"] != "tree": current_nesting = get_nested_dict(templated_values, path[:-1]) current_nesting[field_key] = template_value From 062d3d947f61c3937fb33ab2ce6cd65835b8abf2 Mon Sep 17 00:00:00 2001 From: PrzeG Date: Wed, 6 Mar 2024 14:33:05 +0100 Subject: [PATCH 8/8] Add support for deeper list nesting --- .../feature_template/find_template_values.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/catalystwan/utils/feature_template/find_template_values.py b/catalystwan/utils/feature_template/find_template_values.py index 7df7bea6..51b39c31 100644 --- a/catalystwan/utils/feature_template/find_template_values.py +++ b/catalystwan/utils/feature_template/find_template_values.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from catalystwan.api.templates.device_variable import DeviceVariable @@ -51,13 +51,7 @@ def find_template_values( current_nesting = get_nested_dict(templated_values, path[:-1]) current_nesting[field_key] = [] for item in template_value: - if isinstance(item, dict): - if target_key in item: - current_nesting[field_key].append(item[target_key_for_template_value]) - else: - current_nesting[field_key].append(find_template_values(item, device_specific_variables)) - else: - current_nesting[field_key].append(item) + current_nesting[field_key].append(process_list_value(item)) elif template_definition["vipObjectType"] != "tree": current_nesting = get_nested_dict(templated_values, path[:-1]) current_nesting[field_key] = template_value @@ -90,3 +84,21 @@ def get_nested_dict(d: dict, path: List[str], populate: bool = True): current_dict[path_key] = {} current_dict = current_dict[path_key] return current_dict + + +def process_list_value(item: Any, target_key: str = "vipType", target_key_for_template_value: str = "vipValue"): + if isinstance(item, dict): + if target_key in item: + if item["vipObjectType"] == "list": + result = [] + for nested_item in item[target_key_for_template_value]: + result.append(process_list_value(nested_item)) + return result + elif item["vipObjectType"] == "tree": + return find_template_values(item[target_key_for_template_value]) + else: + return item[target_key_for_template_value] + else: + return find_template_values(item) + else: + return item