diff --git a/catalystwan/api/feature_profile_api.py b/catalystwan/api/feature_profile_api.py index 25d5b914..6bee5c20 100644 --- a/catalystwan/api/feature_profile_api.py +++ b/catalystwan/api/feature_profile_api.py @@ -8,6 +8,7 @@ from pydantic import Json from catalystwan.endpoints.configuration.feature_profile.sdwan.other import OtherFeatureProfile +from catalystwan.endpoints.configuration.feature_profile.sdwan.service import ServiceFeatureProfile from catalystwan.endpoints.configuration.feature_profile.sdwan.system import SystemFeatureProfile from catalystwan.models.configuration.feature_profile.sdwan.other import AnyOtherParcel from catalystwan.typed_list import DataSequence @@ -77,6 +78,7 @@ def __init__(self, session: ManagerSession): self.policy_object = PolicyObjectFeatureProfileAPI(session=session) self.system = SystemFeatureProfileAPI(session=session) self.other = OtherFeatureProfileAPI(session=session) + self.service = ServiceFeatureProfileAPI(session=session) class FeatureProfileAPI(Protocol): @@ -196,6 +198,39 @@ def delete(self, profile_id: UUID, parcel_type: Type[AnyOtherParcel], parcel_id: return self.endpoint.delete(profile_id, parcel_type._get_parcel_type(), parcel_id) +class ServiceFeatureProfileAPI: + """ + SDWAN Feature Profile Service APIs + """ + + def __init__(self, session: ManagerSession): + self.session = session + self.endpoint = ServiceFeatureProfile(session) + + def get_profiles( + self, limit: Optional[int] = None, offset: Optional[int] = None + ) -> DataSequence[FeatureProfileInfo]: + """ + Get all Service Feature Profiles + """ + payload = GetFeatureProfilesPayload(limit=limit if limit else None, offset=offset if offset else None) + + return self.endpoint.get_sdwan_service_feature_profiles(payload) + + def create_profile(self, name: str, description: str) -> FeatureProfileCreationResponse: + """ + Create Service Feature Profile + """ + payload = FeatureProfileCreationPayload(name=name, description=description) + return self.endpoint.create_sdwan_service_feature_profile(payload) + + def delete_profile(self, profile_id: UUID) -> None: + """ + Delete Service Feature Profile + """ + self.endpoint.delete_sdwan_service_feature_profile(profile_id) + + class SystemFeatureProfileAPI: """ SDWAN Feature Profile System APIs diff --git a/catalystwan/endpoints/configuration/feature_profile/sdwan/service.py b/catalystwan/endpoints/configuration/feature_profile/sdwan/service.py new file mode 100644 index 00000000..9a4faa4c --- /dev/null +++ b/catalystwan/endpoints/configuration/feature_profile/sdwan/service.py @@ -0,0 +1,35 @@ +# Copyright 2024 Cisco Systems, Inc. and its affiliates + +# mypy: disable-error-code="empty-body" +from typing import Optional +from uuid import UUID + +from catalystwan.endpoints import APIEndpoints, delete, get, post, versions +from catalystwan.models.configuration.feature_profile.common import ( + FeatureProfileCreationPayload, + FeatureProfileCreationResponse, + FeatureProfileInfo, + GetFeatureProfilesPayload, +) +from catalystwan.typed_list import DataSequence + + +class ServiceFeatureProfile(APIEndpoints): + @versions(supported_versions=(">=20.9"), raises=False) + @get("/v1/feature-profile/sdwan/service") + def get_sdwan_service_feature_profiles( + self, payload: Optional[GetFeatureProfilesPayload] + ) -> DataSequence[FeatureProfileInfo]: + ... + + @versions(supported_versions=(">=20.9"), raises=False) + @post("/v1/feature-profile/sdwan/service") + def create_sdwan_service_feature_profile( + self, payload: FeatureProfileCreationPayload + ) -> FeatureProfileCreationResponse: + ... + + @versions(supported_versions=(">=20.9"), raises=False) + @delete("/v1/feature-profile/sdwan/service/{profile_id}") + def delete_sdwan_service_feature_profile(self, profile_id: UUID) -> None: + ... diff --git a/catalystwan/integration_tests/feature_profile/sdwan/service/test_models.py b/catalystwan/integration_tests/feature_profile/sdwan/service/test_models.py new file mode 100644 index 00000000..7233570f --- /dev/null +++ b/catalystwan/integration_tests/feature_profile/sdwan/service/test_models.py @@ -0,0 +1,47 @@ +import os +import unittest +from ipaddress import IPv4Address +from typing import cast + +from catalystwan.api.configuration_groups.parcel import Global +from catalystwan.models.configuration.feature_profile.sdwan.service.dhcp_server import ( + AddressPool, + LanVpnDhcpServerParcel, + SubnetMask, +) +from catalystwan.session import create_manager_session + + +class TestServiceFeatureProfileModels(unittest.TestCase): + def setUp(self) -> None: + self.session = create_manager_session( + url=cast(str, os.environ.get("TEST_VMANAGE_URL")), + port=cast(int, int(os.environ.get("TEST_VMANAGE_PORT"))), # type: ignore + username=cast(str, os.environ.get("TEST_VMANAGE_USERNAME")), + password=cast(str, os.environ.get("TEST_VMANAGE_PASSWORD")), + ) + self.profile_id = self.session.api.sdwan_feature_profiles.service.create_profile( + "TestProfile", "Description" + ).id + + def test_when_default_values_dhcp_server_parcel_expect_successful_post(self): + # Arrange + url = f"dataservice/v1/feature-profile/sdwan/service/{self.profile_id}/dhcp-server" + dhcp_server_parcel = LanVpnDhcpServerParcel( + parcel_name="DhcpServerDefault", + parcel_description="Dhcp Server Parcel", + address_pool=AddressPool( + network_address=Global[IPv4Address](value=IPv4Address("10.0.0.2")), + subnet_mask=Global[SubnetMask](value="255.255.255.255"), + ), + ) + # Act + response = self.session.post( + url=url, data=dhcp_server_parcel.model_dump_json(by_alias=True, exclude_none=True) + ) # This will be changed to the actual method + # Assert + assert response.status_code == 200 + + def tearDown(self) -> None: + self.session.api.sdwan_feature_profiles.service.delete_profile(self.profile_id) + self.session.close() diff --git a/catalystwan/models/configuration/feature_profile/sdwan/service/dhcp_server.py b/catalystwan/models/configuration/feature_profile/sdwan/service/dhcp_server.py index 07a66733..466d72d3 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/service/dhcp_server.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/service/dhcp_server.py @@ -1,84 +1,161 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates -from typing import List, Optional, Union - -from pydantic import BaseModel, ConfigDict, Field - -from catalystwan.api.configuration_groups.parcel import Default, Global, Variable - - -class OptionCodeAscii(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - code: Union[Global[int], Variable] - ascii: Union[Global[str], Variable] - - -class OptionCodeHex(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - code: Union[Global[int], Variable] - hex: Union[Global[str], Variable] - - -class OptionCodeIP(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - code: Union[Global[int], Variable] - ip: Union[Global[List[str]], Variable] - - -class StaticLease(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - mac_address: Union[Global[str], Variable] = Field(serialization_alias="macAddress", validation_alias="macAddress") - ip: Union[Global[str], Variable] - - -class DhcpAddressPool(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - network_address: Union[Global[str], Variable] = Field( - serialization_alias="networkAddress", validation_alias="networkAddress" +from __future__ import annotations + +import re +from ipaddress import IPv4Address +from typing import List, Literal, Optional, Union + +from pydantic import AliasPath, BaseModel, ConfigDict, Field, field_validator, model_validator + +from catalystwan.api.configuration_groups.parcel import Default, Global, Variable, _ParcelBase +from catalystwan.models.common import check_fields_exclusive + +SubnetMask = Literal[ + "255.255.255.255", + "255.255.255.254", + "255.255.255.252", + "255.255.255.248", + "255.255.255.240", + "255.255.255.224", + "255.255.255.192", + "255.255.255.128", + "255.255.255.0", + "255.255.254.0", + "255.255.252.0", + "255.255.248.0", + "255.255.240.0", + "255.255.224.0", + "255.255.192.0", + "255.255.128.0", + "255.255.0.0", + "255.254.0.0", + "255.252.0.0", + "255.240.0.0", + "255.224.0.0", + "255.192.0.0", + "255.128.0.0", + "255.0.0.0", + "254.0.0.0", + "252.0.0.0", + "248.0.0.0", + "240.0.0.0", + "224.0.0.0", + "192.0.0.0", + "128.0.0.0", + "0.0.0.0", +] +MAC_PATTERN_1 = re.compile(r"^([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}$") +MAC_PATTERN_2 = re.compile(r"^[0-9a-fA-F]{4}\.[0-9a-fA-F]{4}\.[0-9a-fA-F]{4}$") + + +class AddressPool(BaseModel): + """ + Configure IPv4 prefix range of the DHCP address pool + """ + + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + network_address: Union[Variable, Global[IPv4Address]] = Field( + ..., serialization_alias="networkAddress", validation_alias="networkAddress", description="Network Address" + ) + subnet_mask: Union[Variable, Global[SubnetMask]] = Field( + ..., serialization_alias="subnetMask", validation_alias="subnetMask", description="Subnet Mask" ) - subnet_mask: Union[Global[str], Variable] = Field(serialization_alias="subnetMask", validation_alias="subnetMask") - -class DhcpServerData(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - address_pool: DhcpAddressPool = Field(serialization_alias="addressPool", validation_alias="addressPool") - exclude: Optional[Union[Global[List[str]], Variable, Default[None]]] = None - lease_time: Optional[Union[Global[int], Variable, Default[int]]] = Field( - serialization_alias="leaseTime", validation_alias="leaseTime", default=Default[int](value=86400) +class StaticLeaseItem(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - interface_mtu: Optional[Union[Global[int], Variable, Default[None]]] = Field( - serialization_alias="interfaceMtu", validation_alias="interfaceMtu", default=None + mac_address: Union[Global[str], Variable] = Field( + ..., serialization_alias="macAddress", validation_alias="macAddress", description="Set MAC address of client" ) - domain_name: Optional[Union[Global[str], Variable, Default[None]]] = Field( - serialization_alias="domainName", validation_alias="domainName", default=None + ip: Union[Global[IPv4Address], Variable] = Field(..., description="Set client’s static IP address") + + @field_validator("mac_address") + @classmethod + def check_mac_address(cls, mac_address: Union[Global[str], Variable]): + if isinstance(mac_address, Variable): + return mac_address + value = mac_address.value + if MAC_PATTERN_1.match(value) or MAC_PATTERN_2.match(value): + return mac_address + raise ValueError("Invalid MAC address") + + +class OptionCode(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - default_gateway: Optional[Union[Global[str], Variable, Default[None]]] = Field( - serialization_alias="defaultGateway", validation_alias="defaultGateway", default=None + code: Union[Global[int], Variable] = Field(..., description="Set Option Code") + ip: Optional[Union[Global[List[IPv4Address]], Variable]] = Field(default=None, description="Set ip address") + hex: Optional[Union[Global[str], Variable]] = Field(default=None, description="Set HEX value") + ascii: Optional[Union[Global[str], Variable]] = Field(default=None, description="Set ASCII value") + + @model_validator(mode="after") + def check_ip_hex_ascii_exclusive(self): + check_fields_exclusive(self.__dict__, {"ip", "hex", "ascii"}, True) + return self + + +class LanVpnDhcpServerParcel(_ParcelBase): + """ + LAN VPN DHCP Server profile parcel schema for POST request + """ + + type_: Literal["dhcp-server"] = Field(default="dhcp-server", exclude=True) + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - dns_servers: Optional[Union[Global[List[str]], Variable, Default[None]]] = Field( - serialization_alias="dnsServers", validation_alias="dnsServers", default=None + address_pool: AddressPool = Field( + ..., + validation_alias=AliasPath("data", "addressPool"), + description="Configure IPv4 prefix range of the DHCP address pool", ) - tftp_servers: Optional[Union[Global[List[str]], Variable, Default[None]]] = Field( - serialization_alias="tftpServers", validation_alias="tftpServers", default=None + exclude: Union[Global[List[IPv4Address]], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "exclude"), + description="Configure IPv4 address to exclude from DHCP address pool", ) - static_lease: Optional[List[StaticLease]] = Field( - serialization_alias="staticLease", validation_alias="staticLease", default=None + lease_time: Union[Global[int], Variable, Default[int]] = Field( + default=Default[int](value=86400), + validation_alias=AliasPath("data", "leaseTime"), + description="Configure how long a DHCP-assigned IP address is valid", ) - option_code: Optional[List[Union[OptionCodeAscii, OptionCodeHex, OptionCodeIP]]] = Field( - serialization_alias="optionCode", validation_alias="optionCode", default=None + interface_mtu: Union[Global[int], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "interfaceMtu"), + description="Set MTU on interface to DHCP client", + ) + domain_name: Union[Global[str], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "domainName"), + description="Set domain name client uses to resolve hostnames", + ) + default_gateway: Union[Global[IPv4Address], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "defaultGateway"), + description="Set IP address of default gateway", + ) + dns_servers: Union[Global[List[IPv4Address]], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "dnsServers"), + description="Configure one or more DNS server IP addresses", + ) + tftp_servers: Union[Global[List[IPv4Address]], Variable, Default[None]] = Field( + default=Default[None](value=None), + validation_alias=AliasPath("data", "tftpServers"), + description="Configure TFTP server IP addresses", + ) + static_lease: Optional[List[StaticLeaseItem]] = Field( + default=None, validation_alias=AliasPath("data", "staticLease"), description="Configure static IP addresses" + ) + option_code: Optional[List[OptionCode]] = Field( + default=None, validation_alias=AliasPath("data", "optionCode"), description="Configure Options Code" ) - - -class DhcpSeverCreationPayload(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) - - name: str - description: Optional[str] = None - data: DhcpServerData - metadata: Optional[dict] = None diff --git a/catalystwan/utils/config_migration/converters/feature_template/dhcp.py b/catalystwan/utils/config_migration/converters/feature_template/dhcp.py new file mode 100644 index 00000000..acd8fd10 --- /dev/null +++ b/catalystwan/utils/config_migration/converters/feature_template/dhcp.py @@ -0,0 +1,77 @@ +from copy import deepcopy +from ipaddress import IPv4Address, IPv4Network +from typing import List + +from catalystwan.api.configuration_groups.parcel import Global, as_global, as_variable +from catalystwan.models.configuration.feature_profile.sdwan.service.dhcp_server import ( + LanVpnDhcpServerParcel, + SubnetMask, +) + + +class DhcpTemplateConverter: + supported_template_types = ("dhcp", "cisco_dhcp_server") + + @staticmethod + def create_parcel(name: str, description: str, template_values: dict) -> LanVpnDhcpServerParcel: + """ + Create a LanVpnDhcpServerParcel object based on the provided parameters. + + Args: + name (str): The name of the parcel. + description (str): The description of the parcel. + template_values (dict): The template values used to populate the parcel. + + Returns: + LanVpnDhcpServerParcel: The created LanVpnDhcpServerParcel object. + + """ + + def convert_str_list_to_ipv4_list(d: dict, key: str) -> None: + """ + Convert a list of strings representing IPv4 addresses to a list of IPv4Address objects. + + Args: + d (dict): The dictionary containing the key-value pair to be converted. + key (str): The key in the dictionary representing the list of strings. + + Returns: + None. The function modifies the dictionary in-place by + replacing the list of strings with a list of IPv4Address objects. + """ + if str_list := d.get(key, as_global([])).value: + d[key] = Global[List[IPv4Address]](value=[IPv4Address(ip) for ip in str_list]) + + values = deepcopy(template_values) + values.update(values.pop("options", {})) + + if address_pool := values.get("address_pool"): + value = address_pool.value + network = IPv4Network(value) + address = as_global(network.network_address) + mask = as_global(str(network.netmask), SubnetMask) + values["address_pool"] = {"network_address": address, "subnet_mask": mask} + + for key in ("exclude", "dns_servers", "tftp_servers"): + convert_str_list_to_ipv4_list(values, key) + + if static_leases := values.get("static_lease", []): + mac_address_variable = "{{{{dhcp_1_staticLease_{}_macAddress}}}}" + ip_variable = "{{{{dhcp_1_staticLease_{}_ip}}}}" + static_lease = [] + for i, entry in enumerate(static_leases): + static_lease.append( + { + "mac_address": entry.get("mac_address", as_variable(mac_address_variable.format(i + 1))), + "ip": entry.get("ip", as_variable(ip_variable.format(i + 1))), + } + ) + values["static_lease"] = static_lease + + parcel_values = { + "parcel_name": name, + "parcel_description": description, + **values, + } + + return LanVpnDhcpServerParcel(**parcel_values) # type: ignore diff --git a/catalystwan/utils/config_migration/converters/feature_template/factory_method.py b/catalystwan/utils/config_migration/converters/feature_template/factory_method.py index 41a6c167..45d7a39e 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/factory_method.py +++ b/catalystwan/utils/config_migration/converters/feature_template/factory_method.py @@ -5,6 +5,7 @@ from catalystwan.api.template_api import FeatureTemplateInformation from catalystwan.exceptions import CatalystwanException from catalystwan.models.configuration.feature_profile.sdwan.system import AnySystemParcel +from catalystwan.utils.config_migration.converters.feature_template.dhcp import DhcpTemplateConverter from catalystwan.utils.feature_template.find_template_values import find_template_values from .aaa import AAATemplateConverter @@ -37,6 +38,7 @@ BGPTemplateConverter, ThousandEyesTemplateConverter, UcseTemplateConverter, + DhcpTemplateConverter, ] diff --git a/catalystwan/workflows/config_migration.py b/catalystwan/workflows/config_migration.py index b0974bd5..826c1978 100644 --- a/catalystwan/workflows/config_migration.py +++ b/catalystwan/workflows/config_migration.py @@ -36,6 +36,8 @@ "cisco_bgp", "cisco_thousandeyes", "ucse", + "dhcp", + "cisco_dhcp_server", ]