Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Policy: Added new class for creation of centralized policy lists and vSmart policy #222

Merged
merged 16 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ENDPOINTS.md

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion vmngclient/endpoints/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,18 @@ def merge_args(self, positional_args: Tuple, keyword_args: Dict[str, Any]) -> Di
Returns: Dict[str, Any]: all passed args as keyword arguments (excluding "self")
"""
all_args_names = [key for key in self.sig.parameters.keys()]
all_args_dict = dict(zip(all_args_names, positional_args))
all_args_dict = dict(self.defaults)
all_args_dict.update(dict(zip(all_args_names, positional_args)))
all_args_dict.update(keyword_args)
all_args_dict.pop("self", None)
return all_args_dict

def __call__(self, func):
original_func = getattr(func, "_ofunc", func) # grab original function
self.sig = signature(original_func)
self.defaults = {
key: value.default for (key, value) in self.sig.parameters.items() if value.default is not _empty
}
self.return_spec = self.specify_return_type()
self.payload_spec = self.specify_payload_type()
self.check_params()
Expand Down
262 changes: 262 additions & 0 deletions vmngclient/endpoints/configuration_policy_data_definition_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
# mypy: disable-error-code="empty-body"
from enum import Enum
from typing import Any, List, Optional, Union

from pydantic import BaseModel, Field, IPvAnyNetwork
from typing_extensions import Annotated, Literal

from vmngclient.endpoints import APIEndpoints, delete, get, post, put
from vmngclient.model.policy_definition import (
PolicyDefinition,
PolicyDefinitionCreationPayload,
PolicyDefinitionEditPayload,
PolicyDefinitionEditResponse,
PolicyDefinitionId,
PolicyDefinitionPreview,
)
from vmngclient.typed_list import DataSequence

# TODO: add validators for custom strings (eg.: port ranges, space separated networks)
# TODO: model actions


class DefaultActionType(str, Enum):
DROP = "drop"
ACCEPT = "accept"


class PLPEntryValues(str, Enum):
LOW = "low"
HIGH = "high"


class DNSEntryValues(str, Enum):
REQUEST = "request"
RESPONSE = "response"


class TrafficToEntryValues(str, Enum):
ACCESS = "access"
CORE = "core"
SERVICE = "service"


class DestinationRegionEntryValues(str, Enum):
PRIMARY = "primary-region"
SECONDARY = "secondary-region"
OTHER = "other-region"


class SequenceIpType(str, Enum):
IPV4 = "ipv4"
IPV6 = "ipv6"
ALL = "all"


class BaseAction(str, Enum):
DROP = "drop"
ACCEPT = "accept"


class SequenceType(str, Enum):
APPLICATION_FIREWALL = "applicationFirewall"
DATA = "data"
SERVICE_CHAINING = "serviceChaining"
TRAFFIC_ENGINEERING = "trafficEngineering"
QOS = "qos"


class PacketLengthEntry(BaseModel):
field: Literal["packetLength"]
value: str = Field(description="0-65536 range or single number")


class PLPEntry(BaseModel):
field: Literal["plp"]
value: PLPEntryValues


class ProtocolEntry(BaseModel):
field: Literal["protocol"]
value: str = Field(description="0-255 single numbers separate by space")


class DSCPEntry(BaseModel):
field: Literal["dscp"]
value: str = Field(description="0-63 single numbers separate by space")


class SourceIPEntry(BaseModel):
field: Literal["sourceIp"]
value: str = Field(description="IP network specifier separate by space")


class SourcePortEntry(BaseModel):
field: Literal["sourcePort"]
value: str = Field(description="0-65535 range or separate by space")


class DestinationIPEntry(BaseModel):
field: Literal["destinationIp"]
value: IPvAnyNetwork


class DestinationPortEntry(BaseModel):
field: Literal["destinationPort"]
value: str = Field(description="0-65535 range or separate by space")


class TCPEntry(BaseModel):
field: Literal["tcp"]
value: Literal["syn"]


class DNSEntry(BaseModel):
field: Literal["dns"]
value: DNSEntryValues


class TrafficToEntry(BaseModel):
field: Literal["trafficTo"]
value: TrafficToEntryValues


class DestinationRegionEntry(BaseModel):
field: Literal["destinationRegion"]
value: DestinationRegionEntryValues


class SourceDataPrefixListEntry(BaseModel):
field: Literal["sourceDataPrefixList"]
ref: str


class DestinationDataPrefixListEntry(BaseModel):
field: Literal["destinationDataPrefixList"]
ref: str


class SourceDataIPv6PrefixListEntry(BaseModel):
field: Literal["sourceDataIpv6PrefixList"]
ref: str


class DestinationDataIPv6PrefixListEntry(BaseModel):
field: Literal["destinationDataIpv6PrefixList"]
ref: str


class DNSAppListEntry(BaseModel):
field: Literal["dnsAppList"]
ref: str


class AppListEntry(BaseModel):
field: Literal["appList"]
ref: str


Entry = Annotated[
Union[
PacketLengthEntry,
PLPEntry,
ProtocolEntry,
DSCPEntry,
SourceIPEntry,
SourcePortEntry,
DestinationIPEntry,
DestinationPortEntry,
TCPEntry,
DNSEntry,
TrafficToEntry,
SourceDataPrefixListEntry,
DestinationDataPrefixListEntry,
SourceDataIPv6PrefixListEntry,
DestinationDataIPv6PrefixListEntry,
DestinationRegionEntry,
DNSAppListEntry,
AppListEntry,
],
Field(discriminator="field"),
]


class Match(BaseModel):
entries: List[Entry]


class Action(BaseModel):
pass


class Sequence(BaseModel):
sequence_id: int = Field(alias="sequenceId")
sequence_name: str = Field(alias="sequenceName")
base_action: BaseAction = Field(alias="baseAction")
sequence_type: SequenceType = Field(alias="sequenceType")
sequence_ip_type: SequenceIpType = Field(alias="sequenceIpType")
match: Match
actions: List[Any]


class DefaultAction(BaseModel):
type: DefaultActionType


class Data(BaseModel):
type: str = Field(default="data", const=True)
default_action: Optional[DefaultAction] = Field(
default=DefaultAction(type=DefaultActionType.DROP), alias="defaultAction"
)
is_activated_by_vsmart: Optional[bool] = Field(default=False, alias="isActivatedByVsmart")
sequences: List[Sequence] = []


class DataDefinitionCreationPayload(Data, PolicyDefinitionCreationPayload):
pass


class DataDefinitionEditPayload(Data, PolicyDefinitionEditPayload):
pass


class DataDefinition(Data, PolicyDefinition):
pass


class ConfigurationPolicyDataDefinitionBuilder(APIEndpoints):
@post("/template/policy/definition/data")
def create_policy_definition(self, payload: DataDefinitionCreationPayload) -> PolicyDefinitionId:
...

@delete("/template/policy/definition/data/{id}")
def delete_policy_definition(self, id: str) -> None:
...

def edit_multiple_policy_definition(self):
# PUT /template/policy/definition/data/multiple/{id}
...

@put("/template/policy/definition/data/{id}")
def edit_policy_definition(self, id: str, payload: DataDefinitionEditPayload) -> PolicyDefinitionEditResponse:
...

@get("/template/policy/definition/data", "data")
def get_definitions(self) -> DataSequence[PolicyDefinition]:
...

@get("/template/policy/definition/data/{id}")
def get_policy_definition(self, id: str) -> DataDefinition:
...

@post("/template/policy/definition/data/preview")
def preview_policy_definition(self, payload: DataDefinitionCreationPayload) -> PolicyDefinitionPreview:
...

@get("/template/policy/definition/data/preview/{id}")
def preview_policy_definition_by_id(self, id: str) -> PolicyDefinitionPreview:
...

def save_policy_definition_in_bulk(self):
# PUT /template/policy/definition/data/bulk
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# mypy: disable-error-code="empty-body"
from ipaddress import IPv4Network
from typing import List

from pydantic import BaseModel, Field, validator

from vmngclient.endpoints import APIEndpoints, delete, get, post, put
from vmngclient.model.policy_list import (
InfoTag,
PolicyList,
PolicyListCreationPayload,
PolicyListEditPayload,
PolicyListId,
PolicyListPreview,
)
from vmngclient.typed_list import DataSequence


class DataPrefixListEntry(BaseModel):
class Config:
allow_population_by_field_name = True

ip_prefix: str = Field(alias="ipPrefix", description="IP4 network prefixes separated by comma")

@validator("ip_prefix")
def check_network_prefixes(cls, ip_prefix: str):
nets = [IPv4Network(net.strip()) for net in ip_prefix.split(",")]
if len(nets) < 1:
raise ValueError("No network prefix provided")
return ip_prefix


class DataPrefixPayload(BaseModel):
entries: List[DataPrefixListEntry]
type: str = Field(default="dataPrefix", const=True)


class DataPrefixListCreationPayload(DataPrefixPayload, PolicyListCreationPayload):
pass


class DataPrefixListEditPayload(DataPrefixPayload, PolicyListEditPayload):
pass


class DataPrefixList(DataPrefixPayload, PolicyList):
pass


class ConfigurationPolicyDataPrefixListBuilder(APIEndpoints):
@post("/template/policy/list/dataprefix")
def create_policy_list(self, payload: DataPrefixListCreationPayload) -> PolicyListId:
...

@delete("/template/policy/list/dataprefix/{id}")
def delete_policy_list(self, id: str) -> None:
...

@delete("/template/policy/list/dataprefix")
def delete_policy_lists_with_info_tag(self, params: InfoTag) -> None:
# TODO: dont know how to assing tags to check if filter works
# (it is present in GET response but cannot be added to POST, PUT payload)
# for now it was tested with default info tag value == ""
...

@put("/template/policy/list/dataprefix/{id}")
def edit_policy_list(self, id: str, payload: DataPrefixListEditPayload) -> None:
...

@get("/template/policy/list/dataprefix/{id}")
def get_lists_by_id(self, id: str) -> DataPrefixList:
...

@get("/template/policy/list/dataprefix", "data")
def get_policy_lists(self) -> DataSequence[DataPrefixList]:
...

@get("/template/policy/list/dataprefix/filtered", "data")
def get_policy_lists_with_info_tag(self, params: InfoTag) -> DataSequence[DataPrefixList]:
# TODO: dont know how to assing tags to check if filter works
# (it is present in GET response but cannot be added to POST, PUT payload)
# for now it was tested with default info tag value == ""
...

@post("/template/policy/list/dataprefix/preview")
def preview_policy_list(self, payload: DataPrefixListCreationPayload) -> PolicyListPreview:
# TODO: not working for some reason
...

@get("/template/policy/list/dataprefix/preview/{id}")
def preview_policy_list_by_id(self, id: str) -> PolicyListPreview:
...
Loading