Refactor how settings are passed and how logging works
LeaveMyYard committed Oct 5, 2023
1 parent 11d5c78 commit 31758d8
Showing 16 changed files with 329 additions and 348 deletions.
99 changes: 48 additions & 51 deletions robusta_krr/core/integrations/kubernetes.py
@@ -1,4 +1,5 @@
 import asyncio
+import logging
 from concurrent.futures import ThreadPoolExecutor
 from typing import AsyncGenerator, AsyncIterator, Callable, Optional, Union
 
@@ -19,25 +20,25 @@
     V2HorizontalPodAutoscalerList,
 )
 
+from robusta_krr.core.models.config import settings
 from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData
 from robusta_krr.core.models.result import ResourceAllocations
-from robusta_krr.utils.configurable import Configurable
 
 from .rollout import RolloutAppsV1Api
 
+logger = logging.getLogger("krr")
+
 AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
 HPAKey = tuple[str, str, str]
 
 
-class ClusterLoader(Configurable):
-    def __init__(self, cluster: Optional[str], *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
+class ClusterLoader:
+    def __init__(self, cluster: Optional[str]):
         self.cluster = cluster
         # This executor will be running requests to Kubernetes API
-        self.executor = ThreadPoolExecutor(self.config.max_workers)
+        self.executor = ThreadPoolExecutor(settings.max_workers)
         self.api_client = (
-            config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig)
+            config.new_client_from_config(context=cluster, config_file=settings.kubeconfig)
             if cluster is not None
             else None
         )
@@ -57,9 +58,9 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
         A list of scannable objects.
         """
 
-        self.info(f"Listing scannable objects in {self.cluster}")
-        self.debug(f"Namespaces: {self.config.namespaces}")
-        self.debug(f"Resources: {self.config.resources}")
+        logger.info(f"Listing scannable objects in {self.cluster}")
+        logger.debug(f"Namespaces: {settings.namespaces}")
+        logger.debug(f"Resources: {settings.resources}")
 
         self.__hpa_list = await self._try_list_hpa()
 
@@ -76,20 +77,20 @@ async def list_scannable_objects(self) -> AsyncGenerator[K8sObjectData, None]:
         async with objects_combined.stream() as streamer:
             async for object in streamer:
                 # NOTE: By default we will filter out kube-system namespace
-                if self.config.namespaces == "*" and object.namespace == "kube-system":
+                if settings.namespaces == "*" and object.namespace == "kube-system":
                     continue
                 yield object
 
     async def list_pods(self, object: K8sObjectData) -> list[PodData]:
-        selector = self._build_selector_query(object.api_resource.spec.selector)
+        selector = self._build_selector_query(object._api_resource.spec.selector)
         if selector is None:
             return []
 
         loop = asyncio.get_running_loop()
         ret: V1PodList = await loop.run_in_executor(
             self.executor,
             lambda: self.core.list_namespaced_pod(
-                namespace=object.api_resource.metadata.namespace, label_selector=selector
+                namespace=object._api_resource.metadata.namespace, label_selector=selector
             ),
         )
         return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
@@ -134,33 +135,33 @@ def __build_obj(
         )
 
     def _should_list_resource(self, resource: str):
-        if self.config.resources == "*":
+        if settings.resources == "*":
             return True
-        return resource.lower() in self.config.resources
+        return resource.lower() in settings.resources
 
     async def _list_workflows(
         self, kind: KindLiteral, all_namespaces_request: Callable, namespaced_request: Callable
     ) -> AsyncIterator[K8sObjectData]:
         if not self._should_list_resource(kind):
-            self.debug(f"Skipping {kind}s in {self.cluster}")
+            logger.debug(f"Skipping {kind}s in {self.cluster}")
             return
 
         if kind == "Rollout" and not self.__rollouts_available:
             return
 
-        self.debug(f"Listing {kind}s in {self.cluster}")
+        logger.debug(f"Listing {kind}s in {self.cluster}")
         loop = asyncio.get_running_loop()
 
         try:
-            if self.config.namespaces == "*":
+            if settings.namespaces == "*":
                 ret_multi = await loop.run_in_executor(
                     self.executor,
                     lambda: all_namespaces_request(
                         watch=False,
-                        label_selector=self.config.selector,
+                        label_selector=settings.selector,
                     ),
                 )
-                self.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
+                logger.debug(f"Found {len(ret_multi.items)} {kind} in {self.cluster}")
                 for item in ret_multi.items:
                     for container in item.spec.template.spec.containers:
                         yield self.__build_obj(item, container, kind)
@@ -171,10 +172,10 @@ async def _list_workflows(
                     lambda: namespaced_request(
                         namespace=namespace,
                         watch=False,
-                        label_selector=self.config.selector,
+                        label_selector=settings.selector,
                     ),
                 )
-                for namespace in self.config.namespaces
+                for namespace in settings.namespaces
             ]
 
             total_items = 0
@@ -185,16 +186,15 @@
                     for container in item.spec.template.spec.containers:
                         yield self.__build_obj(item, container, kind)
 
-            self.debug(f"Found {total_items} {kind} in {self.cluster}")
+            logger.debug(f"Found {total_items} {kind} in {self.cluster}")
         except ApiException as e:
             if kind == "Rollout" and e.status in [400, 401, 403, 404]:
                 if self.__rollouts_available:
-                    self.debug(f"Rollout API not available in {self.cluster}")
+                    logger.debug(f"Rollout API not available in {self.cluster}")
                 self.__rollouts_available = False
             else:
-                self.error(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
-                self.debug_exception()
-                self.error("Will skip this object type and continue.")
+                logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
+                logger.error("Will skip this object type and continue.")
 
     def _list_deployments(self) -> AsyncIterator[K8sObjectData]:
         return self._list_workflows(
@@ -311,19 +311,16 @@ async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
         try:
             return await self.__list_hpa()
         except Exception as e:
-            self.error(f"Error trying to list hpa in cluster {self.cluster}: {e}")
-            self.debug_exception()
-            self.error(
+            logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}")
+            logger.error(
                 "Will assume that there are no HPA. "
                 "Be careful as this may lead to inaccurate results if object actually has HPA."
             )
             return {}
 
 
-class KubernetesLoader(Configurable):
-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
-
+class KubernetesLoader:
+    def __init__(self) -> None:
         self._cluster_loaders: dict[Optional[str], ClusterLoader] = {}
 
     async def list_clusters(self) -> Optional[list[str]]:
@@ -333,44 +330,44 @@ async def list_clusters(self) -> Optional[list[str]]:
             A list of clusters.
         """
 
-        if self.config.inside_cluster:
-            self.debug("Working inside the cluster")
+        if settings.inside_cluster:
+            logger.debug("Working inside the cluster")
             return None
 
         try:
-            contexts, current_context = config.list_kube_config_contexts(self.config.kubeconfig)
+            contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
         except config.ConfigException:
-            if self.config.clusters is not None and self.config.clusters != "*":
-                self.warning("Could not load context from kubeconfig.")
-                self.warning(f"Falling back to clusters from CLI: {self.config.clusters}")
-                return self.config.clusters
+            if settings.clusters is not None and settings.clusters != "*":
+                logger.warning("Could not load context from kubeconfig.")
+                logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
+                return settings.clusters
             else:
-                self.error(
+                logger.error(
                     "Could not load context from kubeconfig. "
                     "Please check your kubeconfig file or pass -c flag with the context name."
                 )
                 return None
 
-        self.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
-        self.debug(f"Current cluster: {current_context['name']}")
+        logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
+        logger.debug(f"Current cluster: {current_context['name']}")
 
-        self.debug(f"Configured clusters: {self.config.clusters}")
+        logger.debug(f"Configured clusters: {settings.clusters}")
 
         # None, empty means current cluster
-        if not self.config.clusters:
+        if not settings.clusters:
             return [current_context["name"]]
 
         # * means all clusters
-        if self.config.clusters == "*":
+        if settings.clusters == "*":
             return [context["name"] for context in contexts]
 
-        return [context["name"] for context in contexts if context["name"] in self.config.clusters]
+        return [context["name"] for context in contexts if context["name"] in settings.clusters]
 
     def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]:
         try:
-            return ClusterLoader(cluster=cluster, config=self.config)
+            return ClusterLoader(cluster=cluster)
         except Exception as e:
-            self.error(f"Could not load cluster {cluster} and will skip it: {e}")
+            logger.error(f"Could not load cluster {cluster} and will skip it: {e}")
             return None
 
     async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
@@ -386,7 +383,7 @@ async def list_scannable_objects(self, clusters: Optional[list[str]]) -> AsyncIterator[K8sObjectData]:
 
         self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None}
         if self.cluster_loaders == {}:
-            self.error("Could not load any cluster.")
+            logger.error("Could not load any cluster.")
             return
 
         # https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python
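The pattern repeated throughout this file (and the rest of the commit): the Configurable mixin, which carried a Config object and logging helpers (self.config, self.info, self.debug_exception), is replaced by a module-level settings singleton plus a standard named logger. A minimal sketch of the new pattern follows; the Settings fields and defaults are illustrative stand-ins, not the real robusta_krr.core.models.config.

import logging
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class Settings:
    # Illustrative stand-in: the real project defines its settings as a
    # pydantic model with many more fields.
    max_workers: int = 6
    kubeconfig: Optional[str] = None
    namespaces: Union[str, list[str]] = "*"

settings = Settings()              # module-level singleton, imported anywhere
logger = logging.getLogger("krr")  # one shared named logger

class ClusterLoader:
    def __init__(self, cluster: Optional[str] = None) -> None:
        # No Config parameter and no Configurable base class required.
        self.cluster = cluster
        logger.debug("Namespaces: %s", settings.namespaces)

Call sites shrink accordingly (ClusterLoader(cluster=cluster) instead of ClusterLoader(cluster=cluster, config=self.config)), at the cost of global state: two differently-configured loaders can no longer coexist in one process.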
33 changes: 13 additions & 20 deletions robusta_krr/core/integrations/prometheus/loader.py
@@ -1,23 +1,25 @@
 from __future__ import annotations
 
 import datetime
+import logging
 from concurrent.futures import ThreadPoolExecutor
 from typing import TYPE_CHECKING, Optional
 
 from kubernetes import config as k8s_config
 from kubernetes.client.api_client import ApiClient
 from prometrix import MetricsNotFound, PrometheusNotFound
 
+from robusta_krr.core.models.config import settings
 from robusta_krr.core.models.objects import K8sObjectData, PodData
-from robusta_krr.utils.configurable import Configurable
 
 from .metrics_service.prometheus_metrics_service import PrometheusMetricsService
 from .metrics_service.thanos_metrics_service import ThanosMetricsService
 from .metrics_service.victoria_metrics_service import VictoriaMetricsService
 
 if TYPE_CHECKING:
     from robusta_krr.core.abstract.strategies import BaseStrategy, MetricsPodData
-    from robusta_krr.core.models.config import Config
 
+logger = logging.getLogger("krr")
+
 METRICS_SERVICES = {
     "Prometheus": PrometheusMetricsService,
@@ -26,53 +28,44 @@
 }
 
 
-class PrometheusMetricsLoader(Configurable):
-    def __init__(
-        self,
-        config: Config,
-        *,
-        cluster: Optional[str] = None,
-    ) -> None:
+class PrometheusMetricsLoader:
+    def __init__(self, *, cluster: Optional[str] = None) -> None:
         """
         Initializes the Prometheus Loader.
 
         Args:
-            config (Config): The configuration object.
             cluster (Optional[str]): The name of the cluster. Defaults to None.
         """
 
-        super().__init__(config=config)
-
-        self.executor = ThreadPoolExecutor(self.config.max_workers)
+        self.executor = ThreadPoolExecutor(settings.max_workers)
 
         self.api_client = (
-            k8s_config.new_client_from_config(config_file=self.config.kubeconfig, context=cluster)
+            k8s_config.new_client_from_config(config_file=settings.kubeconfig, context=cluster)
             if cluster is not None
             else None
        )
-        loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)
+        loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster)
         if loader is None:
             raise PrometheusNotFound("No Prometheus or metrics service found")
 
         self.loader = loader
 
-        self.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster")
+        logger.info(f"{self.loader.name} connected successfully for {cluster or 'default'} cluster")
 
     def get_metrics_service(
         self,
-        config: Config,
         api_client: Optional[ApiClient] = None,
         cluster: Optional[str] = None,
     ) -> Optional[PrometheusMetricsService]:
         for service_name, metric_service_class in METRICS_SERVICES.items():
             try:
-                loader = metric_service_class(config, api_client=api_client, cluster=cluster, executor=self.executor)
+                loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor)
                 loader.check_connection()
-                self.echo(f"{service_name} found")
+                logger.info(f"{service_name} found")
                 loader.validate_cluster_name()
                 return loader
             except MetricsNotFound as e:
-                self.debug(f"{service_name} not found: {e}")
+                logger.debug(f"{service_name} not found: {e}")
 
         return None
 
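A side effect of the logging refactor visible in both files so far: the old two-call pattern of self.error(...) followed by self.debug_exception() collapses into a single logger.exception(...), which logs at ERROR level and attaches the traceback of the exception currently being handled. A small standalone demonstration:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("krr")

try:
    raise RuntimeError("simulated metrics discovery failure")
except Exception as e:
    # logger.exception == logger.error plus the active exception's traceback;
    # it must run inside an except block to pick the traceback up.
    logger.exception(f"Error trying to list hpa in cluster test: {e}")
    logger.error("Will assume that there are no HPA.")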
13 changes: 5 additions & 8 deletions robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -14,9 +14,8 @@
 
 from robusta_krr.core.abstract.metrics import BaseMetric
 from robusta_krr.core.abstract.strategies import PodsTimeData
-from robusta_krr.core.models.config import Config
+from robusta_krr.core.models.config import settings
 from robusta_krr.core.models.objects import K8sObjectData
-from robusta_krr.utils.configurable import Configurable
 
 
 class PrometheusSeries(TypedDict):
@@ -37,7 +36,7 @@ class PrometheusMetricData(pd.BaseModel):
     type: QueryType
 
 
-class PrometheusMetric(BaseMetric, Configurable):
+class PrometheusMetric(BaseMetric):
     """
     Base class for all metric loaders.
 
@@ -63,12 +62,10 @@ class PrometheusMetric(BaseMetric, Configurable):
 
     def __init__(
         self,
-        config: Config,
         prometheus: CustomPrometheusConnect,
         service_name: str,
         executor: Optional[ThreadPoolExecutor] = None,
     ) -> None:
-        super().__init__(config)
         self.prometheus = prometheus
         self.service_name = service_name
 
@@ -84,9 +81,9 @@ def get_prometheus_cluster_label(self) -> str:
         Returns:
             str: a promql safe label string for querying the cluster.
         """
-        if self.config.prometheus_cluster_label is None:
+        if settings.prometheus_cluster_label is None:
             return ""
-        return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'
+        return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"'
 
     @abc.abstractmethod
     def get_query(self, object: K8sObjectData, duration: str, step: str) -> str:
@@ -237,7 +234,7 @@ def filter_prom_jobs_results(
             if len(relevant_kubelet_metric) == 1:
                 return_list.append(relevant_kubelet_metric[0])
                 continue
-            sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job"), reverse=False)
+            sorted_relevant_series = sorted(relevant_series, key=lambda s: s["metric"].get("job", ""), reverse=False)
             return_list.append(sorted_relevant_series[0])
         return return_list
 
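The last change above is the one behavioral bug fix in this file: s["metric"].get("job") returns None for a series that has no "job" label, and Python 3's sorted() raises TypeError when its key values mix None with strings. Defaulting to "" makes the sort total, with label-less series ordered first. A quick reproduction:

# Two Prometheus-style series, the second missing its "job" label.
series = [
    {"metric": {"job": "kubelet"}, "values": []},
    {"metric": {}, "values": []},
]

# Old key function: .get("job") yields None for the second series, and
# sorted() then attempts a None-vs-str comparison, raising TypeError.
try:
    sorted(series, key=lambda s: s["metric"].get("job"))
except TypeError as e:
    print(f"old behavior: {e}")

# Fixed key function: missing labels default to "" and sort first.
fixed = sorted(series, key=lambda s: s["metric"].get("job", ""), reverse=False)
print(fixed[0]["metric"])  # -> {}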
