Skip to content

Commit

Permalink
Ability to use a forEach on a jsonpatch
Browse files Browse the repository at this point in the history
Now, we can iterate over a list and apply a jsonpatch on each element of
the list. For example, we can remove the limits on all the containers
from a pod:

```yaml
patch:
  op: forEach
  elements: .spec.containers
  patch:
    - op: remove
      path: .limits
```
  • Loading branch information
jordipiqueselles committed May 12, 2024
1 parent bc26b6d commit 10be3c3
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ format: build

.PHONY: unittests
unittests: build
poetry run pytest tests
poetry run pytest tests --cov=generic_k8s_webhook
poetry run coverage html

.PHONY: check-pyproject
Expand Down
1 change: 0 additions & 1 deletion generic_k8s_webhook/config_parser/expr_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def __init__(self) -> None:

def parse(self, raw_string: str) -> op.Operator:
tree = self.parser.parse(raw_string)
print(tree.pretty()) # debug mode
operator = self.transformer.transform(tree)
return operator

Expand Down
79 changes: 48 additions & 31 deletions generic_k8s_webhook/config_parser/jsonpatch_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc

import generic_k8s_webhook.config_parser.operator_parser as op_parser
from generic_k8s_webhook import jsonpatch_helpers, utils
from generic_k8s_webhook import jsonpatch_helpers, utils, operators
from generic_k8s_webhook.config_parser.common import ParsingException


Expand All @@ -18,6 +18,36 @@ def _parse_path(self, raw_elem: dict, key: str) -> list[str]:
return path[1:]


class IJsonPatchParser(abc.ABC):
def parse(self, raw_patch: list, path_op: str) -> list[jsonpatch_helpers.JsonPatchOperator]:
patch = []
dict_parse_op = self._get_dict_parse_op()
for i, raw_elem in enumerate(raw_patch):
op = utils.must_pop(raw_elem, "op", f"Missing key 'op' in {raw_elem}")

# Select the appropiate class needed to parse the operation "op"
if op not in dict_parse_op:
raise ParsingException(f"Unsupported patch operation {op} on {path_op}")
parse_op = dict_parse_op[op]
try:
parsed_elem = parse_op.parse(raw_elem, f"{path_op}.{i}")
except Exception as e:
raise ParsingException(f"Error when parsing {path_op}") from e

# Make sure we have extracted all the keys from "raw_elem"
if len(raw_elem) > 0:
raise ValueError(f"Unexpected keys {raw_elem}")
patch.append(parsed_elem)

return patch

@abc.abstractmethod
def _get_dict_parse_op(self) -> dict[str, ParserOp]:
"""A dictionary with the classes that can parse the json patch operations
supported by this JsonPatchParser
"""


class ParseAdd(ParserOp):
def parse(self, raw_elem: dict, path_op: str) -> jsonpatch_helpers.JsonPatchOperator:
path = self._parse_path(raw_elem, "path")
Expand Down Expand Up @@ -70,34 +100,23 @@ def parse(self, raw_elem: dict, path_op: str) -> jsonpatch_helpers.JsonPatchOper
return jsonpatch_helpers.JsonPatchExpr(path, operator)


class IJsonPatchParser(abc.ABC):
def parse(self, raw_patch: list, path_op: str) -> list[jsonpatch_helpers.JsonPatchOperator]:
patch = []
dict_parse_op = self._get_dict_parse_op()
for i, raw_elem in enumerate(raw_patch):
op = utils.must_pop(raw_elem, "op", f"Missing key 'op' in {raw_elem}")

# Select the appropiate class needed to parse the operation "op"
if op not in dict_parse_op:
raise ParsingException(f"Unsupported patch operation {op} on {path_op}")
parse_op = dict_parse_op[op]
try:
parsed_elem = parse_op.parse(raw_elem, f"{path_op}.{i}")
except Exception as e:
raise ParsingException(f"Error when parsing {path_op}") from e

# Make sure we have extracted all the keys from "raw_elem"
if len(raw_elem) > 0:
raise ValueError(f"Unexpected keys {raw_elem}")
patch.append(parsed_elem)

return patch
class ParseForEach(ParserOp):
def __init__(self, meta_op_parser: op_parser.MetaOperatorParser, jsonpatch_parser: IJsonPatchParser) -> None:
self.meta_op_parser = meta_op_parser
self.jsonpatch_parser = jsonpatch_parser

@abc.abstractmethod
def _get_dict_parse_op(self) -> dict[str, ParserOp]:
"""A dictionary with the classes that can parse the json patch operations
supported by this JsonPatchParser
"""
def parse(self, raw_elem: dict, path_op: str) -> jsonpatch_helpers.JsonPatchOperator:
elems = utils.must_pop(raw_elem, "elements", f"Missing key 'elements' in {raw_elem}")
op = self.meta_op_parser.parse(elems, f"{path_op}.elements")
if not isinstance(op, operators.OperatorWithRef):
raise ParsingException(
f"The expression in {path_op}.elements must reference elements in the json that we want to patch"
)
list_raw_patch = utils.must_pop(raw_elem, "patch", f"Missing key 'patch' in {raw_elem}")
if not isinstance(list_raw_patch, list):
raise ParsingException(f"In {path_op}.patch we expect a list of patch, but got {list_raw_patch}")
jsonpatch_op = self.jsonpatch_parser.parse(list_raw_patch, f"{path_op}.patch")
return jsonpatch_helpers.JsonPatchForEach(op, jsonpatch_op)


class JsonPatchParserV1(IJsonPatchParser):
Expand Down Expand Up @@ -131,7 +150,5 @@ def __init__(self, meta_op_parser: op_parser.MetaOperatorParser) -> None:

def _get_dict_parse_op(self) -> dict[str, ParserOp]:
dict_parse_op_v1 = super()._get_dict_parse_op()
dict_parse_op_v2 = {
"expr": ParseExpr(self.meta_op_parser),
}
dict_parse_op_v2 = {"expr": ParseExpr(self.meta_op_parser), "forEach": ParseForEach(self.meta_op_parser, self)}
return {**dict_parse_op_v1, **dict_parse_op_v2}
94 changes: 67 additions & 27 deletions generic_k8s_webhook/jsonpatch_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Any
from typing import Any, Union

import jsonpatch

Expand All @@ -12,17 +12,32 @@ def __init__(self, path: list[str]) -> None:
self.path = path

@abc.abstractmethod
def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
pass

def _format_path(self, path: list[str], prefix: list[str]) -> str:
"""Converts the `path` to a string separated by "/" and starts also by "/"
If a prefix is defined and the path is not absolute, then the prefix is preprended.
An absolute path is one whose first element is "$"
"""
if path[0] == "$":
final_path = path[1:]
elif prefix:
final_path = prefix + path
else:
final_path = path
final_path = [str(elem) for elem in final_path]
return "/" + "/".join(final_path)


class JsonPatchAdd(JsonPatchOperator):
def __init__(self, path: list[str], value: Any) -> None:
super().__init__(path)
self.value = value

# Remember the op "add" is like an assignment
def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
json_to_patch = contexts[-1]
# Check how many (nested) keys already exist
existing_path = []
first_non_existing_key = None
Expand Down Expand Up @@ -67,29 +82,25 @@ def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
else:
new_value = {key: new_value}

# Convert the list to a string separated by "/"
formatted_path = "/" + "/".join(new_path)

return jsonpatch.JsonPatch(
[
{
"op": "add",
"path": formatted_path,
"path": self._format_path(new_path, prefix),
"value": new_value,
}
]
)


class JsonPatchRemove(JsonPatchOperator):
def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
# TODO If the key to remove doesn't exist, this must become a no-op
formatted_path = "/" + "/".join(self.path)
return jsonpatch.JsonPatch(
[
{
"op": "remove",
"path": formatted_path,
"path": self._format_path(self.path, prefix),
}
]
)
Expand All @@ -100,41 +111,53 @@ def __init__(self, path: list[str], value: Any) -> None:
super().__init__(path)
self.value = value

def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
formatted_path = "/" + "/".join(self.path)
return jsonpatch.JsonPatch([{"op": "replace", "path": formatted_path, "value": self.value}])
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
return jsonpatch.JsonPatch(
[{"op": "replace", "path": self._format_path(self.path, prefix), "value": self.value}]
)


class JsonPatchCopy(JsonPatchOperator):
def __init__(self, path: list[str], fromm: Any) -> None:
super().__init__(path)
self.fromm = fromm

def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
formatted_path = "/" + "/".join(self.path)
formatted_from = "/" + "/".join(self.fromm)
return jsonpatch.JsonPatch([{"op": "copy", "path": formatted_path, "from": formatted_from}])
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
return jsonpatch.JsonPatch(
[
{
"op": "copy",
"path": self._format_path(self.path, prefix),
"from": self._format_path(self.fromm, prefix),
}
]
)


class JsonPatchMove(JsonPatchOperator):
def __init__(self, path: list[str], fromm: Any) -> None:
super().__init__(path)
self.fromm = fromm

def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
formatted_path = "/" + "/".join(self.path)
formatted_from = "/" + "/".join(self.fromm)
return jsonpatch.JsonPatch([{"op": "move", "path": formatted_path, "from": formatted_from}])
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
return jsonpatch.JsonPatch(
[
{
"op": "move",
"path": self._format_path(self.path, prefix),
"from": self._format_path(self.fromm, prefix),
}
]
)


class JsonPatchTest(JsonPatchOperator):
def __init__(self, path: list[str], value: Any) -> None:
super().__init__(path)
self.value = value

def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
formatted_path = "/" + "/".join(self.path)
return jsonpatch.JsonPatch([{"op": "test", "path": formatted_path, "value": self.value}])
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
return jsonpatch.JsonPatch([{"op": "test", "path": self._format_path(self.path, prefix), "value": self.value}])


class JsonPatchExpr(JsonPatchOperator):
Expand All @@ -147,7 +170,24 @@ def __init__(self, path: list[str], value: operators.Operator) -> None:
super().__init__(path)
self.value = value

def generate_patch(self, json_to_patch: dict | list) -> jsonpatch.JsonPatch:
actual_value = self.value.get_value([json_to_patch])
def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
actual_value = self.value.get_value(contexts)
json_patch_add = JsonPatchAdd(self.path, actual_value)
return json_patch_add.generate_patch(json_to_patch)
return json_patch_add.generate_patch(contexts, prefix)


class JsonPatchForEach(JsonPatchOperator):
"""Generates a jsonpatch for each element from a list"""

def __init__(self, op_with_ref: operators.OperatorWithRef, list_jsonpatch_op: list[JsonPatchOperator]) -> None:
super().__init__([])
self.op_with_ref = op_with_ref
self.list_jsonpatch_op = list_jsonpatch_op

def generate_patch(self, contexts: list[Union[list, dict]], prefix: list[str] = None) -> jsonpatch.JsonPatch:
list_raw_patch = []
for payload, path in self.op_with_ref.get_value_with_ref(contexts):
for jsonpatch_op in self.list_jsonpatch_op:
patch_obj = jsonpatch_op.generate_patch(contexts + [payload], path)
list_raw_patch.extend(patch_obj.patch)
return jsonpatch.JsonPatch(list_raw_patch)
63 changes: 48 additions & 15 deletions generic_k8s_webhook/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,33 @@ def __init__(self, op_inputs: Any, path_op: str) -> None:

@abc.abstractmethod
def input_type(self) -> type | None:
pass
"""Returns the expected type for the input parameters. This must match with
the return type of the operators that generate input data for this one
"""

@abc.abstractmethod
def return_type(self) -> type | None:
pass
"""Returns the expected type for the return value of the `get_value` function"""

@abc.abstractmethod
def get_value(self, contexts: list):
pass
def get_value(self, contexts: list) -> Any:
"""Returns a value for this operator given a certain context
Args:
contexts (list): It's the list of contexts (json payloads) used to evaluate this operator
"""


class OperatorWithRef(Operator):
@abc.abstractmethod
def get_value_with_ref(self, contexts: list) -> Any:
"""Similar to `get_value`, but returns a tuple (or list of tuples) where the first element
is the actual return value and the second one is a reference to the place in the context
that was used to get this value
Args:
contexts (list): It's the list of contexts (json payloads) used to evaluate this operator
"""


# It's the base class for operators like and, or, sum, etc.
Expand Down Expand Up @@ -344,41 +362,56 @@ def return_type(self) -> type | None:
return type(self.value)


class GetValue(Operator):
class GetValue(OperatorWithRef):
def __init__(self, path: list[str], context_id: int) -> None:
self.path = path
self.context_id = context_id

def get_value(self, contexts: list):
values_with_ref = self.get_value_with_ref(contexts)
if isinstance(values_with_ref, list):
return [value for value, _ in values_with_ref]
value, _ = values_with_ref
return value

def get_value_with_ref(self, contexts: list):
context = contexts[self.context_id]
return self._get_value_from_json(context, self.path)
return self._get_value_from_json(context, self.path, [])

def _get_value_from_json(self, data: Union[list, dict], path: list):
def _get_value_from_json(
self, data: Union[list, dict], path: list, formated_path: list
) -> Union[tuple, list[tuple]]:
if len(path) == 0 or path[0] == "":
return data
# It can return both a single data point or a list of elements
# In the first case, we just return a tuple (data, path)
# In the second case, we create a tuple for each element in the list
# so we know the path of each element
if isinstance(data, list):
return [(elem, formated_path + [i]) for i, elem in enumerate(data)]
return (data, formated_path)

if path[0] == "*":
return self._evaluate_wildcard(data, path)
return self._evaluate_wildcard(data, path, formated_path)

if isinstance(data, dict):
key = path[0]
if key in data:
return self._get_value_from_json(data[key], path[1:])
return self._get_value_from_json(data[key], path[1:], formated_path + [key])
elif isinstance(data, list):
key = int(path[0])
if 0 <= key < len(data):
return self._get_value_from_json(data[key], path[1:])
return self._get_value_from_json(data[key], path[1:], formated_path + [key])
else:
raise RuntimeError(f"Expected list or dict, but got {data}")

return None
return []

def _evaluate_wildcard(self, data: Union[list, dict], path: list):
def _evaluate_wildcard(self, data: Union[list, dict], path: list, formated_path: list) -> list[tuple]:
if not isinstance(data, list):
raise RuntimeError(f"Expected list when evaluating '*', but got {data}")
l = []
for elem in data:
sublist = self._get_value_from_json(elem, path[1:])
for i, elem in enumerate(data):
sublist = self._get_value_from_json(elem, path[1:], formated_path + [i])
if isinstance(sublist, list):
l.extend(sublist)
else:
Expand Down
Loading

0 comments on commit 10be3c3

Please sign in to comment.