Skip to content

Commit

Permalink
Added dynamic node selector based on service selectors. (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Nov 19, 2024
1 parent 4ae73aa commit 849b904
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ pub struct OperatorConfig {
#[arg(short = 't', long, env = "ROBOTLB_HCLOUD_TOKEN")]
pub hcloud_token: String,

/// If this flag is enabled, the operator will try to find target nodes
/// based on where target pods are actually deployed.
#[arg(long, env = "ROBOTLB_DYNAMIC_NODE_SELECTOR", default_value = "true")]
pub dynamic_node_selector: bool,

/// Default load balancer healthcheck retries cound.
#[arg(long, env = "ROBOTLB_DEFAULT_LB_RETRIES", default_value = "3")]
pub default_lb_retries: i32,
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum LBTrackerError {
KubeError(#[from] kube::Error),
#[error("Unknown LoadBalancing alorithm")]
UnknownLBAlgorithm,
#[error("Cannot get target nodes, because the service has no selector")]
ServiceWithoutSelector,

// HCloud API errors
#[error("Cannot attach load balancer to a network. Reason: {0}")]
Expand Down
1 change: 1 addition & 0 deletions src/lb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl LoadBalancer {
/// The target will receive the traffic from the services.
/// The target is identified by its IP address.
pub fn add_target(&mut self, ip: &str) {
tracing::debug!("Adding target {}", ip);
self.targets.push(ip.to_string());
}

Expand Down
91 changes: 77 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use error::{LBTrackerError, LBTrackerResult};
use futures::StreamExt;
use hcloud::apis::configuration::Configuration as HCloudConfig;
use k8s_openapi::{
api::core::v1::{Node, Service},
api::core::v1::{Node, Pod, Service},
serde_json::json,
};
use kube::{
Expand All @@ -32,7 +32,7 @@ use kube::{
};
use label_filter::LabelFilter;
use lb::LoadBalancer;
use std::{str::FromStr, sync::Arc, time::Duration};
use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};

pub mod config;
pub mod consts;
Expand Down Expand Up @@ -155,34 +155,97 @@ pub async fn reconcile_service(
reconcile_load_balancer(lb, svc.clone(), context).await
}

/// Reconcile the `LoadBalancer` type of service.
/// This function will find the nodes based on the node selector
/// and create or update the load balancer.
pub async fn reconcile_load_balancer(
mut lb: LoadBalancer,
svc: Arc<Service>,
context: Arc<CurrentContext>,
) -> LBTrackerResult<Action> {
let label_filter = svc
/// Method to get nodes dynamically based on the pods.
/// This method will find the nodes where the target pods are deployed.
/// It will use the pod selector to find the pods and then get the nodes.
async fn get_nodes_dynamically(
svc: &Arc<Service>,
context: &Arc<CurrentContext>,
) -> LBTrackerResult<Vec<Node>> {
let pod_api = kube::Api::<Pod>::namespaced(
context.client.clone(),
svc.namespace()
.as_ref()
.map(String::as_str)
.unwrap_or_else(|| context.client.default_namespace()),
);

let Some(pod_selector) = svc.spec.as_ref().and_then(|spec| spec.selector.clone()) else {
return Err(LBTrackerError::ServiceWithoutSelector);
};

let label_selector = pod_selector
.iter()
.map(|(key, val)| format!("{key}={val}"))
.collect::<Vec<_>>()
.join(",");

let pods = pod_api
.list(&ListParams {
label_selector: Some(label_selector),
..Default::default()
})
.await?;

let target_nodes = pods
.iter()
.map(|pod| pod.spec.clone().unwrap_or_default().node_name)
.flatten()
.collect::<HashSet<_>>();

let nodes_api = kube::Api::<Node>::all(context.client.clone());
let nodes = nodes_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|node| target_nodes.contains(&node.name_any()))
.collect::<Vec<_>>();

Ok(nodes)
}

/// Get nodes based on the node selector.
/// This method will find the nodes based on the node selector
/// from the service annotations.
async fn get_nodes_by_selector(
svc: &Arc<Service>,
context: &Arc<CurrentContext>,
) -> LBTrackerResult<Vec<Node>> {
let node_selector = svc
.annotations()
.get(consts::LB_NODE_SELECTOR)
.map(String::as_str)
.map(LabelFilter::from_str)
.transpose()?
.unwrap_or_default();
.ok_or(LBTrackerError::ServiceWithoutSelector)?;
let label_filter = LabelFilter::from_str(node_selector)?;
let nodes_api = kube::Api::<Node>::all(context.client.clone());
let nodes = nodes_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|node| label_filter.check(node.labels()))
.collect::<Vec<_>>();
Ok(nodes)
}

/// Reconcile the `LoadBalancer` type of service.
/// This function will find the nodes based on the node selector
/// and create or update the load balancer.
pub async fn reconcile_load_balancer(
mut lb: LoadBalancer,
svc: Arc<Service>,
context: Arc<CurrentContext>,
) -> LBTrackerResult<Action> {
let mut node_ip_type = "InternalIP";
if lb.network_name.is_none() {
node_ip_type = "ExternalIP";
}

let nodes = if context.config.dynamic_node_selector {
get_nodes_dynamically(&svc, &context).await?
} else {
get_nodes_by_selector(&svc, &context).await?
};

for node in nodes {
let Some(status) = node.status else {
continue;
Expand Down

0 comments on commit 849b904

Please sign in to comment.