From 7aae88259104df77076ffbe8aaf1d938343ba669 Mon Sep 17 00:00:00 2001 From: yair Date: Wed, 13 Dec 2023 14:41:17 +0200 Subject: [PATCH] unified configuration --- main.go | 39 +++------- pkg/config/config.go | 82 ++++---------------- pkg/config/models.go | 21 +++++ pkg/config/utils.go | 72 +++++++++++++++++ pkg/event_listener/consumer/consumer.go | 13 +--- pkg/event_listener/event_listener_handler.go | 19 ++--- 6 files changed, 124 insertions(+), 122 deletions(-) create mode 100644 pkg/config/models.go create mode 100644 pkg/config/utils.go diff --git a/main.go b/main.go index 854beef..c313998 100644 --- a/main.go +++ b/main.go @@ -13,26 +13,14 @@ import ( "k8s.io/klog/v2" ) -var ( - configFilePath string - resyncInterval uint - eventListenerType string - stateKey string - deleteDependents bool - createMissingRelatedEntities bool - portBaseURL string - portClientId string - portClientSecret string -) - func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) { - apiConfig, err := integration.GetIntegrationConfig(portClient, stateKey) + apiConfig, err := integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey) if err != nil { klog.Fatalf("Error getting K8s integration config: %s", err.Error()) } - cli.WithDeleteDependents(deleteDependents)(portClient) - cli.WithCreateMissingRelatedEntities(createMissingRelatedEntities)(portClient) + cli.WithDeleteDependents(apiConfig.DeleteDependents)(portClient) + cli.WithCreateMissingRelatedEntities(apiConfig.CreateMissingRelatedEntities)(portClient) newHandler := handlers.NewControllersHandler(exporterConfig, apiConfig, k8sClient, portClient) newHandler.Handle() @@ -56,21 +44,21 @@ func main() { klog.Fatalf("Error building K8s client: %s", err.Error()) } - portClient, err := cli.New(portBaseURL, - cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret), - cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)), + portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, + cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret), + cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey)), ) if err != nil { klog.Fatalf("Error building Port client: %s", err.Error()) } - exporterConfig, err := config.GetConfigFile(configFilePath, resyncInterval, stateKey, eventListenerType) + exporterConfig, err := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType) if err != nil { klog.Fatalf("Error building Port K8s Exporter config: %s", err.Error()) } - _, err = integration.GetIntegrationConfig(portClient, stateKey) + _, err = integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey) if err != nil { if exporterConfig == nil { klog.Fatalf("The integration does not exist and no config file was provided") @@ -83,7 +71,7 @@ func main() { klog.Info("Starting controllers handler") handler, _ := initiateHandler(exporterConfig, k8sClient, portClient) - eventListener := event_listener.NewEventListener(stateKey, eventListenerType, handler, portClient) + eventListener := event_listener.NewEventListener(config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType, handler, portClient) err = eventListener.Start(func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) { handler.Stop() return initiateHandler(exporterConfig, k8sClient, portClient) @@ -94,14 +82,5 @@ func main() { } func init() { - configFilePath = config.NewString("config", "", "Path to Port K8s Exporter config file. Required.") - stateKey = config.NewString("state-key", "", "Port K8s Exporter state key id. Required.") - - resyncInterval = config.NewUInt("resync-interval", 0, "The re-sync interval in minutes. Optional.") - portBaseURL = config.NewString("port-base-url", "https://api.getport.io", "Port base URL. Optional.") - - portClientId = config.NewString("port-client-id", "", "Port client id. Required.") - portClientSecret = config.NewString("port-client-secret", "", "Port client secret. Required.") - eventListenerType = config.NewString("event-listener-type", "POLLING", "Event listener type. Optional.") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 2798b4c..f2e5637 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,72 +1,18 @@ package config -import ( - "flag" - "github.com/port-labs/port-k8s-exporter/pkg/goutils" - "github.com/port-labs/port-k8s-exporter/pkg/port" - "gopkg.in/yaml.v2" - "k8s.io/klog/v2" - "os" - "slices" - "strings" -) - -var keys []string - -func prepareEnvKey(key string) string { - newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_")) - - if slices.Contains(keys, newKey) { - klog.Fatalf("Application Error : Found duplicate config key: %s", newKey) - } - - keys = append(keys, newKey) - return newKey +var KafkaConfig = &KafkaConfiguration{ + Brokers: NewString("event-listener-brokers", "localhost:9092", "Kafka brokers"), + SecurityProtocol: NewString("event-listener-security-protocol", "plaintext", "Kafka security protocol"), + AuthenticationMechanism: NewString("event-listener-authentication-mechanism", "none", "Kafka authentication mechanism"), } - -func NewString(key string, defaultValue string, description string) string { - var value string - flag.StringVar(&value, key, "", description) - if value == "" { - value = goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue) - } - - return value -} - -func NewUInt(key string, defaultValue uint, description string) uint { - var value uint64 - flag.Uint64Var(&value, key, 0, description) - if value == 0 { - value = goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue)) - } - - return uint(value) -} - -type FileNotFoundError struct { - s string -} - -func (e *FileNotFoundError) Error() string { - return e.s -} - -func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) { - c := &port.Config{ - ResyncInterval: resyncInterval, - StateKey: stateKey, - EventListenerType: eventListenerType, - } - config, err := os.ReadFile(filepath) - if err != nil { - return c, &FileNotFoundError{err.Error()} - } - - err = yaml.Unmarshal(config, c) - if err != nil { - return nil, err - } - - return c, nil +var PollingListenerRate = NewUInt("event-listener-polling-rate", 60, "Polling rate for the polling event listener") + +var ApplicationConfig = &ApplicationConfiguration{ + ConfigFilePath: NewString("config", "", "Path to Port K8s Exporter config file. Required."), + StateKey: NewString("state-key", "", "Port K8s Exporter state key id. Required."), + ResyncInterval: NewUInt("resync-interval", 0, "The re-sync interval in minutes. Optional."), + PortBaseURL: NewString("port-base-url", "https://api.getport.io", "Port base URL. Optional."), + PortClientId: NewString("port-client-id", "", "Port client id. Required."), + PortClientSecret: NewString("port-client-secret", "", "Port client secret. Required."), + EventListenerType: NewString("event-listener-type", "POLLING", "Event listener type. Optional."), } diff --git a/pkg/config/models.go b/pkg/config/models.go new file mode 100644 index 0000000..f57eef1 --- /dev/null +++ b/pkg/config/models.go @@ -0,0 +1,21 @@ +package config + +type KafkaConfiguration struct { + Brokers string + SecurityProtocol string + GroupID string + AuthenticationMechanism string + Username string + Password string + KafkaSecurityEnabled bool +} + +type ApplicationConfiguration struct { + ConfigFilePath string + StateKey string + ResyncInterval uint + PortBaseURL string + PortClientId string + PortClientSecret string + EventListenerType string +} diff --git a/pkg/config/utils.go b/pkg/config/utils.go new file mode 100644 index 0000000..2798b4c --- /dev/null +++ b/pkg/config/utils.go @@ -0,0 +1,72 @@ +package config + +import ( + "flag" + "github.com/port-labs/port-k8s-exporter/pkg/goutils" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "gopkg.in/yaml.v2" + "k8s.io/klog/v2" + "os" + "slices" + "strings" +) + +var keys []string + +func prepareEnvKey(key string) string { + newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_")) + + if slices.Contains(keys, newKey) { + klog.Fatalf("Application Error : Found duplicate config key: %s", newKey) + } + + keys = append(keys, newKey) + return newKey +} + +func NewString(key string, defaultValue string, description string) string { + var value string + flag.StringVar(&value, key, "", description) + if value == "" { + value = goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue) + } + + return value +} + +func NewUInt(key string, defaultValue uint, description string) uint { + var value uint64 + flag.Uint64Var(&value, key, 0, description) + if value == 0 { + value = goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue)) + } + + return uint(value) +} + +type FileNotFoundError struct { + s string +} + +func (e *FileNotFoundError) Error() string { + return e.s +} + +func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) { + c := &port.Config{ + ResyncInterval: resyncInterval, + StateKey: stateKey, + EventListenerType: eventListenerType, + } + config, err := os.ReadFile(filepath) + if err != nil { + return c, &FileNotFoundError{err.Error()} + } + + err = yaml.Unmarshal(config, c) + if err != nil { + return nil, err + } + + return c, nil +} diff --git a/pkg/event_listener/consumer/consumer.go b/pkg/event_listener/consumer/consumer.go index 2ef4f41..9a82e15 100644 --- a/pkg/event_listener/consumer/consumer.go +++ b/pkg/event_listener/consumer/consumer.go @@ -2,6 +2,7 @@ package consumer import ( "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/port-labs/port-k8s-exporter/pkg/config" "k8s.io/klog/v2" "os" "os/signal" @@ -12,19 +13,9 @@ type Consumer struct { client *kafka.Consumer } -type KafkaConfiguration struct { - Brokers string - SecurityProtocol string - GroupID string - AuthenticationMechanism string - Username string - Password string - KafkaSecurityEnabled bool -} - type JsonHandler func(value []byte) -func NewConsumer(config *KafkaConfiguration) (*Consumer, error) { +func NewConsumer(config *config.KafkaConfiguration) (*Consumer, error) { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": config.Brokers, "group.id": config.GroupID, diff --git a/pkg/event_listener/event_listener_handler.go b/pkg/event_listener/event_listener_handler.go index 2f490e2..1516b08 100644 --- a/pkg/event_listener/event_listener_handler.go +++ b/pkg/event_listener/event_listener_handler.go @@ -30,13 +30,6 @@ type EventListener struct { portClient *cli.PortClient } -var kafkaConfig = &consumer.KafkaConfiguration{ - Brokers: config.NewString("event-listener-brokers", "localhost:9092", "Kafka brokers"), - SecurityProtocol: config.NewString("event-listener-security-protocol", "plaintext", "Kafka security protocol"), - AuthenticationMechanism: config.NewString("event-listener-authentication-mechanism", "none", "Kafka authentication mechanism"), -} -var pollingListenerRate = config.NewUInt("event-listener-polling-rate", 60, "Polling rate for the polling event listener") - func shouldResync(stateKey string, message *IncomingMessage) bool { return message.Diff != nil && message.Diff.After != nil && @@ -69,10 +62,10 @@ func startKafkaEventListener(l *EventListener, resync func()) error { return err } - c := &consumer.KafkaConfiguration{ - Brokers: kafkaConfig.Brokers, - SecurityProtocol: kafkaConfig.SecurityProtocol, - AuthenticationMechanism: kafkaConfig.AuthenticationMechanism, + c := &config.KafkaConfiguration{ + Brokers: config.KafkaConfig.Brokers, + SecurityProtocol: config.KafkaConfig.SecurityProtocol, + AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism, Username: credentials.Username, Password: credentials.Password, GroupID: orgId + ".k8s." + l.stateKey, @@ -102,8 +95,8 @@ func startKafkaEventListener(l *EventListener, resync func()) error { func startPollingEventListener(l *EventListener, resync func()) { klog.Infof("Starting polling event listener") - klog.Infof("Polling rate set to %d seconds", pollingListenerRate) - pollingHandler := polling.NewPollingHandler(pollingListenerRate, l.stateKey, l.portClient) + klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate) + pollingHandler := polling.NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient) pollingHandler.Run(resync) }