diff --git a/catalystwan/api/configuration_groups/parcel.py b/catalystwan/api/configuration_groups/parcel.py index a48a2c6e6..792088226 100644 --- a/catalystwan/api/configuration_groups/parcel.py +++ b/catalystwan/api/configuration_groups/parcel.py @@ -21,10 +21,10 @@ class _ParcelBase(BaseModel): validation_alias="description", description="Set the parcel description", ) - data: Optional[Any] = None + # data: Optional[Any] = None _parcel_data_key: str = PrivateAttr(default="data") - @model_serializer(mode="wrap") + @model_serializer(mode="wrap", when_used="json") def envelope_parcel_data(self, handler) -> Dict[str, Any]: model_dict = handler(self) model_dict[self._parcel_data_key] = {} @@ -49,7 +49,7 @@ class OptionType(str, Enum): class ParcelAttribute(BaseModel): - model_config = ConfigDict(extra="forbid") + model_config = ConfigDict(extra="forbid", populate_by_name=True) option_type: OptionType = Field(serialization_alias="optionType", validation_alias="optionType") diff --git a/catalystwan/api/policy_api.py b/catalystwan/api/policy_api.py index bb73b2931..ec32519bb 100644 --- a/catalystwan/api/policy_api.py +++ b/catalystwan/api/policy_api.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Type, overload +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Type, overload from uuid import UUID from catalystwan.api.task_status_api import Task @@ -637,6 +637,12 @@ def get(self, type: Type[AnyPolicyList], id: Optional[UUID] = None) -> Any: return endpoints.get_lists_by_id(id=id) return endpoints.get_policy_lists() + def get_all(self) -> List[AnyPolicyList]: + infos: List[AnyPolicyList] = [] + for list_type, _ in POLICY_LIST_ENDPOINTS_MAP.items(): + infos.extend(self.get(list_type)) + return infos + class PolicyDefinitionsAPI: def __init__(self, session: ManagerSession): @@ -780,6 +786,12 @@ def get(self, type: Type[AnyPolicyDefinition], id: Optional[UUID] = None) -> Any return endpoints.get_policy_definition(id=id) return endpoints.get_definitions() + def get_all(self) -> List[Tuple[type, PolicyDefinitionInfo]]: + all_items: List[Tuple[type, PolicyDefinitionInfo]] = [] + for definition_type, _ in POLICY_DEFINITION_ENDPOINTS_MAP.items(): + all_items.extend([(definition_type, info) for info in self.get(definition_type)]) + return all_items + class PolicyAPI: """This is exposing so called 'UX 1.0' API""" diff --git a/catalystwan/api/template_api.py b/catalystwan/api/template_api.py index 4fa20e0ea..0517977dd 100644 --- a/catalystwan/api/template_api.py +++ b/catalystwan/api/template_api.py @@ -1,11 +1,13 @@ from __future__ import annotations +import datetime as dt import json import logging from enum import Enum -from typing import TYPE_CHECKING, Any, Optional, Type, overload +from typing import TYPE_CHECKING, Any, List, Optional, Type, overload from ciscoconfparse import CiscoConfParse # type: ignore +from pydantic import BaseModel, ConfigDict, Field from catalystwan.api.task_status_api import Task from catalystwan.api.templates.cli_template import CLITemplate @@ -67,6 +69,41 @@ class DeviceTemplateFeature(Enum): ALL = "all" +class TemplateInformation(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + last_updated_by: str = Field(serialization_alias="lastUpdatedBy", validation_alias="lastUpdatedBy") + id: str = Field(serialization_alias="templateId", validation_alias="templateId") + factory_default: bool = Field(serialization_alias="factoryDefault", validation_alias="factoryDefault") + name: str = Field(serialization_alias="templateName", validation_alias="templateName") + devices_attached: int = Field(serialization_alias="devicesAttached", validation_alias="devicesAttached") + description: str = Field(serialization_alias="templateDescription", validation_alias="templateDescription") + last_updated_on: dt.datetime = Field(serialization_alias="lastUpdatedOn", validation_alias="lastUpdatedOn") + resource_group: Optional[str] = Field(None, serialization_alias="resourceGroup", validation_alias="resourceGroup") + + +class FeatureTemplateInformation(TemplateInformation): + model_config = ConfigDict(populate_by_name=True) + + template_type: str = Field(serialization_alias="templateType", validation_alias="templateType") + device_type: List[str] = Field(serialization_alias="deviceType", validation_alias="deviceType") + version: str = Field(serialization_alias="templateMinVersion", validation_alias="templateMinVersion") + template_definiton: Optional[str] = Field( + None, serialization_alias="templateDefinition", validation_alias="templateDefinition" + ) + + +class DeviceTemplateInformation(TemplateInformation): + model_config = ConfigDict(populate_by_name=True) + + device_type: str = Field(serialization_alias="deviceType", validation_alias="deviceType") + template_class: str = Field(serialization_alias="templateClass", validation_alias="templateClass") + config_type: str = Field(serialization_alias="configType", validation_alias="configType") + template_attached: int = Field(serialization_alias="templateAttached", validation_alias="templateAttached") + draft_mode: Optional[str] = Field(None, serialization_alias="draftMode", validation_alias="draftMode") + device_role: Optional[str] = Field(None, serialization_alias="deviceRole", validation_alias="deviceRole") + + class TemplatesAPI: def __init__(self, session: ManagerSession) -> None: self.session = session @@ -691,3 +728,14 @@ def load_running(self, device: Device) -> CiscoConfParse: config = CiscoConfParse(response["config"].splitlines()) logger.debug(f"Template loaded from {device.hostname}.") return config + + def get_feature_templates(self) -> DataSequence[FeatureTemplateInformation]: + endpoint = "/dataservice/template/feature" + fr_templates = self.session.get(endpoint) + return fr_templates.dataseq(FeatureTemplateInformation) + + def get_device_templates(self) -> DataSequence[DeviceTemplateInformation]: + endpoint = "/dataservice/template/device" + params = {"feature": "all"} + templates = self.session.get(url=endpoint, params=params) + return templates.dataseq(DeviceTemplateInformation) diff --git a/catalystwan/endpoints/configuration/feature_profile/sdwan/system.py b/catalystwan/endpoints/configuration/feature_profile/sdwan/system.py index 3ca5c33f4..ed788b40f 100644 --- a/catalystwan/endpoints/configuration/feature_profile/sdwan/system.py +++ b/catalystwan/endpoints/configuration/feature_profile/sdwan/system.py @@ -44,6 +44,11 @@ def delete_sdwan_system_feature_profile(self, system_id: str) -> None: def create_aaa_profile_parcel_for_system(self, system_id: str, payload: _ParcelBase) -> ParcelId: ... + @versions(supported_versions=(">=20.9"), raises=False) + @post("/v1/feature-profile/sdwan/system/{system_id}/bfd") + def create_bfd_profile_parcel_for_system(self, system_id: str, payload: _ParcelBase) -> ParcelId: + ... + @versions(supported_versions=(">=20.9"), raises=False) @put("/v1/feature-profile/sdwan/system/{system_id}/aaa/{parcel_id}") def edit_aaa_profile_parcel_for_system(self, system_id: str, parcel_id: str, payload: _ParcelBase) -> ParcelId: diff --git a/catalystwan/endpoints/configuration_group.py b/catalystwan/endpoints/configuration_group.py index 74c2629b7..75412b16c 100644 --- a/catalystwan/endpoints/configuration_group.py +++ b/catalystwan/endpoints/configuration_group.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List, Optional -from pydantic.v1 import BaseModel, Field +from pydantic import BaseModel, Field from catalystwan.endpoints import APIEndpoints, delete, get, post, put, versions from catalystwan.models.configuration.common import Solution diff --git a/catalystwan/models/configuration/config_migration.py b/catalystwan/models/configuration/config_migration.py index 1d94fb157..d29db50a2 100644 --- a/catalystwan/models/configuration/config_migration.py +++ b/catalystwan/models/configuration/config_migration.py @@ -1,7 +1,11 @@ from typing import List -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field +from catalystwan.api.configuration_groups.parcel import _ParcelBase +from catalystwan.api.template_api import DeviceTemplateInformation, FeatureTemplateInformation +from catalystwan.endpoints.configuration_group import ConfigGroup +from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload from catalystwan.models.policy import ( AnyPolicyDefinition, AnyPolicyList, @@ -12,23 +16,48 @@ class UX1Policies(BaseModel): - centralized_policies: List[CentralizedPolicy] = Field(default=[], serialization_alias="centralizedPolicies") - localized_policies: List[LocalizedPolicy] = Field(default=[], serialization_alias="localizedPolicies") - security_policies: List[SecurityPolicy] = Field(default=[], serialization_alias="securityPolicies") - policy_definitions: List[AnyPolicyDefinition] = Field(default=[], serialization_alias="policyDefinitions") - policy_lists: List[AnyPolicyList] = Field(default=[], serialization_alias="policyLists") + model_config = ConfigDict(populate_by_name=True) + centralized_policies: List[CentralizedPolicy] = Field( + default=[], serialization_alias="centralizedPolicies", validation_alias="centralizedPolicies" + ) + localized_policies: List[LocalizedPolicy] = Field( + default=[], serialization_alias="localizedPolicies", validation_alias="localizedPolicies" + ) + security_policies: List[SecurityPolicy] = Field( + default=[], serialization_alias="securityPolicies", validation_alias="securityPolicies" + ) + policy_definitions: List[AnyPolicyDefinition] = Field( + default=[], serialization_alias="policyDefinitions", validation_alias="policyDefinitions" + ) + policy_lists: List[AnyPolicyList] = Field( + default=[], serialization_alias="policyLists", validation_alias="policyLists" + ) class UX1Templates(BaseModel): - pass + features: List[FeatureTemplateInformation] = Field(default=[]) + devices: List[DeviceTemplateInformation] = Field(default=[]) class UX1Config(BaseModel): # All UX1 Configuration items - Mega Model - policies: UX1Policies - templates: UX1Templates + model_config = ConfigDict(populate_by_name=True) + policies: UX1Policies = UX1Policies() + templates: UX1Templates = UX1Templates() class UX2Config(BaseModel): # All UX2 Configuration items - Mega Model - pass + model_config = ConfigDict(populate_by_name=True) + config_groups: List[ConfigGroup] = Field( + default=[], serialization_alias="configurationGroups", validation_alias="configurationGroups" + ) + policy_groups: List[ConfigGroup] = Field( + default=[], serialization_alias="policyGroups", validation_alias="policyGroups" + ) + feature_profiles: List[FeatureProfileCreationPayload] = Field( + default=[], serialization_alias="featureProfiles", validation_alias="featureProfiles" + ) + profile_parcels: List[_ParcelBase] = Field( + default=[], serialization_alias="profileParcels", validation_alias="profileParcels" + ) diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/__init__.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/__init__.py new file mode 100644 index 000000000..c241269e4 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/__init__.py @@ -0,0 +1,58 @@ +from typing import Any, Dict, List + +from catalystwan.api.template_api import FeatureTemplateInformation +from catalystwan.models.configuration.feature_profile.sdwan.system import AnySystemParcel + +from .aaa import AAATemplateConverter +from .base import FeatureTemplateConverter +from .bfd import BFDTemplateConverter +from .normalizator import template_definition_normalization + +supported_parcel_converters: Dict[Any, FeatureTemplateConverter] = { + ("cisco_aaa", "cedge_aaa"): AAATemplateConverter, # type: ignore[dict-item] + ("cisco_bfd",): BFDTemplateConverter, # type: ignore[dict-item] +} + + +def choose_parcel_converter(template_type: str) -> FeatureTemplateConverter: + """ + This function is used to choose the correct parcel factory based on the template type. + + Args: + template_type (str): The template type used to determine the correct factory. + + Returns: + BaseFactory: The chosen parcel factory. + + Raises: + ValueError: If the template type is not supported. + """ + for key in supported_parcel_converters.keys(): + if template_type in key: + return supported_parcel_converters[key] + raise ValueError(f"Template type {template_type} not supported") + + +def create_parcel_from_template(template: FeatureTemplateInformation) -> AnySystemParcel: + """ + Creates a new instance of a _ParcelBase based on the given template. + + Args: + template (FeatureTemplateInformation): The template to use for creating the _ParcelBase instance. + + Returns: + _ParcelBase: The created _ParcelBase instance. + + Raises: + ValueError: If the given template type is not supported. + """ + converter = choose_parcel_converter(template.template_type) + template_values = template_definition_normalization(template.template_definiton) + return converter.create_parcel(template.name, template.description, template_values) + + +__all__ = ["create_parcel_from_template"] + + +def __dir__() -> "List[str]": + return list(__all__) diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/aaa.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/aaa.py new file mode 100644 index 000000000..cf25340a1 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/aaa.py @@ -0,0 +1,30 @@ +from catalystwan.models.configuration.feature_profile.sdwan.system import AAA + + +class AAATemplateConverter: + @staticmethod + def create_parcel(name: str, description: str, template_values: dict) -> AAA: + """ + Creates an AAA object based on the provided template values. + + Returns: + AAA: An AAA object with the provided template values. + """ + template_values["name"] = name + template_values["description"] = description + + delete_properties = ( + "radius_client", + "radius_trustsec_group", + "rda_server_key", + "domain_stripping", + "auth_type", + "port", + "cts_auth_list", + ) + + for prop in delete_properties: + if template_values.get(prop) is not None: + del template_values[prop] + + return AAA(**template_values) diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/base.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/base.py new file mode 100644 index 000000000..c3144e720 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/base.py @@ -0,0 +1,9 @@ +from typing_extensions import Protocol + +from catalystwan.models.configuration.feature_profile.sdwan.system import AnySystemParcel + + +class FeatureTemplateConverter(Protocol): + @staticmethod + def create_parcel(name: str, description: str, template_values: dict) -> AnySystemParcel: + ... diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/bfd.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/bfd.py new file mode 100644 index 000000000..d4c0bc6a1 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/bfd.py @@ -0,0 +1,20 @@ +from catalystwan.models.configuration.feature_profile.sdwan.system import BFD + + +class BFDTemplateConverter: + @staticmethod + def create_parcel(name: str, description: str, template_values: dict) -> BFD: + """ + Creates an BFD object based on the provided template values. + + Returns: + BFD: An BFD object with the provided template values. + """ + template_values["name"] = name + template_values["description"] = description + + if template_values.get("color") is not None: + template_values["colors"] = template_values["color"] + del template_values["color"] + + return BFD(**template_values) diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/logging.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/logging.py new file mode 100644 index 000000000..4eaf95ffc --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/logging.py @@ -0,0 +1,27 @@ +# from catalystwan.models.configuration.feature_profile.sdwan.system import Logging + +# class LoggingTemplateConverter: + +# @staticmethod +# def create_parcel(name, description, template_values: dict): +# """ +# Creates an Logging object based on the provided template values. + +# Returns: +# Logging: An Logging object with the provided template values. +# """ +# template_values["name"] = name +# template_values["description"] = description + +# template_values["disk"] = { +# "disk_enable": template_values["enable"], +# "file": { +# "disk_file_size": template_values["size"], +# "disk_file_rotate": template_values["rotate"] +# } +# } +# del template_values["enable"] +# del template_values["size"] +# del template_values["rotate"] + +# return Logging(**template_values) diff --git a/catalystwan/models/configuration/feature_profile/converters/feature_template/normalizator.py b/catalystwan/models/configuration/feature_profile/converters/feature_template/normalizator.py new file mode 100644 index 000000000..21e3bf734 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/feature_template/normalizator.py @@ -0,0 +1,80 @@ +import json +from typing import cast + +from catalystwan.api.configuration_groups.parcel import as_global +from catalystwan.utils.feature_template import find_template_values + + +def template_definition_normalization(template_definition): + """ + Normalizes a template definition by changing keys to snake_case and casting all leafs values to global types. + + Args: + template_definition (str): The template definition in JSON format. + + Returns: + dict: The normalized template values. + + """ + + def to_snake_case(s: str): + """ + Converts a string from kebab-case to snake_case. + + Args: + s (str): The string to be converted. + + Returns: + str: The converted string. + + """ + if "-" in s: + temp = s.split("-") + return "_".join(ele for ele in temp) + return s + + def transform_dict(d): + """ + Transforms a nested dictionary into a normalized form. + + Args: + d (dict): The nested dictionary to be transformed. + + Returns: + dict: The transformed dictionary. + + """ + if isinstance(d, list): + return [transform_dict(i) if isinstance(i, (dict, list)) else i for i in d] + return {to_snake_case(a): transform_dict(b) if isinstance(b, (dict, list)) else b for a, b in d.items()} + + def cast_leafs_to_global(node: dict): + """ + Recursively casts all leaf values in a nested dictionary or list to the global configuration type. + + Args: + node (dict): The nested dictionary or list to be processed. + + Returns: + None + + """ + for key, item in node.items(): + if isinstance(item, dict): + cast_leafs_to_global(item) + elif isinstance(item, list): + for i in item: + if isinstance(i, dict): + cast_leafs_to_global(i) + else: + node[key] = as_global(item) + + template_definition_as_dict = json.loads(cast(str, template_definition)) + + template_values = find_template_values(template_definition_as_dict) + + template_values = transform_dict(template_values) + + cast_leafs_to_global(template_values) + + return template_values diff --git a/catalystwan/models/configuration/feature_profile/converters/recast.py b/catalystwan/models/configuration/feature_profile/converters/recast.py new file mode 100644 index 000000000..8ccc7b0c0 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/converters/recast.py @@ -0,0 +1,59 @@ +from ipaddress import AddressValueError, IPv4Address, IPv6Address +from typing import List, Union + +from pydantic import BeforeValidator +from typing_extensions import Annotated + +from catalystwan.api.configuration_groups.parcel import Global, Variable +from catalystwan.models.common import TLOCColor + + +def recast_as_global_bool(global_: Global[str]): + value = global_.value + if value == "true": + return Global[bool](value=True) + elif value == "false": + return Global[bool](value=False) + + +def recast_as_global_list_str(global_: Global[str]): + value = global_.value + return Global[List[str]](value=[v for v in value.split(",")]) + + +def recast_as_global_ipv6_ipv4(global_: Global[str]): + value = global_.value + try: + return Global[IPv4Address](value=IPv4Address(value)) + except AddressValueError: + pass + try: + return Global[IPv6Address](value=IPv6Address(value)) + except AddressValueError: + pass + return value + + +def recast_as_global_str(global_: Global[int]): + value = global_.value + return Global[str](value=str(value)) + + +def recast_as_variable(global_: Global[str]): + value = global_.value + return Variable(value=value) + + +def recast_as_global_color_literal(global_: Global[str]): + value = global_.value + return Global[TLOCColor](value=value) # type: ignore[arg-type] + + +DefaultGlobalBool = Annotated[Global[bool], BeforeValidator(recast_as_global_bool)] +DefaultGlobalList = Annotated[Global[List[str]], BeforeValidator(recast_as_global_list_str)] +DefaultGlobalIPAddress = Annotated[ + Union[Global[IPv4Address], Global[IPv6Address]], BeforeValidator(recast_as_global_ipv6_ipv4) +] +DefaultGlobalStr = Annotated[Global[str], BeforeValidator(recast_as_global_str)] +DefaultVariableStr = Annotated[Variable, BeforeValidator(recast_as_variable)] +DefaultGlobalColorLiteral = Annotated[Global[TLOCColor], BeforeValidator(recast_as_global_color_literal)] diff --git a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/data_prefix.py b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/data_prefix.py index b4d7721f7..b549a3316 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/data_prefix.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/policy_object/policy/data_prefix.py @@ -11,14 +11,16 @@ class DataPrefixEntry(BaseModel): ipv4_address: Global[IPv4Address] = Field(serialization_alias="ipv4Address", validation_alias="ipv4Address") ipv4_prefix_length: Global[int] = Field(serialization_alias="ipv4PrefixLength", validation_alias="ipv4PrefixLength") + @staticmethod + def from_ipv4_network(ipv4_network: IPv4Network) -> "DataPrefixEntry": + return DataPrefixEntry( + ipv4_address=as_global(ipv4_network.network_address), + ipv4_prefix_length=as_global(ipv4_network.prefixlen), + ) + class DataPrefixParcel(_ParcelBase): entries: List[DataPrefixEntry] = Field(default_factory=list, validation_alias=AliasPath("data", "entries")) def add_data_prefix(self, ipv4_network: IPv4Network): - self.entries.append( - DataPrefixEntry( - ipv4_address=as_global(ipv4_network.network_address), - ipv4_prefix_length=as_global(ipv4_network.prefixlen), - ) - ) + self.entries.append(DataPrefixEntry.from_ipv4_network(ipv4_network)) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/system/__init__.py b/catalystwan/models/configuration/feature_profile/sdwan/system/__init__.py new file mode 100644 index 000000000..f06f24ee0 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/sdwan/system/__init__.py @@ -0,0 +1,17 @@ +from typing import List, Mapping, Union + +from .aaa import AAA +from .bfd import BFD + +SYSTEM_PAYLOAD_ENDPOINT_MAPPING: Mapping[type, str] = { + AAA: "aaa", + BFD: "bfd", +} + +AnySystemParcel = Union[AAA, BFD] + +__all__ = ["AAA", "BFD", "AnySystemParcel", "SYSTEM_PAYLOAD_ENDPOINT_MAPPING"] + + +def __dir__() -> "List[str]": + return list(__all__) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/system/aaa.py b/catalystwan/models/configuration/feature_profile/sdwan/system/aaa.py index 7a1f09bed..c18dd2b80 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/system/aaa.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/system/aaa.py @@ -4,6 +4,12 @@ from pydantic import AliasPath, BaseModel, ConfigDict, Field from catalystwan.api.configuration_groups.parcel import Default, Global, Variable, _ParcelBase, as_default, as_global +from catalystwan.models.configuration.feature_profile.converters.recast import ( + DefaultGlobalBool, + DefaultGlobalIPAddress, + DefaultGlobalList, + DefaultGlobalStr, +) class PubkeyChainItem(BaseModel): @@ -27,7 +33,7 @@ class PubkeyChainItem(BaseModel): class UserItem(BaseModel): - model_config = ConfigDict(extra="forbid", populate_by_name=True) + model_config = ConfigDict(extra="ignore", populate_by_name=True) name: Union[Global[str], Variable] = Field(description="Set the username") password: Union[Global[str], Variable] = Field( @@ -61,7 +67,9 @@ def add_pubkey_chain_item(self, key: str) -> PubkeyChainItem: class RadiusServerItem(BaseModel): model_config = ConfigDict(extra="forbid", populate_by_name=True) - address: Union[Global[IPv4Address], Global[IPv6Address]] = Field(description="Set IP address of Radius server") + address: Union[DefaultGlobalIPAddress, Global[IPv4Address], Global[IPv6Address]] = Field( + description="Set IP address of Radius server" + ) auth_port: Union[Global[int], Default[int], Variable, None] = Field( default=as_default(1812), validation_alias="authPort", @@ -93,7 +101,7 @@ class RadiusServerItem(BaseModel): description="Set the TACACS server shared type 7 encrypted key", ) # Literal["6", "7"] - key_enum: Union[Global[str], Default[None], None] = Field( + key_enum: Union[DefaultGlobalStr, Global[str], Default[None], None] = Field( default=None, validation_alias="keyEnum", serialization_alias="keyEnum", @@ -110,7 +118,7 @@ class RadiusServerItem(BaseModel): class Radius(BaseModel): - model_config = ConfigDict(extra="forbid", populate_by_name=True) + model_config = ConfigDict(extra="ignore", populate_by_name=True) group_name: Global[str] = Field( validation_alias="groupName", serialization_alias="groupName", description="Set Radius server Group Name" ) @@ -156,7 +164,9 @@ def generate_radius_server( class TacacsServerItem(BaseModel): model_config = ConfigDict(extra="forbid", populate_by_name=True) - address: Union[Global[IPv4Address], Global[IPv6Address]] = Field(description="Set IP address of TACACS server") + address: Union[DefaultGlobalIPAddress, Global[IPv4Address], Global[IPv6Address]] = Field( + description="Set IP address of TACACS server" + ) port: Union[Variable, Global[int], Default[int], None] = Field(default=None, description="TACACS Port") timeout: Union[Variable, Global[int], Default[int], None] = Field( default=None, @@ -176,7 +186,7 @@ class TacacsServerItem(BaseModel): description="Set the TACACS server shared type 7 encrypted key", ) # Literal["6", "7"] - key_enum: Union[Global[str], Default[None], None] = Field( + key_enum: Union[DefaultGlobalStr, Global[str], Default[None], None] = Field( default=None, validation_alias="keyEnum", serialization_alias="keyEnum", @@ -231,13 +241,13 @@ class AccountingRuleItem(BaseModel): method: Global[str] = Field(description="Configure Accounting Method") # Literal['1', '15'] level: Union[Global[str], Default[None], None] = Field(None, description="Privilege level when method is commands") - start_stop: Union[Variable, Global[bool], Default[bool], None] = Field( + start_stop: Union[DefaultGlobalBool, Variable, Global[bool], Default[bool], None] = Field( default=None, validation_alias="startStop", serialization_alias="startStop", description="Record start and stop without waiting", ) - group: Global[List[str]] = Field(description="Use Server-group") + group: Union[DefaultGlobalList, Global[List[str]]] = Field(description="Use Server-group") class AuthorizationRuleItem(BaseModel): @@ -249,8 +259,8 @@ class AuthorizationRuleItem(BaseModel): method: Global[str] # Literal['1', '15'] level: Global[str] = Field(description="Privilege level when method is commands") - group: Global[List[str]] = Field(description="Use Server-group") - if_authenticated: Union[Global[bool], Default[bool], None] = Field( + group: Union[DefaultGlobalList, Global[List[str]]] = Field(description="Use Server-group") + if_authenticated: Union[DefaultGlobalBool, Global[bool], Default[bool], None] = Field( default=None, validation_alias="ifAuthenticated", serialization_alias="ifAuthenticated", @@ -259,24 +269,26 @@ class AuthorizationRuleItem(BaseModel): class AAA(_ParcelBase): - authentication_group: Union[Variable, Global[bool], Default[bool]] = Field( + authentication_group: Union[DefaultGlobalBool, Variable, Global[bool], Default[bool]] = Field( default=as_default(False), validation_alias=AliasPath("data", "authenticationGroup"), description="Authentication configurations parameters", ) - accounting_group: Union[Variable, Global[bool], Default[bool]] = Field( + accounting_group: Union[DefaultGlobalBool, Variable, Global[bool], Default[bool]] = Field( default=as_default(False), validation_alias=AliasPath("data", "accountingGroup"), description="Accounting configurations parameters", ) # local, radius, tacacs - server_auth_order: Global[List[str]] = Field( + server_auth_order: Union[DefaultGlobalList, Global[List[str]]] = Field( validation_alias=AliasPath("data", "serverAuthOrder"), min_length=1, max_length=4, description="ServerGroups priority order", ) - user: Optional[List[UserItem]] = Field(default=None, description="Create local login account", min_length=1) + user: Optional[List[UserItem]] = Field( + default=None, validation_alias=AliasPath("data", "user"), description="Create local login account", min_length=1 + ) radius: Optional[List[Radius]] = Field( default=None, validation_alias=AliasPath("data", "radius"), description="Configure the Radius serverGroup" ) @@ -286,12 +298,12 @@ class AAA(_ParcelBase): accounting_rule: Optional[List[AccountingRuleItem]] = Field( default=None, validation_alias=AliasPath("data", "accountingRule"), description="Configure the accounting rules" ) - authorization_console: Union[Variable, Global[bool], Default[bool]] = Field( + authorization_console: Union[DefaultGlobalBool, Variable, Global[bool], Default[bool]] = Field( default=as_default(False), validation_alias=AliasPath("data", "authorizationConsole"), description="For enabling console authorization", ) - authorization_config_commands: Union[Variable, Global[bool], Default[bool]] = Field( + authorization_config_commands: Union[DefaultGlobalBool, Variable, Global[bool], Default[bool]] = Field( default=as_default(False), validation_alias=AliasPath("data", "authorizationConfigCommands"), description="For configuration mode commands.", diff --git a/catalystwan/models/configuration/feature_profile/sdwan/system/bfd.py b/catalystwan/models/configuration/feature_profile/sdwan/system/bfd.py new file mode 100644 index 000000000..574a4ed53 --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/sdwan/system/bfd.py @@ -0,0 +1,44 @@ +from typing import List, Optional, Union + +from pydantic import AliasPath, BaseModel, ConfigDict, Field + +from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase, as_global +from catalystwan.models.common import TLOCColor +from catalystwan.models.configuration.feature_profile.converters.recast import ( + DefaultGlobalBool, + DefaultGlobalColorLiteral, +) + +DEFAULT_BFD_COLOR_MULTIPLIER = as_global(7) +DEFAULT_BFD_DSCP = as_global(48) +DEFAULT_BFD_HELLO_INTERVAL = as_global(1000) +DEFAULT_BFD_POLL_INTERVAL = as_global(600000) +DEFAULT_BFD_MULTIPLIER = as_global(6) + + +class Color(BaseModel): + color: Union[DefaultGlobalColorLiteral, Global[TLOCColor]] + hello_interval: Optional[Global[int]] = Field( + default=DEFAULT_BFD_HELLO_INTERVAL, validation_alias="helloInterval", serialization_alias="helloInterval" + ) + multiplier: Optional[Global[int]] = DEFAULT_BFD_COLOR_MULTIPLIER + pmtu_discovery: Optional[Union[DefaultGlobalBool, Global[bool]]] = Field( + default=as_global(True), validation_alias="pmtuDiscovery", serialization_alias="pmtuDiscovery" + ) + dscp: Optional[Global[int]] = DEFAULT_BFD_DSCP + model_config = ConfigDict(populate_by_name=True) + + +class BFD(_ParcelBase): + model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) + + multiplier: Optional[Global[int]] = Field( + default=DEFAULT_BFD_MULTIPLIER, validation_alias=AliasPath("data", "multiplier") + ) + poll_interval: Optional[Global[int]] = Field( + default=DEFAULT_BFD_POLL_INTERVAL, validation_alias=AliasPath("data", "pollInterval") + ) + default_dscp: Optional[Global[int]] = Field( + default=DEFAULT_BFD_DSCP, validation_alias=AliasPath("data", "defaultDscp") + ) + colors: Optional[List[Color]] = Field(default=None, validation_alias=AliasPath("data", "colors")) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/system/logging.py b/catalystwan/models/configuration/feature_profile/sdwan/system/logging.py new file mode 100644 index 000000000..64ac7033b --- /dev/null +++ b/catalystwan/models/configuration/feature_profile/sdwan/system/logging.py @@ -0,0 +1,52 @@ +# flake8: noqa + +# import enum +# from typing import List, Literal, Optional, Union +# from catalystwan.api.configuration_groups.parcel import _ParcelBase, Global, as_global +# from catalystwan.models.configuration.common import AuthType, Priority, Version +# from pydantic import AliasPath, BaseModel, ConfigDict, Field +# from catalystwan.models.configuration.feature_profile.converters.recast import ( +# DefaultGlobalBool, +# DefaultGlobalStr, +# ) + +# class Server(BaseModel): +# name: Global[str] +# vpn: Optional[Union[DefaultGlobalStr, Global[str]]] = None +# source_interface: Optional[Global[str]] = Field(default=None, serialization_alias="sourceInterface", validation_alias="sourceInterface") +# priority: Optional[Union[DefaultGlobalLiteral, Global[Priority]]] = "information" +# enable_tls: Optional[Union[DefaultGlobalBool, Global[bool]]] = Field(default=as_global(False), serialization_alias="tlsEnable", validation_alias="tlsEnable") +# custom_profile: Optional[Union[DefaultGlobalBool, Global[bool]]] = Field( +# default=as_global(False), serialization_alias="tlsPropertiesCustomProfile", validation_alias="tlsPropertiesCustomProfile" +# ) +# profile_properties: Optional[Global[str]] = Field(default=None, serialization_alias="tlsPropertiesProfile", validation_alias="tlsPropertiesProfile") +# model_config = ConfigDict(populate_by_name=True) + + +# class Ipv6Server(BaseModel): +# name: Global[str] +# vpn: Optional[Union[DefaultGlobalStr, Global[str]]] = None +# source_interface: Optional[Global[str]] = Field(default=None, serialization_alias="sourceInterface", validation_alias="sourceInterface") +# priority: Optional[Union[DefaultGlobalLiteral, Global[Priority]]] = "information" +# enable_tls: Optional[Union[DefaultGlobalBool, Global[bool]]] = Field(default=as_global(False), serialization_alias="tlsEnable", validation_alias="tlsEnable") +# custom_profile: Optional[Union[DefaultGlobalBool, Global[bool]]] = Field( +# default=as_global(False), serialization_alias="tlsPropertiesCustomProfile", validation_alias="tlsPropertiesCustomProfile" +# ) +# profile_properties: Optional[Global[str]] = Field(default=None, serialization_alias="tlsPropertiesProfile", validation_alias="tlsPropertiesProfile") +# model_config = ConfigDict(populate_by_name=True) + +# # +# class File(BaseModel): +# disk_file_size: Optional[Global[int]] = Field(default=None, serialization_alias="diskFileSize", validation_alias="diskFileSize") +# disk_file_rotate: Optional[Global[int]] = Field(default=None, serialization_alias="diskFileRotate", validation_alias="diskFileRotate") + + +# class Disk(BaseModel): +# disk_enable: Optional[Global[bool]] = Field(default=False, serialization_alias="diskEnable", validation_alias="diskEnable") +# file: File + +# class Logging(_ParcelBase): +# disk: Optional[Disk] = Field(default=None, validation_alias=AliasPath("data", "disk")) +# tls_profile: Optional[List[TlsProfile]] = Field(default=None, validation_alias=AliasPath("data", "tlsProfile")) +# server: Optional[List[Server]] = Field(default=None, validation_alias=AliasPath("data", "server")) +# ipv6_server: Optional[List[Ipv6Server]] = Field(default=None, validation_alias=AliasPath("data", "ipv6Server")) diff --git a/catalystwan/models/policy/centralized.py b/catalystwan/models/policy/centralized.py index c06d188f2..9f28f0a7e 100644 --- a/catalystwan/models/policy/centralized.py +++ b/catalystwan/models/policy/centralized.py @@ -211,12 +211,12 @@ def try_parse(cls, policy_definition): # while POST /template/policy/vsmart requires a regular object # it makes sense to reuse that model for both requests and present parsed data to the user if isinstance(policy_definition, str): - return CentralizedPolicyDefinition.parse_raw(policy_definition) + return CentralizedPolicyDefinition.model_validate_json(policy_definition) return policy_definition class CentralizedPolicyEditPayload(PolicyEditPayload, CentralizedPolicy): - rid: Optional[str] = Field(default=None, serialization_alias="@rid", validation_alias="@rid") + rid: Optional[int] = Field(default=None, serialization_alias="@rid", validation_alias="@rid") class CentralizedPolicyInfo(PolicyInfo, CentralizedPolicyEditPayload): diff --git a/catalystwan/models/policy/lists.py b/catalystwan/models/policy/lists.py index c707b35ec..5686c3f66 100644 --- a/catalystwan/models/policy/lists.py +++ b/catalystwan/models/policy/lists.py @@ -4,7 +4,12 @@ from pydantic import BaseModel, Field +from catalystwan.api.configuration_groups.parcel import _ParcelBase from catalystwan.models.common import InterfaceType, TLOCColor, WellKnownBGPCommunities +from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.data_prefix import ( + DataPrefixEntry, + DataPrefixParcel, +) from catalystwan.models.policy.lists_entries import ( AppListEntry, AppProbeClassListEntry, @@ -55,6 +60,9 @@ def _add_entry(self, entry: Any, single: bool = False) -> None: else: self.entries.append(entry) + def to_policy_object_parcel(self) -> _ParcelBase: + return _ParcelBase(parcel_name=self.name, parcel_description=self.description) + class DataPrefixList(PolicyListBase): type: Literal["dataPrefix"] = "dataPrefix" @@ -63,6 +71,11 @@ class DataPrefixList(PolicyListBase): def add_prefix(self, ip_prefix: IPv4Network) -> None: self._add_entry(DataPrefixListEntry(ip_prefix=ip_prefix)) + def to_policy_object_parcel(self) -> DataPrefixParcel: + parcel = DataPrefixParcel(parcel_name=self.name, parcel_description=self.description) + parcel.entries = [DataPrefixEntry.from_ipv4_network(i.ip_prefix) for i in self.entries] + return parcel + class SiteList(PolicyListBase): type: Literal["site"] = "site" diff --git a/catalystwan/models/policy/lists_entries.py b/catalystwan/models/policy/lists_entries.py index 6b75eaad9..823805338 100644 --- a/catalystwan/models/policy/lists_entries.py +++ b/catalystwan/models/policy/lists_entries.py @@ -326,7 +326,9 @@ class SLAClassListEntry(BaseModel): latency: Optional[str] = None loss: Optional[str] = None jitter: Optional[str] = None - app_probe_class: Optional[UUID] = Field(serialization_alias="appProbeClass", validation_alias="appProbeClass") + app_probe_class: Optional[UUID] = Field( + default=None, serialization_alias="appProbeClass", validation_alias="appProbeClass" + ) fallback_best_tunnel: Optional[FallbackBestTunnel] = Field( default=None, serialization_alias="fallbackBestTunnel", validation_alias="fallbackBestTunnel" ) diff --git a/catalystwan/utils/feature_template.py b/catalystwan/utils/feature_template.py index c82a02a2e..22412de35 100644 --- a/catalystwan/utils/feature_template.py +++ b/catalystwan/utils/feature_template.py @@ -31,7 +31,7 @@ def choose_model(type_value: str) -> Any: def find_template_values( template_definition: dict, - templated_values: dict = {}, + templated_values: Optional[dict] = None, parent_key: Optional[str] = None, target_key: str = "vipType", target_key_value_to_ignore: str = "ignore", @@ -54,6 +54,9 @@ def find_template_values( Returns: templated_values: dictionary containing template fields as key and values assigned to those fields as values """ + if templated_values is None: + templated_values = {} + for key, value in template_definition.items(): if key == target_key and value != target_key_value_to_ignore: if value == "variableName" and (device_specific_variables is not None) and parent_key: diff --git a/catalystwan/workflows/config_migration.py b/catalystwan/workflows/config_migration.py new file mode 100644 index 000000000..bcbaa3512 --- /dev/null +++ b/catalystwan/workflows/config_migration.py @@ -0,0 +1,74 @@ +import logging +from typing import Callable + +from catalystwan.api.policy_api import POLICY_LIST_ENDPOINTS_MAP +from catalystwan.models.configuration.config_migration import UX1Config, UX2Config +from catalystwan.models.configuration.feature_profile.converters.feature_template import create_parcel_from_template +from catalystwan.session import ManagerSession + +logger = logging.getLogger(__name__) + + +def log_progress(task: str, completed: int, total: int) -> None: + logger.info(f"{task} {completed}/{total}") + + +def transform(ux1: UX1Config) -> UX2Config: + ux2 = UX2Config() + ux2.profile_parcels.extend([lst.to_policy_object_parcel() for lst in ux1.policies.policy_lists]) + ux2.profile_parcels.extend([create_parcel_from_template(ft) for ft in ux1.templates.features]) + return ux2 + + +def collect_ux1_config(session: ManagerSession, progress: Callable[[str, int, int], None] = log_progress) -> UX1Config: + ux1 = UX1Config() + + """Collect Policies""" + policy_api = session.api.policy + progress("Collecting Policy Info", 0, 3) + + centralized_policy_ids = [info.policy_id for info in policy_api.centralized.get()] + progress("Collecting Policy Info", 1, 3) + + localized_policy_ids = [info.policy_id for info in policy_api.localized.get()] + progress("Collecting Policy Info", 2, 3) + + policy_definition_types_and_ids = [ + (policy_type, info.definition_id) for policy_type, info in policy_api.definitions.get_all() + ] + progress("Collecting Policy Info", 3, 3) + + policy_list_types = POLICY_LIST_ENDPOINTS_MAP.keys() + for i, policy_list_type in enumerate(policy_list_types): + ux1.policies.policy_lists.extend(policy_api.lists.get(policy_list_type)) + progress("Collecting Policy Lists", i + 1, len(policy_list_types)) + + for i, type_and_id in enumerate(policy_definition_types_and_ids): + ux1.policies.policy_definitions.append(policy_api.definitions.get(*type_and_id)) + progress("Collecting Policy Definitions", i + 1, len(policy_definition_types_and_ids)) + + for i, cpid in enumerate(centralized_policy_ids): + ux1.policies.centralized_policies.append(policy_api.centralized.get(id=cpid)) + progress("Collecting Centralized Policies", i + 1, len(centralized_policy_ids)) + + for i, lpid in enumerate(localized_policy_ids): + ux1.policies.localized_policies.append(policy_api.localized.get(id=lpid)) + progress("Collecting Localized Policies", i + 1, len(localized_policy_ids)) + + ux1.policies.policy_lists = policy_api.lists.get_all() + + """Collect Templates""" + template_api = session.api.templates + progress("Collecting Templates Info", 0, 2) + + ux1.templates.features = [t for t in template_api.get_feature_templates()] + progress("Collecting Templates Info", 1, 2) + + ux1.templates.devices = [t for t in template_api.get_device_templates()] + progress("Collecting Templates Info", 2, 2) + + return ux1 + + +def push_ux2_config(session: ManagerSession) -> None: + pass diff --git a/examples/policies_configuration_guide.py b/examples/policies_configuration_guide.py index 7c2abe9df..e38ea7d2a 100644 --- a/examples/policies_configuration_guide.py +++ b/examples/policies_configuration_guide.py @@ -46,12 +46,12 @@ PrefixList, RegionList, SiteList, - SLAClassList, TLOCList, TrafficDataPolicy, VPNList, VPNMembershipPolicy, ) +from catalystwan.models.policy.lists import SLAClassList logger = logging.getLogger(__name__) @@ -184,7 +184,7 @@ def configure_groups_of_interest(api: PolicyAPI) -> List[ConfigItem]: configured_items.append(ConfigItem(ClassMapList, class_map.name, class_map_id)) app_probe_class = AppProbeClassList(name="MyAppProbeClass") - app_probe_class.assign_forwarding_class("MyClassMap").add_color_mapping("3g", 5) + app_probe_class.assign_forwarding_class("MyClassMap").add_color_mapping("green", 5) app_probe_class_id = api.lists.create(app_probe_class) configured_items.append(ConfigItem(AppProbeClassList, app_probe_class.name, app_probe_class_id)) diff --git a/pyproject.toml b/pyproject.toml index 3b731f807..69efdd4f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "catalystwan" -version = "0.30.0" +version = "0.31.0dev1" description = "Cisco Catalyst WAN SDK for Python" authors = ["kagorski "] readme = "README.md"