From 9c60365a1cbe062c71d5181a9037fe0e01847e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Krawczyk?= <98938598+radkrawczyk@users.noreply.github.com> Date: Thu, 16 May 2024 10:05:44 +0200 Subject: [PATCH] Spliting DefaultAction into new types (#669) * Default action splited to new types * renamed AccessPolicyActionType to BasicPolicyActionType as it is commonly reused, added missing field specifier for defaultAction in multiple policies, fix typo in ControlBaseActionType --------- Co-authored-by: sbasan --- catalystwan/models/policy/__init__.py | 4 +-- .../policy/definition/access_control_list.py | 12 ++++----- .../definition/access_control_list_ipv6.py | 12 ++++----- .../models/policy/definition/control.py | 20 +++++++++------ .../models/policy/definition/device_access.py | 12 ++++----- .../policy/definition/device_access_ipv6.py | 12 ++++----- .../models/policy/definition/traffic_data.py | 10 ++++++-- .../policy/definition/zone_based_firewall.py | 20 ++++++++++++--- .../models/policy/policy_definition.py | 25 ++++++++----------- 9 files changed, 73 insertions(+), 54 deletions(-) diff --git a/catalystwan/models/policy/__init__.py b/catalystwan/models/policy/__init__.py index a6a1f440..7d990670 100644 --- a/catalystwan/models/policy/__init__.py +++ b/catalystwan/models/policy/__init__.py @@ -68,13 +68,13 @@ from .definition.zone_based_firewall import ZoneBasedFWPolicy, ZoneBasedFWPolicyGetResponse from .localized import LocalizedPolicy from .policy_definition import ( + BasicPolicyActionType, Carrier, DNSTypeEntryType, MultiRegionRole, OriginProtocol, PathType, PLPEntryType, - PolicyActionType, ServiceType, TLOCActionType, ) @@ -248,7 +248,7 @@ "PathType", "PLPEntryType", "PolicerList", - "PolicyActionType", + "BasicPolicyActionType", "PortList", "PreferredColorGroupList", "PrefixList", diff --git a/catalystwan/models/policy/definition/access_control_list.py b/catalystwan/models/policy/definition/access_control_list.py index 121bf6aa..c5d3f062 100644 --- a/catalystwan/models/policy/definition/access_control_list.py +++ b/catalystwan/models/policy/definition/access_control_list.py @@ -8,10 +8,11 @@ from typing_extensions import Annotated from catalystwan.models.policy.policy_definition import ( + BasicPolicyAction, + BasicPolicyActionType, ClassMapAction, ClassMapListEntry, CountAction, - DefaultAction, DefinitionWithSequencesCommonBase, DestinationDataPrefixListEntry, DestinationIPEntry, @@ -24,7 +25,6 @@ PacketLengthEntry, PLPEntry, PolicerAction, - PolicyActionType, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -71,7 +71,7 @@ class AclPolicySequence(PolicyDefinitionSequenceBase): sequence_type: Literal["acl"] = Field( default="acl", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: BasicPolicyActionType = Field( default="accept", serialization_alias="baseAction", validation_alias="baseAction" ) match: AclPolicySequenceMatch = AclPolicySequenceMatch() @@ -146,15 +146,15 @@ def associate_policer_list_action(self, policer_list_id: UUID) -> None: class AclPolicy(AclPolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[AclPolicySequence] = [] - default_action: DefaultAction = Field( - default=DefaultAction(type="drop"), + default_action: BasicPolicyAction = Field( + default=BasicPolicyAction(type="drop"), serialization_alias="defaultAction", validation_alias="defaultAction", ) model_config = ConfigDict(populate_by_name=True) def add_acl_sequence( - self, name: str = "Access Control List", base_action: PolicyActionType = "accept" + self, name: str = "Access Control List", base_action: BasicPolicyActionType = "accept" ) -> AclPolicySequence: seq = AclPolicySequence( sequence_name=name, diff --git a/catalystwan/models/policy/definition/access_control_list_ipv6.py b/catalystwan/models/policy/definition/access_control_list_ipv6.py index 29e14f1d..2230a146 100644 --- a/catalystwan/models/policy/definition/access_control_list_ipv6.py +++ b/catalystwan/models/policy/definition/access_control_list_ipv6.py @@ -8,10 +8,11 @@ from typing_extensions import Annotated from catalystwan.models.policy.policy_definition import ( + BasicPolicyAction, + BasicPolicyActionType, ClassMapAction, ClassMapListEntry, CountAction, - DefaultAction, DefinitionWithSequencesCommonBase, DestinationDataIPv6PrefixListEntry, DestinationIPv6Entry, @@ -24,7 +25,6 @@ PacketLengthEntry, PLPEntry, PolicerAction, - PolicyActionType, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -71,7 +71,7 @@ class AclIPv6PolicySequence(PolicyDefinitionSequenceBase): sequence_type: Literal["aclv6"] = Field( default="aclv6", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: BasicPolicyActionType = Field( default="accept", serialization_alias="baseAction", validation_alias="baseAction" ) match: AclIPv6PolicySequenceMatch = AclIPv6PolicySequenceMatch() @@ -146,15 +146,15 @@ def associate_policer_list_action(self, policer_list_id: UUID) -> None: class AclIPv6Policy(AclIPv6PolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[AclIPv6PolicySequence] = [] - default_action: DefaultAction = Field( - default=DefaultAction(type="drop"), + default_action: BasicPolicyAction = Field( + default=BasicPolicyAction(type="drop"), serialization_alias="defaultAction", validation_alias="defaultAction", ) model_config = ConfigDict(populate_by_name=True) def add_acl_sequence( - self, name: str = "Access Control List", base_action: PolicyActionType = "accept" + self, name: str = "Access Control List", base_action: BasicPolicyActionType = "accept" ) -> AclIPv6PolicySequence: seq = AclIPv6PolicySequence( sequence_name=name, diff --git a/catalystwan/models/policy/definition/control.py b/catalystwan/models/policy/definition/control.py index b2800bb7..1e5ee163 100644 --- a/catalystwan/models/policy/definition/control.py +++ b/catalystwan/models/policy/definition/control.py @@ -16,7 +16,6 @@ CommunityAdditiveEntry, CommunityEntry, CommunityListEntry, - DefaultAction, DefinitionWithSequencesCommonBase, DomainIDEntry, ExpandedCommunityListEntry, @@ -30,7 +29,7 @@ OriginProtocol, PathType, PathTypeEntry, - PolicyActionType, + PolicyActionBase, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -102,6 +101,11 @@ ControlPolicyRouteSequenceActions = Any # TODO ControlPolicyTLOCSequenceActions = Any # TODO +ControlPolicyBaseActionType = Literal["accept", "reject"] + + +class ControlPolicyBaseAction(PolicyActionBase): + type: ControlPolicyBaseActionType class ControlPolicyHeader(PolicyDefinitionBase): @@ -120,7 +124,7 @@ class ControlPolicyRouteSequence(PolicyDefinitionSequenceBase): sequence_type: Literal["route"] = Field( default="route", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: ControlPolicyBaseActionType = Field( default="reject", serialization_alias="baseAction", validation_alias="baseAction" ) match: ControlPolicyRouteSequenceMatch = ControlPolicyRouteSequenceMatch() @@ -239,7 +243,7 @@ class ControlPolicyTLOCSequence(PolicyDefinitionSequenceBase): sequence_type: Literal["tloc"] = Field( default="tloc", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: ControlPolicyBaseActionType = Field( default="reject", serialization_alias="baseAction", validation_alias="baseAction" ) match: ControlPolicyTLOCSequenceMatch = ControlPolicyTLOCSequenceMatch() @@ -314,15 +318,15 @@ def associate_affinity_action(self, affinity: int) -> None: class ControlPolicy(ControlPolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[AnyControlPolicySequence] = [] - default_action: DefaultAction = Field( - default=DefaultAction(type="reject"), + default_action: ControlPolicyBaseAction = Field( + default=ControlPolicyBaseAction(type="reject"), serialization_alias="defaultAction", validation_alias="defaultAction", ) model_config = ConfigDict(populate_by_name=True) def add_route_sequence( - self, name: str = "Route", base_action: PolicyActionType = "reject" + self, name: str = "Route", base_action: ControlPolicyBaseActionType = "reject" ) -> ControlPolicyRouteSequence: seq = ControlPolicyRouteSequence( sequence_name=name, @@ -333,7 +337,7 @@ def add_route_sequence( return seq def add_tloc_sequence( - self, name: str = "TLOC", base_action: PolicyActionType = "reject" + self, name: str = "TLOC", base_action: ControlPolicyBaseActionType = "reject" ) -> ControlPolicyTLOCSequence: seq = ControlPolicyTLOCSequence( sequence_name=name, diff --git a/catalystwan/models/policy/definition/device_access.py b/catalystwan/models/policy/definition/device_access.py index d5ba1456..332e02b2 100644 --- a/catalystwan/models/policy/definition/device_access.py +++ b/catalystwan/models/policy/definition/device_access.py @@ -8,15 +8,15 @@ from typing_extensions import Annotated from catalystwan.models.policy.policy_definition import ( + BasicPolicyAction, + BasicPolicyActionType, CountAction, - DefaultAction, DefinitionWithSequencesCommonBase, DestinationDataPrefixListEntry, DestinationIPEntry, DestinationPortEntry, DeviceAccessProtocol, Match, - PolicyActionType, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -53,7 +53,7 @@ class DeviceAccessPolicySequence(PolicyDefinitionSequenceBase): sequence_type: Literal["deviceaccesspolicy"] = Field( default="deviceaccesspolicy", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: BasicPolicyActionType = Field( default="accept", serialization_alias="baseAction", validation_alias="baseAction" ) match: DeviceAccessPolicySequenceMatch = DeviceAccessPolicySequenceMatch() @@ -84,8 +84,8 @@ def associate_count_action(self, counter_name: str) -> None: class DeviceAccessPolicy(DeviceAccessPolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[DeviceAccessPolicySequence] = [] - default_action: DefaultAction = Field( - default=DefaultAction(type="drop"), + default_action: BasicPolicyAction = Field( + default=BasicPolicyAction(type="drop"), serialization_alias="defaultAction", validation_alias="defaultAction", ) @@ -94,7 +94,7 @@ class DeviceAccessPolicy(DeviceAccessPolicyHeader, DefinitionWithSequencesCommon def add_acl_sequence( self, name: str = "Device Access Control List", - base_action: PolicyActionType = "accept", + base_action: BasicPolicyActionType = "accept", device_access_protocol: Optional[DeviceAccessProtocol] = None, ) -> DeviceAccessPolicySequence: seq = DeviceAccessPolicySequence( diff --git a/catalystwan/models/policy/definition/device_access_ipv6.py b/catalystwan/models/policy/definition/device_access_ipv6.py index 853dd49b..1358dca6 100644 --- a/catalystwan/models/policy/definition/device_access_ipv6.py +++ b/catalystwan/models/policy/definition/device_access_ipv6.py @@ -8,15 +8,15 @@ from typing_extensions import Annotated from catalystwan.models.policy.policy_definition import ( + BasicPolicyAction, + BasicPolicyActionType, CountAction, - DefaultAction, DefinitionWithSequencesCommonBase, DestinationDataIPv6PrefixListEntry, DestinationIPv6Entry, DestinationPortEntry, DeviceAccessProtocol, Match, - PolicyActionType, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -53,7 +53,7 @@ class DeviceAccessIPv6PolicySequence(PolicyDefinitionSequenceBase): sequence_type: Literal["deviceaccesspolicyv6"] = Field( default="deviceaccesspolicyv6", serialization_alias="sequenceType", validation_alias="sequenceType" ) - base_action: PolicyActionType = Field( + base_action: BasicPolicyActionType = Field( default="accept", serialization_alias="baseAction", validation_alias="baseAction" ) match: DeviceAccessIPv6PolicySequenceMatch = DeviceAccessIPv6PolicySequenceMatch() @@ -84,8 +84,8 @@ def associate_count_action(self, counter_name: str) -> None: class DeviceAccessIPv6Policy(DeviceAccessIPv6PolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[DeviceAccessIPv6PolicySequence] = [] - default_action: DefaultAction = Field( - default=DefaultAction(type="drop"), + default_action: BasicPolicyAction = Field( + default=BasicPolicyAction(type="drop"), serialization_alias="defaultAction", validation_alias="defaultAction", ) @@ -94,7 +94,7 @@ class DeviceAccessIPv6Policy(DeviceAccessIPv6PolicyHeader, DefinitionWithSequenc def add_acl_sequence( self, name: str = "Device Access Control List", - base_action: PolicyActionType = "accept", + base_action: BasicPolicyActionType = "accept", device_access_protocol: Optional[DeviceAccessProtocol] = None, ) -> DeviceAccessIPv6PolicySequence: seq = DeviceAccessIPv6PolicySequence( diff --git a/catalystwan/models/policy/definition/traffic_data.py b/catalystwan/models/policy/definition/traffic_data.py index c13b173e..c8bf3f94 100644 --- a/catalystwan/models/policy/definition/traffic_data.py +++ b/catalystwan/models/policy/definition/traffic_data.py @@ -10,6 +10,8 @@ from catalystwan.models.common import EncapType, ICMPMessageType, ServiceChainNumber, TLOCColor from catalystwan.models.policy.policy_definition import ( AppListEntry, + BasicPolicyAction, + BasicPolicyActionType, CFlowDAction, CountAction, DefinitionWithSequencesCommonBase, @@ -40,7 +42,6 @@ PacketLengthEntry, PLPEntry, PolicerListEntry, - PolicyActionType, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -365,10 +366,15 @@ def associate_secure_internet_gateway_action(self, fallback_to_routing: bool = F class TrafficDataPolicy(TrafficDataPolicyHeader, DefinitionWithSequencesCommonBase): sequences: List[TrafficDataPolicySequence] = [] + default_action: BasicPolicyAction = Field( + default=BasicPolicyAction(type="drop"), + serialization_alias="defaultAction", + validation_alias="defaultAction", + ) model_config = ConfigDict(populate_by_name=True) def add_ipv4_sequence( - self, name: str = "Custom", base_action: PolicyActionType = "drop", log: bool = False + self, name: str = "Custom", base_action: BasicPolicyActionType = "drop", log: bool = False ) -> TrafficDataPolicySequence: seq = TrafficDataPolicySequence( sequence_name=name, diff --git a/catalystwan/models/policy/definition/zone_based_firewall.py b/catalystwan/models/policy/definition/zone_based_firewall.py index 9c4e1864..5f089aef 100644 --- a/catalystwan/models/policy/definition/zone_based_firewall.py +++ b/catalystwan/models/policy/definition/zone_based_firewall.py @@ -24,7 +24,7 @@ DestinationScalableGroupTagListEntry, LogAction, Match, - PolicyActionType, + PolicyActionBase, PolicyDefinitionBase, PolicyDefinitionGetResponse, PolicyDefinitionId, @@ -90,6 +90,13 @@ Field(discriminator="type"), ] +ZoneBasedFirewallDefaultActionType = Literal["drop", "pass"] +ZoneBasedFirewallBaseActionType = Literal["drop", "pass", "inspect"] + + +class ZoneBasedFirewallDefaultAction(PolicyActionBase): + type: ZoneBasedFirewallDefaultActionType + class ZoneBasedFWPolicyMatches(Match): entries: List[ZoneBasedFWPolicySequenceEntry] = [] @@ -155,7 +162,7 @@ def match_protocol_names(self, names: Set[str], protocol_map: Dict[str, Applicat for name in names: app_protocol = protocol_map.get(name, None) if app_protocol is None: - raise ValueError(f"{name} not found in protocol map keys: {protocol_map.keys()}") + raise ValueError(f"{name} not found in protocol map keys: {protocol_map.keys()}") # noqa: E713 app_protocols.append(app_protocol) self._insert_match(ProtocolNameEntry.from_application_protocols(app_protocols)) self._insert_match(DestinationPortEntry.from_application_protocols(app_protocols), False) @@ -204,6 +211,11 @@ class ZoneBasedFWPolicyHeader(PolicyDefinitionBase): class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase): + default_action: ZoneBasedFirewallDefaultAction = Field( + default=ZoneBasedFirewallDefaultAction(type="drop"), + serialization_alias="defaultAction", + validation_alias="defaultAction", + ) sequences: List[Union[ZoneBasedFWPolicySequence, ZoneBasedFWPolicySequenceWithRuleSets]] = [] entries: List[ZoneBasedFWPolicyEntry] = [] @@ -214,7 +226,7 @@ class ZoneBasedFWPolicy(ZoneBasedFWPolicyHeader): definition: ZoneBasedFWPolicyDefinition = ZoneBasedFWPolicyDefinition() def add_ipv4_rule( - self, name: str, base_action: PolicyActionType = "drop", log: bool = False + self, name: str, base_action: ZoneBasedFirewallBaseActionType = "drop", log: bool = False ) -> ZoneBasedFWPolicySequence: """Adds new IPv4 Rule to Zone Based Firewall Policy @@ -238,7 +250,7 @@ def add_ipv4_rule( return sequence def add_ipv4_rule_sets( - self, name: str, base_action: PolicyActionType = "drop", log: bool = False + self, name: str, base_action: ZoneBasedFirewallBaseActionType = "drop", log: bool = False ) -> ZoneBasedFWPolicySequenceWithRuleSets: sequence = ZoneBasedFWPolicySequenceWithRuleSets( sequence_name=name, diff --git a/catalystwan/models/policy/policy_definition.py b/catalystwan/models/policy/policy_definition.py index 3f5adbaf..c9a566eb 100644 --- a/catalystwan/models/policy/policy_definition.py +++ b/catalystwan/models/policy/policy_definition.py @@ -92,13 +92,8 @@ def networks_to_str(networks: Sequence[Union[IPv4Network, IPv6Network]]) -> str: "all", ] -PolicyActionType = Literal[ - "drop", - "accept", - "pass", - "inspect", - "reject", -] + +BasicPolicyActionType = Literal["accept", "drop"] SequenceType = Literal[ "applicationFirewall", @@ -953,9 +948,7 @@ class Action(BaseModel): class PolicyDefinitionSequenceBase(BaseModel): sequence_id: int = Field(default=0, serialization_alias="sequenceId", validation_alias="sequenceId") sequence_name: str = Field(serialization_alias="sequenceName", validation_alias="sequenceName") - base_action: PolicyActionType = Field( - default="drop", serialization_alias="baseAction", validation_alias="baseAction" - ) + base_action: str = Field(serialization_alias="baseAction", validation_alias="baseAction") sequence_type: SequenceType = Field(serialization_alias="sequenceType", validation_alias="sequenceType") sequence_ip_type: Optional[SequenceIpType] = Field( default="ipv4", serialization_alias="sequenceIpType", validation_alias="sequenceIpType" @@ -1054,8 +1047,12 @@ def wrapper(self: PolicyDefinitionSequenceBase, *args, **kwargs): return wrapper -class DefaultAction(BaseModel): - type: PolicyActionType +class PolicyActionBase(BaseModel): + type: str + + +class BasicPolicyAction(PolicyActionBase): + type: BasicPolicyActionType class InfoTag(BaseModel): @@ -1072,8 +1069,8 @@ class PolicyReference(BaseModel): class DefinitionWithSequencesCommonBase(BaseModel): - default_action: Optional[DefaultAction] = Field( - default=DefaultAction(type="drop"), + default_action: Optional[PolicyActionBase] = Field( + default=None, serialization_alias="defaultAction", validation_alias="defaultAction", )