Skip to content

Commit

Permalink
自动生成adapter配置
Browse files Browse the repository at this point in the history
  • Loading branch information
puzhihao committed Nov 22, 2024
1 parent ef10ff6 commit 81699f5
Showing 1 changed file with 141 additions and 58 deletions.
199 changes: 141 additions & 58 deletions pkg/controller/autoscaler/autoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,28 @@ func (ac *AutoscalerController) syncAutoscalers(key string) error {
return ac.sync(d, hpaList)
}

func (ac *AutoscalerController) fetchPrometheusAdapterConfig(d *appsv1.Deployment) (*v1.ConfigMap, error) {
cm, err := ac.client.CoreV1().ConfigMaps("default").Get(context.TODO(), "prometheus-adapter", metav1.GetOptions{})
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedGetCM", fmt.Sprintf("Failed extract get CM %s/%s", "default", "prometheus-adapter"))
return nil, err
}
return cm, nil
}

func (ac *AutoscalerController) restartPrometheusAdapterDeployment() error {
deployment, err := ac.client.AppsV1().Deployments("default").Get(context.TODO(), "prometheus-adapter", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get deployment prometheus-adapter: %v", err)
}
if deployment.Spec.Template.Annotations == nil {
deployment.Spec.Template.Annotations = make(map[string]string)
}
deployment.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = metav1.Now().Format("2006-01-02T15:04:05Z")
_, err = ac.client.AppsV1().Deployments("default").Update(context.TODO(), deployment, metav1.UpdateOptions{})
return err
}

func (ac *AutoscalerController) sync(d *appsv1.Deployment, hpaList []*autoscalingv2.HorizontalPodAutoscaler) error {
// 1. deployment 存在,但是 hpa 注释不存在 => 移除已存在的 hpa
if !ac.IsDeploymentControlHPA(d) {
Expand All @@ -211,72 +233,62 @@ func (ac *AutoscalerController) sync(d *appsv1.Deployment, hpaList []*autoscalin
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedNewestHPA", fmt.Sprintf("Failed extract newest HPA %s/%s", d.GetNamespace(), d.GetName()))
return err
}
cm, err := ac.client.CoreV1().ConfigMaps("default").Get(context.TODO(), "prometheus-adapter", metav1.GetOptions{})
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedGetCM", fmt.Sprintf("Failed extract get CM %s/%s", "default", "prometheus-adapter"))
return err
}

var config controller.PrometheusAdapterConfig

err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &config)
if err != nil {
fmt.Println(err)
}
exists := false
for _, v := range config.ExternalRules {
if v.SeriesQuery == newHPA.Spec.Metrics[0].External.Metric.Name {
exists = true
break
}
}
if !exists {
newRule := controller.ExternalRule{
MetricsQuery: "<<.Series>>",
Name: controller.RuleName{
As: "",
Matches: "",
},
Resources: controller.ResourceMap{
Overrides: map[string]controller.ResourceOverride{
"namespace": {Resource: "namespace"},
},
},
SeriesQuery: newHPA.Spec.Metrics[0].External.Metric.Name,
}
config.ExternalRules = append(config.ExternalRules, newRule)
updatedYaml, err := yaml.Marshal(&config)
if newHPA.Spec.Metrics[0].External != nil {
//获取ConfigMap
configMap, err := ac.fetchPrometheusAdapterConfig(d)
if err != nil {
fmt.Println(err)
return err
}
// 更新 ConfigMap
cm.Data["config.yaml"] = string(updatedYaml)
_, err = ac.client.CoreV1().ConfigMaps("default").Update(context.TODO(), cm, metav1.UpdateOptions{})
var config controller.PrometheusAdapterConfig
err = yaml.Unmarshal([]byte(configMap.Data["config.yaml"]), &config)
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedSetCM", fmt.Sprintf("Failed extract set CM %s/%s", "default", "prometheus-adapter"))
klog.Errorf("Failed to unmarshal Prometheus adapter config: %v", err)
return err
}
deployment, err := ac.client.AppsV1().Deployments("default").Get(context.TODO(), "prometheus-adapter", metav1.GetOptions{})
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedGetDeployment", fmt.Sprintf("Failed extract get deployment %s/%s", "default", "prometheus-adapter"))
return err
}
// 更新 Annotation 触发重启
if deployment.Spec.Template.Annotations == nil {
deployment.Spec.Template.Annotations = make(map[string]string)
exists := false
for _, v := range config.ExternalRules {
if v.SeriesQuery == newHPA.Spec.Metrics[0].External.Metric.Name {
exists = true
break
}
}
deployment.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = metav1.Now().Format("2006-01-02T15:04:05Z")

_, err = ac.client.AppsV1().Deployments("default").Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedSetDeployment", fmt.Sprintf("Failed extract set deployment %s/%s", "default", "prometheus-adapter"))
return err
if !exists {
newRule := controller.ExternalRule{
MetricsQuery: "<<.Series>>",
Name: controller.RuleName{
As: "",
Matches: "",
},
Resources: controller.ResourceMap{
Overrides: map[string]controller.ResourceOverride{
"namespace": {Resource: "namespace"},
},
},
SeriesQuery: newHPA.Spec.Metrics[0].External.Metric.Name,
}
config.ExternalRules = append(config.ExternalRules, newRule)
updatedYaml, err := yaml.Marshal(&config)
if err != nil {
klog.Errorf("Failed to marshal Prometheus adapter config: %v", err)
return err
}
// 更新 ConfigMap
configMap.Data["config.yaml"] = string(updatedYaml)
_, err = ac.client.CoreV1().ConfigMaps("default").Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
ac.eventRecorder.Eventf(d, v1.EventTypeWarning, "FailedSetCM", fmt.Sprintf("Failed extract set CM %s/%s", "default", "prometheus-adapter"))
return err
}
err = ac.restartPrometheusAdapterDeployment()
if err != nil {
klog.ErrorS(err, "Failed to restart Prometheus adapter deployment")
return err
}
klog.Infof("新规则已添加到 ExternalRules 并更新到 ConfigMap")
klog.Infof("Deployment %s/%s 已成功重启\n", "default", "prometheus-adapter")
} else {
klog.Infof("规则已存在,跳过添加")
}
fmt.Println("新规则已添加到 ExternalRules 并更新到 ConfigMap")
fmt.Printf("Deployment %s/%s 已成功重启\n", "default", "prometheus-adapter")
} else {
fmt.Println("规则已存在,跳过添加")
}

if len(hpaList) == 0 {
Expand All @@ -298,6 +310,77 @@ func (ac *AutoscalerController) sync(d *appsv1.Deployment, hpaList []*autoscalin
klog.V(2).Infof("HPA: %s/%s is not changed", newHPA.Namespace, newHPA.Name)
return nil
}
if oldHPA.Spec.Metrics[0].External != nil {
hpaList, err := ac.client.AutoscalingV2().HorizontalPodAutoscalers("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}
// 定义一个 map 来存储 hpaList 中 External.Metric.Name 出现的次数
metricNameCount := make(map[string]int)

// 遍历 hpaList,将 Metric.Name 的出现次数统计到 map 中
for _, hpa := range hpaList.Items {
if len(hpa.Spec.Metrics) > 0 && hpa.Spec.Metrics[0].External != nil {
metricName := hpa.Spec.Metrics[0].External.Metric.Name
metricNameCount[metricName]++
}
}

// 检查 oldHPA.Spec.Metrics 是否有值,并判断是否需要处理
if len(oldHPA.Spec.Metrics) > 0 && oldHPA.Spec.Metrics[0].External != nil {
metricName := oldHPA.Spec.Metrics[0].External.Metric.Name
if count, exists := metricNameCount[metricName]; exists && count == 1 {
// 获取 Prometheus Adapter 的 ConfigMap
configMap, err := ac.fetchPrometheusAdapterConfig(d)
if err != nil {
return err
}

// 解析 ConfigMap 中的 ExternalRules
var config controller.PrometheusAdapterConfig
err = yaml.Unmarshal([]byte(configMap.Data["config.yaml"]), &config)
if err != nil {
klog.Errorf("Failed to unmarshal Prometheus adapter config: %v", err)
return err
}

// 过滤掉与 oldHPA.Spec.Metrics[0].External.Metric.Name 匹配的规则
var updatedRules []controller.ExternalRule
exists := false
for _, rule := range config.ExternalRules {
if rule.SeriesQuery != metricName {
updatedRules = append(updatedRules, rule)
} else {
exists = true
klog.Infof("Removed rule with SeriesQuery: %s", rule.SeriesQuery)
}
}
if !exists {

config.ExternalRules = updatedRules

// 更新 ConfigMap
updatedYaml, err := yaml.Marshal(&config)
if err != nil {
klog.Errorf("Failed to marshal updated Prometheus adapter config: %v", err)
return err
}
configMap.Data["config.yaml"] = string(updatedYaml)
_, err = ac.client.CoreV1().ConfigMaps(configMap.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update ConfigMap %s/%s: %v", configMap.Namespace, configMap.Name, err)
return err
}
err = ac.restartPrometheusAdapterDeployment()
if err != nil {
return err
}
klog.Infof("ConfigMap %s/%s updated successfully, rule with SeriesQuery %s removed", configMap.Namespace, configMap.Name, metricName)
}
}
}

}
if _, err = ac.client.AutoscalingV2().HorizontalPodAutoscalers(newHPA.Namespace).Update(context.TODO(), newHPA, metav1.UpdateOptions{}); err != nil {
if !errors.IsNotFound(err) {
ac.eventRecorder.Eventf(newHPA, v1.EventTypeWarning, "FailedUpdateHPA", fmt.Sprintf("Failed to Recover update HPA %s/%s", newHPA.Namespace, newHPA.Name))
Expand Down

0 comments on commit 81699f5

Please sign in to comment.