在为 BFE Ingress Controller 中开发 Annotation 时,可参考本开发设计指南。
核心关注以下方面:
- Annotation 的定义
- Annotation 的解析
- BFE 配置的生成
- BFE 配置的热加载
下面的讲述中,将结合 bfe.ingress.kubernetes.io/balance.weight
的实现作为例子。
该Annotation用于支持多个Service之间的负载均衡。
根据需求不同,您可能需要定义并实现一个BFE Ingress Controller 的 Annotation,或者对 k8s Ingress 已定义的 Annotation 进行实现。
- Key:
- BFE Ingress Controller 的 Annotation
bfe.ingress.kubernetes.io/{module}.{key}
bfe.ingress.kubernetes.io/{key}
- k8s Ingress 约定的 Annotation
kubernetes.io/{key}
ingressclass.kubernetes.io/{key}
- BFE Ingress Controller 的 Annotation
- Value: 根据需求设计定义
-
Key:
bfe.ingress.kubernetes.io/balance.weight
-
Value: 详见负载均衡
- Demo:
{"service": {"service1":80, "service2":20}}
- Demo:
-
源码
- /internal/bfeConfig/annotations/balance.go
const ( WeightKey = "balance.weight" WeightAnnotation = BfeAnnotationPrefix + WeightKey ) // ServicesWeight define struct of annotation "balance.weight" // example: {"service": {"service1":80, "service2":20}} type ServicesWeight map[string]int type Balance map[string]ServicesWeight
Annotation 名 | 作用 |
---|---|
bfe.ingress.kubernetes.io/balance.weight | 负载均衡 |
bfe.ingress.kubernetes.io/router.cookie | 路由匹配条件:匹配 Cookie |
bfe.ingress.kubernetes.io/router.header | 路由匹配条件:匹配 Header |
bfe.ingress.kubernetes.io/bfe-ingress-status | 生效状态 |
kubernetes.io/ingress.class | 申明 Ingress 类 |
ingressclass.kubernetes.io/is-default-class | 申明默认 Ingress 类 |
新 BFE Ingress Controller Annotation 的定义需兼容已有的 Annotation,在实现指定功能的基础上,尽量做到:
- 设计简洁
- 避免与已有 Annotation 功能重复
更多细节建议在 Issue 中讨论
- Kubernetes controller-runtime 监听事件后,触发 Reconcile
- Reconciler 在回调函数中,触发 configBuilder 的更新
- configBuilder 更新过程中,根据输入的 Ingress 资源 解析 指定 Annotation,用于后续生成 BFE 配置
-
指定 Annotation:
bfe.ingress.kubernetes.io/balance.weight
-
源码
- /internal/controllers/ingress/netv1/ingress_controller.go
func ReconcileV1Ingress(ctx context.Context, r client.Client, configBuilder *bfeConfig.ConfigBuilder, ingress *netv1.Ingress) error { // ... if err = configBuilder.UpdateIngress(ingress, service, endpoints, secrets); err != nil { configBuilder.DeleteIngress(ingress.Namespace, ingress.Name) return err } return nil }
- /internal/bfeConfig/configs/clusterConfig.go
func (c *ConfigBuilder) UpdateIngress(ingress *netv1.Ingress, services map[string]*corev1.Service, endpoints map[string]*corev1.Endpoints, secrets []*corev1.Secret) error { // ... // update cluster conf if err := c.clusterConf.UpdateIngress(ingress, services, endpoints); err != nil { c.serverDataConf.DeleteIngress(ingress.Namespace, ingress.Name) return err } // ... }
- /internal/bfeConfig/configs/clusterConfig.go
func (c *ClusterConfig) UpdateIngress(ingress *netv1.Ingress, services map[string]*corev1.Service, endpoints map[string]*corev1.Endpoints) error { // ... balance, _ := annotations.GetBalance(ingress.Annotations) // ... }
- /internal/bfeConfig/annotations/balance.go
// GetBalance parse annotation "balance.weight" func GetBalance(annotations map[string]string) (Balance, error) { value, ok := annotations[WeightAnnotation] if !ok { return nil, nil } var lb = make(Balance) err := json.Unmarshal([]byte(value), &lb) if err != nil { return nil, fmt.Errorf("annotation %s is illegal, error: %s", WeightAnnotation, err) } // check whether weight sum > 0 for _, services := range lb { sum := 0 for _, weight := range services { if weight < 0 { return nil, fmt.Errorf("weight of load balance service should >= 0") } sum += weight } if sum == 0 { return nil, fmt.Errorf("sum of all load balance service weight should > 0") } } return lb, nil }
- Kubernetes controller-runtime 监听事件后,触发 Reconcile
- Reconciler 在回调函数中,触发 configBuilder 的更新
- configBuilder 更新过程中,根据输入的 Ingress 资源 生成 多种 BFE 配置对象
-
更新的配置对象:
configBuilder.clusterConf
-
生成的负载均衡配置: 字段与格式说明
-
源码
- /internal/controllers/ingress/netv1/ingress_controller.go
func ReconcileV1Ingress(ctx context.Context, r client.Client, configBuilder *bfeConfig.ConfigBuilder, ingress *netv1.Ingress) error { // ... if err = configBuilder.UpdateIngress(ingress, service, endpoints, secrets); err != nil { configBuilder.DeleteIngress(ingress.Namespace, ingress.Name) return err } return nil }
- /internal/bfeConfig/configs/clusterConfig.go
func (c *ConfigBuilder) UpdateIngress(ingress *netv1.Ingress, services map[string]*corev1.Service, endpoints map[string]*corev1.Endpoints, secrets []*corev1.Secret) error { // ... // update cluster conf if err := c.clusterConf.UpdateIngress(ingress, services, endpoints); err != nil { c.serverDataConf.DeleteIngress(ingress.Namespace, ingress.Name) return err } // ... }
- /internal/bfeConfig/configs/clusterConfig.go
func (c *ClusterConfig) UpdateIngress(ingress *netv1.Ingress, services map[string]*corev1.Service, endpoints map[string]*corev1.Endpoints) error { if len(ingress.Spec.Rules) == 0 { return nil } balance, _ := annotations.GetBalance(ingress.Annotations) ingressName := util.NamespacedName(ingress.Namespace, ingress.Name) for _, rule := range ingress.Spec.Rules { for _, path := range rule.HTTP.Paths { // create cluster && subcluster for each Service clusterName := util.ClusterName(ingressName, path.Backend.Service) // cluster config (*c.clusterTableConf.Config)[clusterName] = c.newClusterBackend(ingress.Namespace, path.Backend.Service, balance, services, endpoints) // gslb config (*c.gslbConf.Clusters)[clusterName] = c.newGslbClusterConf(ingress.Namespace, path.Backend.Service.Name, balance) // put into map c.ingress2Cluster.Put(ingressName, clusterName) for service := range (*c.gslbConf.Clusters)[clusterName] { c.service2Cluster.Put(service, clusterName) } } } if len(option.Opts.Ingress.DefaultBackend) > 0 { c.addDefautBackend(endpoints[option.Opts.Ingress.DefaultBackend]) } if err := cluster_table_conf.ClusterTableConfCheck(c.clusterTableConf); err != nil { c.DeleteIngress(ingress.Namespace, ingress.Name) return err } c.setVersion() return nil }
- Ingress 资源的新增、更新、删除需分别适当处理
- BFE 配置对象
configBuilder.*
可能存在缓存,注意对缓存内容的更新
根据配置时间间隔,定时触发
对于多种 BFE 配置对象configBuilder.*
,分别执行以下逻辑:
-
负载均衡配置的热加载方式:热加载接口说明
-
源码
- /internal/bfeConfig/configBuilder.go
func (c *ConfigBuilder) InitReload(ctx context.Context) { tick := time.NewTicker(option.Opts.Ingress.ReloadInterval) go func() { defer tick.Stop() for { select { case <-tick.C: if err := c.reload(); err != nil { log.Error(err, "fail to reload config") } case <-ctx.Done(): log.Info("exit bfe reload") return } } }() } func (c *ConfigBuilder) reload() error { // ... if err := c.clusterConf.Reload(); err != nil { log.Error(err, "Fail to reload config", "clusterConf", c.clusterConf) return err } // ... }
- /internal/bfeConfig/configs/clusterConfig.go
func (c *ClusterConfig) Reload() error { reload := false if *c.gslbConf.Ts != c.gslbVersion { err := util.DumpBfeConf(GslbData, c.gslbConf) if err != nil { return fmt.Errorf("dump gslb.data error: %v", err) } reload = true } if *c.clusterTableConf.Version != c.clusterTableVersion { err := util.DumpBfeConf(ClusterTableData, c.clusterTableConf) if err != nil { return fmt.Errorf("dump cluster_table.data error: %v", err) } reload = true } if reload { if err := util.ReloadBfe(ConfigNameclusterConf); err != nil { return err } c.gslbVersion = *c.gslbConf.Ts c.clusterTableVersion = *c.clusterTableConf.Version } return nil }