Merge pull request #141 from robusta-dev/prometheus-metrics-mixin-refactoring

Refactor mixins for Prometheus metrics
LeaveMyYard authored Sep 5, 2023
2 parents 3d0b3bc + 292d80f commit 27817c4
Showing 8 changed files with 117 additions and 103 deletions.
4 changes: 2 additions & 2 deletions robusta_krr/core/integrations/prometheus/metrics/__init__.py
@@ -1,3 +1,3 @@
from .base import PrometheusMetric
from .cpu import CPULoader, PercentileCPULoader, CPUAmountLoader
from .memory import MaxMemoryLoader, MemoryLoader, MemoryAmountLoader
from .cpu import CPUAmountLoader, CPULoader, PercentileCPULoader
from .memory import MaxMemoryLoader, MemoryAmountLoader, MemoryLoader
133 changes: 46 additions & 87 deletions robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -2,22 +2,26 @@

import abc
import asyncio
import copy
import datetime
import enum
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Any, Optional
from typing import Any, Optional, TypedDict

import numpy as np
import pydantic as pd
from prometrix import CustomPrometheusConnect

from robusta_krr.core.abstract.metrics import BaseMetric
from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.utils.configurable import Configurable
from prometrix import CustomPrometheusConnect


class PrometheusSeries(TypedDict):
metric: dict[str, Any]
values: list[list[float]]


class QueryType(str, enum.Enum):
@@ -38,9 +42,24 @@ class PrometheusMetric(BaseMetric, Configurable):
Base class for all metric loaders.
Metric loaders are used to load metrics from a specified source (like Prometheus in this case).
`query_type`: the type of query to use when querying Prometheus.
Can be either `QueryType.Query` or `QueryType.QueryRange`.
By default, `QueryType.Query` is used.
`filtering`: if multiple metrics with the same name are found, searches for the kubelet metric.
If none is found, returns the first one in alphabetical order. Set to False to disable this behavior.
`pods_batch_size`: if the number of pods is too large for a single query, the query is split into multiple sub-queries.
Each sub-query result is then combined into a single result using the `combine_batches` method,
which you can override to change how the results are combined.
This parameter specifies the maximum number of pods per query.
Set to None to disable batching.
"""

query_type: QueryType = QueryType.QueryRange
query_type: QueryType = QueryType.Query
filtering: bool = True
pods_batch_size: Optional[int] = 50

def __init__(
self,
@@ -55,6 +74,9 @@ def __init__(

self.executor = executor

if self.pods_batch_size is not None and self.pods_batch_size <= 0:
raise ValueError("pods_batch_size must be positive")

def get_prometheus_cluster_label(self) -> str:
"""
Generates the cluster label for querying a centralized Prometheus
@@ -96,7 +118,7 @@ def _step_to_string(self, step: datetime.timedelta) -> str:
return f"{int(step.total_seconds()) // (60 * 60 * 24)}d"
return f"{int(step.total_seconds()) // 60}m"

def _query_prometheus_sync(self, data: PrometheusMetricData) -> list[dict]:
def _query_prometheus_sync(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
if data.type == QueryType.QueryRange:
value = self.prometheus.custom_query_range(
query=data.query,
Expand All @@ -113,7 +135,7 @@ def _query_prometheus_sync(self, data: PrometheusMetricData) -> list[dict]:
result["values"] = [result.pop("value")]
return results

async def query_prometheus(self, data: PrometheusMetricData) -> list[dict]:
async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
"""
Asynchronous method that queries Prometheus to fetch metrics.
@@ -149,6 +171,16 @@ async def load_data(
end_time = datetime.datetime.now().replace(second=0, microsecond=0).astimezone()
start_time = end_time - period

# If the object exceeds the batch size, split it into sub-objects and query each one recursively.
if self.pods_batch_size is not None and object.pods_count > self.pods_batch_size:
results = await asyncio.gather(
*[
self.load_data(splitted_object, period, step)
for splitted_object in object.split_into_batches(self.pods_batch_size)
]
)
return self.combine_batches(results)

result = await self.query_prometheus(
PrometheusMetricData(
query=query,
@@ -162,31 +194,12 @@ async def load_data(
if result == []:
return {}

return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}

if self.filtering:
    result = self.filter_prom_jobs_results(result)

return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}


class QueryRangeMetric(PrometheusMetric):
    """This type of PrometheusMetric is used to query metrics for a specific time range."""

    query_type = QueryType.QueryRange


class QueryMetric(PrometheusMetric):
    """This type of PrometheusMetric is used to query metrics for a specific time."""

    query_type = QueryType.Query


PrometheusSeries = Any


class FilterJobsMixin(PrometheusMetric):
    """
    This is the version of the BasicMetricLoader, that filters out data,
    if multiple metrics with the same name were found.

    Searches for the kubelet metric. If not found - returns first one in alphabetical order.
    """

# --------------------- Filtering Jobs --------------------- #

@staticmethod
def get_target_name(series: PrometheusSeries) -> Optional[str]:
@@ -209,16 +222,16 @@ def filter_prom_jobs_results(
return series_list_result

target_names = {
FilterJobsMixin.get_target_name(series)
PrometheusMetric.get_target_name(series)
for series in series_list_result
if FilterJobsMixin.get_target_name(series)
if PrometheusMetric.get_target_name(series)
}
return_list: list[PrometheusSeries] = []

# take the kubelet job if it exists; otherwise return the first job alphabetically
for target_name in target_names:
relevant_series = [
series for series in series_list_result if FilterJobsMixin.get_target_name(series) == target_name
series for series in series_list_result if PrometheusMetric.get_target_name(series) == target_name
]
relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
if len(relevant_kubelet_metric) == 1:
@@ -228,22 +241,7 @@ def filter_prom_jobs_results(
return_list.append(sorted_relevant_series[0])
return return_list

async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
result = await super().query_prometheus(data)
return self.filter_prom_jobs_results(result)


class BatchedRequestMixin(PrometheusMetric):
"""
This type of PrometheusMetric is used to split the query into multiple queries,
each querying a subset of the pods of the object.
The results of the queries are then combined into a single result.
This is useful when the number of pods is too large for a single query.
"""

pods_batch_size = 50
# --------------------- Batching Queries --------------------- #

def combine_batches(self, results: list[PodsTimeData]) -> PodsTimeData:
"""
@@ -257,42 +255,3 @@ def combine_batches(self, results: list[PodsTimeData]) -> PodsTimeData:
"""

return reduce(lambda x, y: x | y, results, {})

@staticmethod
def _slice_object(object: K8sObjectData, s: slice) -> K8sObjectData:
obj_copy = copy.deepcopy(object)
obj_copy.pods = object.pods[s]
return obj_copy

@staticmethod
def _split_objects(object: K8sObjectData, max_pods: int) -> list[K8sObjectData]:
"""
Splits the object into multiple objects, each containing at most max_pods pods.
Args:
object (K8sObjectData): The object to split.
Returns:
list[K8sObjectData]: A list of objects.
"""
return [
BatchedRequestMixin._slice_object(object, slice(i, i + max_pods))
for i in range(0, len(object.pods), max_pods)
]

async def load_data(
self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
) -> PodsTimeData:
splitted_objects = self._split_objects(object, self.pods_batch_size)

# If we do not exceed the batch size, we can use the regular load_data method.
if len(splitted_objects) <= 1:
return await super().load_data(object, period, step)

results = await asyncio.gather(
*[
super(BatchedRequestMixin, self).load_data(splitted_object, period, step)
for splitted_object in splitted_objects
]
)
return self.combine_batches(results)
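With the mixins above folded into the base class, a concrete loader now only overrides `get_query`; filtering and batching come from the inherited class attributes. A minimal sketch of a custom loader under the refactored API (the metric name and class are illustrative, not part of this commit):

from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric, QueryType
from robusta_krr.core.models.objects import K8sObjectData


class RestartsLoader(PrometheusMetric):
    """Hypothetical loader: container restarts per pod."""

    query_type: QueryType = QueryType.QueryRange  # time series instead of an instant query
    filtering: bool = True                        # deduplicate by preferring the kubelet job
    pods_batch_size: int = 50                     # recursively split large pod sets

    def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
        pods_selector = "|".join(pod.name for pod in object.pods)
        cluster_label = self.get_prometheus_cluster_label()
        return f"""
            kube_pod_container_status_restarts_total{{
                namespace="{object.namespace}",
                pod=~"{pods_selector}",
                container="{object.container}"
                {cluster_label}
            }}
        """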
27 changes: 22 additions & 5 deletions robusta_krr/core/integrations/prometheus/metrics/cpu.py
@@ -1,9 +1,15 @@
from robusta_krr.core.models.objects import K8sObjectData

from .base import BatchedRequestMixin, FilterJobsMixin, QueryMetric, QueryRangeMetric
from .base import PrometheusMetric, QueryType


class CPULoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
class CPULoader(PrometheusMetric):
"""
A metric loader for loading CPU usage metrics.
"""

query_type: QueryType = QueryType.QueryRange

def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -19,8 +25,15 @@ def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
"""


def PercentileCPULoader(percentile: float) -> type[QueryMetric]:
class PercentileCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def PercentileCPULoader(percentile: float) -> type[PrometheusMetric]:
"""
A factory for creating percentile CPU usage metric loaders.
"""

if not 0 <= percentile <= 100:
raise ValueError("percentile must be between 0 and 100")

class PercentileCPULoader(PrometheusMetric):
def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -41,7 +54,11 @@ def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
return PercentileCPULoader


class CPUAmountLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
class CPUAmountLoader(PrometheusMetric):
"""
A metric loader for counting CPU usage data points.
"""

def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
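Since `PercentileCPULoader` is now a factory returning a `PrometheusMetric` subclass, callers parameterize it before use, and the new guard rejects invalid percentiles up front. A short usage sketch:

from robusta_krr.core.integrations.prometheus.metrics import PercentileCPULoader

# Builds a new PrometheusMetric subclass bound to the 95th percentile.
P95CPULoader = PercentileCPULoader(95)

# Out-of-range values now fail fast, before any query is built:
try:
    PercentileCPULoader(150)
except ValueError as err:
    print(err)  # percentile must be between 0 and 100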
22 changes: 18 additions & 4 deletions robusta_krr/core/integrations/prometheus/metrics/memory.py
@@ -1,9 +1,15 @@
from robusta_krr.core.models.objects import K8sObjectData

from .base import BatchedRequestMixin, FilterJobsMixin, QueryMetric, QueryRangeMetric
from .base import PrometheusMetric, QueryType


class MemoryLoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
class MemoryLoader(PrometheusMetric):
"""
A metric loader for loading memory usage metrics.
"""

query_type: QueryType = QueryType.QueryRange

def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -19,7 +25,11 @@ def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
"""


class MaxMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
class MaxMemoryLoader(PrometheusMetric):
"""
A metric loader for loading max memory usage metrics.
"""

def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
@@ -35,7 +45,11 @@ def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
"""


class MemoryAmountLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
class MemoryAmountLoader(PrometheusMetric):
"""
A metric loader for counting memory usage data points.
"""

def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
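The memory loaders follow the same pattern as the CPU ones: only `MemoryLoader` overrides `query_type` for a range query, while the aggregating loaders keep the new instant-query default. A sketch of the resulting defaults, assuming these remain plain class attributes:

from robusta_krr.core.integrations.prometheus.metrics import (
    MaxMemoryLoader,
    MemoryAmountLoader,
    MemoryLoader,
)
from robusta_krr.core.integrations.prometheus.metrics.base import QueryType

# Only the full time-series loader needs query_range; max_over_time and
# count_over_time aggregate inside PromQL, so an instant query suffices.
print(MemoryLoader.query_type)        # QueryType.QueryRange
print(MaxMemoryLoader.query_type)     # QueryType.Query (inherited default)
print(MemoryAmountLoader.query_type)  # QueryType.Query (inherited default)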
@@ -10,8 +10,8 @@
from robusta_krr.core.abstract.strategies import PodsTimeData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery
from robusta_krr.utils.batched import batched
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery

from ..metrics import PrometheusMetric
from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config
27 changes: 26 additions & 1 deletion robusta_krr/core/models/objects.py
@@ -1,8 +1,11 @@
from __future__ import annotations

from typing import Literal, Optional

import pydantic as pd

from robusta_krr.core.models.allocations import ResourceAllocations
from robusta_krr.utils.batched import batched

KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "Rollout"]

@@ -32,7 +35,7 @@ class K8sObjectData(pd.BaseModel):
pods: list[PodData] = []
hpa: Optional[HPAData]
namespace: str
kind: str
kind: KindLiteral
allocations: ResourceAllocations

def __str__(self) -> str:
@@ -52,3 +55,25 @@ def deleted_pods_count(self) -> int:
@property
def pods_count(self) -> int:
return len(self.pods)

def split_into_batches(self, n: int) -> list[K8sObjectData]:
"""
Split this object into multiple objects, each containing at most n pods.
"""

if self.pods_count <= n:
return [self]

return [
K8sObjectData(
cluster=self.cluster,
name=self.name,
container=self.container,
pods=batch,
hpa=self.hpa,
namespace=self.namespace,
kind=self.kind,
allocations=self.allocations,
)
for batch in batched(self.pods, n)
]
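`split_into_batches` leans on the `batched` helper imported from `robusta_krr.utils.batched`, whose behavior matches the familiar itertools recipe. A self-contained sketch of equivalent logic (not the project's actual implementation):

from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def batched(iterable: Iterable[T], n: int) -> Iterator[list[T]]:
    # Yield successive lists of at most n items each.
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch

pods = [f"pod-{i}" for i in range(120)]
print([len(b) for b in batched(pods, 50)])  # [50, 50, 20]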
4 changes: 2 additions & 2 deletions robusta_krr/strategies/simple.py
@@ -12,11 +12,11 @@
StrategySettings,
)
from robusta_krr.core.integrations.prometheus.metrics import (
CPUAmountLoader,
MaxMemoryLoader,
MemoryAmountLoader,
PercentileCPULoader,
PrometheusMetric,
CPUAmountLoader,
MemoryAmountLoader,
)

