Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Spliting DefaultAction into new types (#669)
Browse files Browse the repository at this point in the history
* Default action splited to new types

* renamed AccessPolicyActionType to BasicPolicyActionType as it is commonly reused, added missing field specifier for defaultAction in multiple policies, fix typo in ControlBaseActionType

---------

Co-authored-by: sbasan <[email protected]>
  • Loading branch information
radkrawczyk and sbasan authored May 16, 2024
1 parent bce4d58 commit 9c60365
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 54 deletions.
4 changes: 2 additions & 2 deletions catalystwan/models/policy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@
from .definition.zone_based_firewall import ZoneBasedFWPolicy, ZoneBasedFWPolicyGetResponse
from .localized import LocalizedPolicy
from .policy_definition import (
BasicPolicyActionType,
Carrier,
DNSTypeEntryType,
MultiRegionRole,
OriginProtocol,
PathType,
PLPEntryType,
PolicyActionType,
ServiceType,
TLOCActionType,
)
Expand Down Expand Up @@ -248,7 +248,7 @@
"PathType",
"PLPEntryType",
"PolicerList",
"PolicyActionType",
"BasicPolicyActionType",
"PortList",
"PreferredColorGroupList",
"PrefixList",
Expand Down
12 changes: 6 additions & 6 deletions catalystwan/models/policy/definition/access_control_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
from typing_extensions import Annotated

from catalystwan.models.policy.policy_definition import (
BasicPolicyAction,
BasicPolicyActionType,
ClassMapAction,
ClassMapListEntry,
CountAction,
DefaultAction,
DefinitionWithSequencesCommonBase,
DestinationDataPrefixListEntry,
DestinationIPEntry,
Expand All @@ -24,7 +25,6 @@
PacketLengthEntry,
PLPEntry,
PolicerAction,
PolicyActionType,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -71,7 +71,7 @@ class AclPolicySequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["acl"] = Field(
default="acl", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: BasicPolicyActionType = Field(
default="accept", serialization_alias="baseAction", validation_alias="baseAction"
)
match: AclPolicySequenceMatch = AclPolicySequenceMatch()
Expand Down Expand Up @@ -146,15 +146,15 @@ def associate_policer_list_action(self, policer_list_id: UUID) -> None:

class AclPolicy(AclPolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[AclPolicySequence] = []
default_action: DefaultAction = Field(
default=DefaultAction(type="drop"),
default_action: BasicPolicyAction = Field(
default=BasicPolicyAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
model_config = ConfigDict(populate_by_name=True)

def add_acl_sequence(
self, name: str = "Access Control List", base_action: PolicyActionType = "accept"
self, name: str = "Access Control List", base_action: BasicPolicyActionType = "accept"
) -> AclPolicySequence:
seq = AclPolicySequence(
sequence_name=name,
Expand Down
12 changes: 6 additions & 6 deletions catalystwan/models/policy/definition/access_control_list_ipv6.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
from typing_extensions import Annotated

from catalystwan.models.policy.policy_definition import (
BasicPolicyAction,
BasicPolicyActionType,
ClassMapAction,
ClassMapListEntry,
CountAction,
DefaultAction,
DefinitionWithSequencesCommonBase,
DestinationDataIPv6PrefixListEntry,
DestinationIPv6Entry,
Expand All @@ -24,7 +25,6 @@
PacketLengthEntry,
PLPEntry,
PolicerAction,
PolicyActionType,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -71,7 +71,7 @@ class AclIPv6PolicySequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["aclv6"] = Field(
default="aclv6", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: BasicPolicyActionType = Field(
default="accept", serialization_alias="baseAction", validation_alias="baseAction"
)
match: AclIPv6PolicySequenceMatch = AclIPv6PolicySequenceMatch()
Expand Down Expand Up @@ -146,15 +146,15 @@ def associate_policer_list_action(self, policer_list_id: UUID) -> None:

class AclIPv6Policy(AclIPv6PolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[AclIPv6PolicySequence] = []
default_action: DefaultAction = Field(
default=DefaultAction(type="drop"),
default_action: BasicPolicyAction = Field(
default=BasicPolicyAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
model_config = ConfigDict(populate_by_name=True)

def add_acl_sequence(
self, name: str = "Access Control List", base_action: PolicyActionType = "accept"
self, name: str = "Access Control List", base_action: BasicPolicyActionType = "accept"
) -> AclIPv6PolicySequence:
seq = AclIPv6PolicySequence(
sequence_name=name,
Expand Down
20 changes: 12 additions & 8 deletions catalystwan/models/policy/definition/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
CommunityAdditiveEntry,
CommunityEntry,
CommunityListEntry,
DefaultAction,
DefinitionWithSequencesCommonBase,
DomainIDEntry,
ExpandedCommunityListEntry,
Expand All @@ -30,7 +29,7 @@
OriginProtocol,
PathType,
PathTypeEntry,
PolicyActionType,
PolicyActionBase,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -102,6 +101,11 @@

ControlPolicyRouteSequenceActions = Any # TODO
ControlPolicyTLOCSequenceActions = Any # TODO
ControlPolicyBaseActionType = Literal["accept", "reject"]


class ControlPolicyBaseAction(PolicyActionBase):
type: ControlPolicyBaseActionType


class ControlPolicyHeader(PolicyDefinitionBase):
Expand All @@ -120,7 +124,7 @@ class ControlPolicyRouteSequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["route"] = Field(
default="route", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: ControlPolicyBaseActionType = Field(
default="reject", serialization_alias="baseAction", validation_alias="baseAction"
)
match: ControlPolicyRouteSequenceMatch = ControlPolicyRouteSequenceMatch()
Expand Down Expand Up @@ -239,7 +243,7 @@ class ControlPolicyTLOCSequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["tloc"] = Field(
default="tloc", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: ControlPolicyBaseActionType = Field(
default="reject", serialization_alias="baseAction", validation_alias="baseAction"
)
match: ControlPolicyTLOCSequenceMatch = ControlPolicyTLOCSequenceMatch()
Expand Down Expand Up @@ -314,15 +318,15 @@ def associate_affinity_action(self, affinity: int) -> None:

class ControlPolicy(ControlPolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[AnyControlPolicySequence] = []
default_action: DefaultAction = Field(
default=DefaultAction(type="reject"),
default_action: ControlPolicyBaseAction = Field(
default=ControlPolicyBaseAction(type="reject"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
model_config = ConfigDict(populate_by_name=True)

def add_route_sequence(
self, name: str = "Route", base_action: PolicyActionType = "reject"
self, name: str = "Route", base_action: ControlPolicyBaseActionType = "reject"
) -> ControlPolicyRouteSequence:
seq = ControlPolicyRouteSequence(
sequence_name=name,
Expand All @@ -333,7 +337,7 @@ def add_route_sequence(
return seq

def add_tloc_sequence(
self, name: str = "TLOC", base_action: PolicyActionType = "reject"
self, name: str = "TLOC", base_action: ControlPolicyBaseActionType = "reject"
) -> ControlPolicyTLOCSequence:
seq = ControlPolicyTLOCSequence(
sequence_name=name,
Expand Down
12 changes: 6 additions & 6 deletions catalystwan/models/policy/definition/device_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from typing_extensions import Annotated

from catalystwan.models.policy.policy_definition import (
BasicPolicyAction,
BasicPolicyActionType,
CountAction,
DefaultAction,
DefinitionWithSequencesCommonBase,
DestinationDataPrefixListEntry,
DestinationIPEntry,
DestinationPortEntry,
DeviceAccessProtocol,
Match,
PolicyActionType,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -53,7 +53,7 @@ class DeviceAccessPolicySequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["deviceaccesspolicy"] = Field(
default="deviceaccesspolicy", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: BasicPolicyActionType = Field(
default="accept", serialization_alias="baseAction", validation_alias="baseAction"
)
match: DeviceAccessPolicySequenceMatch = DeviceAccessPolicySequenceMatch()
Expand Down Expand Up @@ -84,8 +84,8 @@ def associate_count_action(self, counter_name: str) -> None:

class DeviceAccessPolicy(DeviceAccessPolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[DeviceAccessPolicySequence] = []
default_action: DefaultAction = Field(
default=DefaultAction(type="drop"),
default_action: BasicPolicyAction = Field(
default=BasicPolicyAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
Expand All @@ -94,7 +94,7 @@ class DeviceAccessPolicy(DeviceAccessPolicyHeader, DefinitionWithSequencesCommon
def add_acl_sequence(
self,
name: str = "Device Access Control List",
base_action: PolicyActionType = "accept",
base_action: BasicPolicyActionType = "accept",
device_access_protocol: Optional[DeviceAccessProtocol] = None,
) -> DeviceAccessPolicySequence:
seq = DeviceAccessPolicySequence(
Expand Down
12 changes: 6 additions & 6 deletions catalystwan/models/policy/definition/device_access_ipv6.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from typing_extensions import Annotated

from catalystwan.models.policy.policy_definition import (
BasicPolicyAction,
BasicPolicyActionType,
CountAction,
DefaultAction,
DefinitionWithSequencesCommonBase,
DestinationDataIPv6PrefixListEntry,
DestinationIPv6Entry,
DestinationPortEntry,
DeviceAccessProtocol,
Match,
PolicyActionType,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -53,7 +53,7 @@ class DeviceAccessIPv6PolicySequence(PolicyDefinitionSequenceBase):
sequence_type: Literal["deviceaccesspolicyv6"] = Field(
default="deviceaccesspolicyv6", serialization_alias="sequenceType", validation_alias="sequenceType"
)
base_action: PolicyActionType = Field(
base_action: BasicPolicyActionType = Field(
default="accept", serialization_alias="baseAction", validation_alias="baseAction"
)
match: DeviceAccessIPv6PolicySequenceMatch = DeviceAccessIPv6PolicySequenceMatch()
Expand Down Expand Up @@ -84,8 +84,8 @@ def associate_count_action(self, counter_name: str) -> None:

class DeviceAccessIPv6Policy(DeviceAccessIPv6PolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[DeviceAccessIPv6PolicySequence] = []
default_action: DefaultAction = Field(
default=DefaultAction(type="drop"),
default_action: BasicPolicyAction = Field(
default=BasicPolicyAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
Expand All @@ -94,7 +94,7 @@ class DeviceAccessIPv6Policy(DeviceAccessIPv6PolicyHeader, DefinitionWithSequenc
def add_acl_sequence(
self,
name: str = "Device Access Control List",
base_action: PolicyActionType = "accept",
base_action: BasicPolicyActionType = "accept",
device_access_protocol: Optional[DeviceAccessProtocol] = None,
) -> DeviceAccessIPv6PolicySequence:
seq = DeviceAccessIPv6PolicySequence(
Expand Down
10 changes: 8 additions & 2 deletions catalystwan/models/policy/definition/traffic_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from catalystwan.models.common import EncapType, ICMPMessageType, ServiceChainNumber, TLOCColor
from catalystwan.models.policy.policy_definition import (
AppListEntry,
BasicPolicyAction,
BasicPolicyActionType,
CFlowDAction,
CountAction,
DefinitionWithSequencesCommonBase,
Expand Down Expand Up @@ -40,7 +42,6 @@
PacketLengthEntry,
PLPEntry,
PolicerListEntry,
PolicyActionType,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -365,10 +366,15 @@ def associate_secure_internet_gateway_action(self, fallback_to_routing: bool = F

class TrafficDataPolicy(TrafficDataPolicyHeader, DefinitionWithSequencesCommonBase):
sequences: List[TrafficDataPolicySequence] = []
default_action: BasicPolicyAction = Field(
default=BasicPolicyAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
model_config = ConfigDict(populate_by_name=True)

def add_ipv4_sequence(
self, name: str = "Custom", base_action: PolicyActionType = "drop", log: bool = False
self, name: str = "Custom", base_action: BasicPolicyActionType = "drop", log: bool = False
) -> TrafficDataPolicySequence:
seq = TrafficDataPolicySequence(
sequence_name=name,
Expand Down
20 changes: 16 additions & 4 deletions catalystwan/models/policy/definition/zone_based_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
DestinationScalableGroupTagListEntry,
LogAction,
Match,
PolicyActionType,
PolicyActionBase,
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Expand Down Expand Up @@ -90,6 +90,13 @@
Field(discriminator="type"),
]

ZoneBasedFirewallDefaultActionType = Literal["drop", "pass"]
ZoneBasedFirewallBaseActionType = Literal["drop", "pass", "inspect"]


class ZoneBasedFirewallDefaultAction(PolicyActionBase):
type: ZoneBasedFirewallDefaultActionType


class ZoneBasedFWPolicyMatches(Match):
entries: List[ZoneBasedFWPolicySequenceEntry] = []
Expand Down Expand Up @@ -155,7 +162,7 @@ def match_protocol_names(self, names: Set[str], protocol_map: Dict[str, Applicat
for name in names:
app_protocol = protocol_map.get(name, None)
if app_protocol is None:
raise ValueError(f"{name} not found in protocol map keys: {protocol_map.keys()}")
raise ValueError(f"{name} not found in protocol map keys: {protocol_map.keys()}") # noqa: E713
app_protocols.append(app_protocol)
self._insert_match(ProtocolNameEntry.from_application_protocols(app_protocols))
self._insert_match(DestinationPortEntry.from_application_protocols(app_protocols), False)
Expand Down Expand Up @@ -204,6 +211,11 @@ class ZoneBasedFWPolicyHeader(PolicyDefinitionBase):


class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase):
default_action: ZoneBasedFirewallDefaultAction = Field(
default=ZoneBasedFirewallDefaultAction(type="drop"),
serialization_alias="defaultAction",
validation_alias="defaultAction",
)
sequences: List[Union[ZoneBasedFWPolicySequence, ZoneBasedFWPolicySequenceWithRuleSets]] = []
entries: List[ZoneBasedFWPolicyEntry] = []

Expand All @@ -214,7 +226,7 @@ class ZoneBasedFWPolicy(ZoneBasedFWPolicyHeader):
definition: ZoneBasedFWPolicyDefinition = ZoneBasedFWPolicyDefinition()

def add_ipv4_rule(
self, name: str, base_action: PolicyActionType = "drop", log: bool = False
self, name: str, base_action: ZoneBasedFirewallBaseActionType = "drop", log: bool = False
) -> ZoneBasedFWPolicySequence:
"""Adds new IPv4 Rule to Zone Based Firewall Policy
Expand All @@ -238,7 +250,7 @@ def add_ipv4_rule(
return sequence

def add_ipv4_rule_sets(
self, name: str, base_action: PolicyActionType = "drop", log: bool = False
self, name: str, base_action: ZoneBasedFirewallBaseActionType = "drop", log: bool = False
) -> ZoneBasedFWPolicySequenceWithRuleSets:
sequence = ZoneBasedFWPolicySequenceWithRuleSets(
sequence_name=name,
Expand Down
Loading

0 comments on commit 9c60365

Please sign in to comment.