Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port 5363 UI for the kubernetes exporter moving the config into port #19

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7e0188b
Added support for control the app config from the UI using Polling an…
yairsimantov20 Dec 11, 2023
e7079db
fixed miss type
yairsimantov20 Dec 11, 2023
311502d
Renaming
yairsimantov20 Dec 11, 2023
d84ad22
updated typing
yairsimantov20 Dec 11, 2023
a32b98b
at the start of polling getting the current state
yairsimantov20 Dec 11, 2023
a04dfd1
fixed schema & logs
yairsimantov20 Dec 11, 2023
25254fb
revert useragent addition
yairsimantov20 Dec 12, 2023
1eef152
fixes
yairsimantov20 Dec 13, 2023
97edf37
cr requests
yairsimantov20 Dec 13, 2023
56bad41
better config management
yairsimantov20 Dec 13, 2023
eb24061
removed bool config managment
yairsimantov20 Dec 13, 2023
296e170
better error for config file
yairsimantov20 Dec 13, 2023
2c21603
moved dependent and create missing to the app config
yairsimantov20 Dec 13, 2023
62a731d
starting kafka config once
yairsimantov20 Dec 13, 2023
f1ef258
starting kafka config once
yairsimantov20 Dec 13, 2023
7aae882
unified configuration
yairsimantov20 Dec 13, 2023
5be6bdb
fixed configuration flag issue
yairsimantov20 Dec 13, 2023
a6ced85
not failing for non-existing configuration on startup
yairsimantov20 Dec 14, 2023
1db8ded
change imports
yairsimantov20 Dec 14, 2023
7498948
changing offset reset to latest
yairsimantov20 Dec 14, 2023
f4e8dd5
starting resync in goroutine
yairsimantov20 Dec 14, 2023
6326254
revert moving the handle
yairsimantov20 Dec 17, 2023
bbb39ee
revert moving the handle
yairsimantov20 Dec 17, 2023
a1e4031
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
3e3c91c
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
80421b1
dependency injection and tests
yairsimantov20 Dec 18, 2023
d0e4f66
dep injection
yairsimantov20 Dec 18, 2023
a522369
event handler tests
yairsimantov20 Dec 20, 2023
1838961
file renaming
yairsimantov20 Dec 20, 2023
9879466
removed function
yairsimantov20 Dec 20, 2023
2cf5c27
cr
yairsimantov20 Dec 20, 2023
d4816cd
cr
yairsimantov20 Dec 20, 2023
ae6ccad
closing after one message consumer
yairsimantov20 Dec 20, 2023
06451f1
closing after one message consumer
yairsimantov20 Dec 20, 2023
86ec4e7
cr
yairsimantov20 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 20 additions & 41 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,25 @@ package main
import (
"flag"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"os"

"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/k8s"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"k8s.io/klog/v2"
)

var (
configFilePath string
resyncInterval uint
eventListenerType string
stateKey string
deleteDependents bool
createMissingRelatedEntities bool
portBaseURL string
portClientId string
portClientSecret string
)

func InitiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
apiConfig, err := integration.GetIntegrationConfig(portClient, stateKey)
func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
apiConfig, err := integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
if err != nil {
klog.Fatalf("Error getting K8s integration config: %s", err.Error())
}

cli.WithDeleteDependents(apiConfig.DeleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(apiConfig.CreateMissingRelatedEntities)(portClient)

newHandler := handlers.NewControllersHandler(exporterConfig, apiConfig, k8sClient, portClient)
newHandler.Handle()

Expand All @@ -42,7 +30,6 @@ func InitiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli

func main() {
klog.InitFlags(nil)
flag.Parse()

k8sConfig := k8s.NewKubeConfig()

Expand All @@ -56,52 +43,44 @@ func main() {
klog.Fatalf("Error building K8s client: %s", err.Error())
}

portClient, err := cli.New(portBaseURL,
cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret),
cli.WithDeleteDependents(deleteDependents), cli.WithCreateMissingRelatedEntities(createMissingRelatedEntities),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
portClient, err := cli.New(config.ApplicationConfig.PortBaseURL,
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey)),
)

if err != nil {
klog.Fatalf("Error building Port client: %s", err.Error())
}

exporterConfig, err := config.New(configFilePath, resyncInterval, stateKey, eventListenerType)
exporterConfig, err := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType)
if err != nil {
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
klog.Fatalf("Error building Port K8s Exporter config: %s", err.Error())
}

_, err = integration.GetIntegrationConfig(portClient, stateKey)
_, err = integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
if err != nil {
err = integration.NewIntegration(portClient, stateKey, exporterConfig)
if exporterConfig == nil {
klog.Fatalf("The integration does not exist and no config file was provided")
}
err = integration.NewIntegration(portClient, exporterConfig, exporterConfig.Resources)
if err != nil {
klog.Fatalf("Error creating K8s integration: %s", err.Error())
}
}
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", exporterConfig.StateKey))(portClient)

klog.Info("Starting controllers handler")
handler, _ := InitiateHandler(exporterConfig, k8sClient, portClient)
eventListener := event_listener.NewEventListener(stateKey, eventListenerType, handler, portClient)
handler, _ := initiateHandler(exporterConfig, k8sClient, portClient)
eventListener := event_listener.NewEventListener(config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType, handler, portClient)
err = eventListener.Start(func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) {
handler.Stop()
return InitiateHandler(exporterConfig, k8sClient, portClient)
return initiateHandler(exporterConfig, k8sClient, portClient)
})
if err != nil {
klog.Fatalf("Error starting event listener: %s", err.Error())
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
}
klog.Info("Started controllers handler")
}

func init() {
flag.StringVar(&configFilePath, "config", "", "Path to Port K8s Exporter config file. Required.")
flag.StringVar(&stateKey, "state-key", "", "Port K8s Exporter state key id. Required.")
flag.BoolVar(&deleteDependents, "delete-dependents", false, "Flag to enable deletion of dependent Port Entities. Optional.")
flag.BoolVar(&createMissingRelatedEntities, "create-missing-related-entities", false, "Flag to enable creation of missing related Port entities. Optional.")
flag.UintVar(&resyncInterval, "resync-interval", 0, "The re-sync interval in minutes. Optional.")
flag.StringVar(&portBaseURL, "port-base-url", "https://api.getport.io", "Port base URL. Optional.")
portClientId = os.Getenv("PORT_CLIENT_ID")
portClientSecret = os.Getenv("PORT_CLIENT_SECRET")

eventListenerType = goutils.GetEnvOrDefault("EVENT_LISTENER__TYPE", "POLLING")
config.Init()
flag.Parse()
}
38 changes: 18 additions & 20 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package config

import (
"github.com/port-labs/port-k8s-exporter/pkg/port"
"os"
var KafkaConfig = &KafkaConfiguration{}
var PollingListenerRate uint

"gopkg.in/yaml.v2"
)
var ApplicationConfig = &ApplicationConfiguration{}

func New(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) {
c := &port.Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
EventListenerType: eventListenerType,
}
config, err := os.ReadFile(filepath)
if err != nil {
return nil, err
}
func Init() {
// Kafka listener Configuration
NewString(&KafkaConfig.Brokers, "event-listener-brokers", "localhost:9092", "Kafka event listener brokers")
NewString(&KafkaConfig.SecurityProtocol, "event-listener-security-protocol", "plaintext", "Kafka event listener security protocol")
NewString(&KafkaConfig.AuthenticationMechanism, "event-listener-authentication-mechanism", "none", "Kafka event listener authentication mechanism")

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}
// Polling listener Configuration
NewUInt(&PollingListenerRate, "event-listener-polling-rate", 60, "Polling event listener polling rate")

return c, nil
// Application Configuration
NewString(&ApplicationConfig.ConfigFilePath, "config", "config.yaml", "Path to Port K8s Exporter config file. Required.")
NewString(&ApplicationConfig.StateKey, "state-key", "my-k8s-exporter", "Port K8s Exporter state key id. Required.")
NewUInt(&ApplicationConfig.ResyncInterval, "resync-interval", 0, "The re-sync interval in minutes. Optional.")
NewString(&ApplicationConfig.PortBaseURL, "port-base-url", "https://api.getport.io", "Port base URL. Optional.")
NewString(&ApplicationConfig.PortClientId, "port-client-id", "", "Port client id. Required.")
NewString(&ApplicationConfig.PortClientSecret, "port-client-secret", "", "Port client secret. Required.")
NewString(&ApplicationConfig.EventListenerType, "event-listener-type", "POLLING", "Event listener type, can be either POLLING or KAFKA. Optional.")
}
21 changes: 21 additions & 0 deletions pkg/config/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package config

type KafkaConfiguration struct {
Brokers string
SecurityProtocol string
GroupID string
AuthenticationMechanism string
Username string
Password string
KafkaSecurityEnabled bool
}

type ApplicationConfiguration struct {
ConfigFilePath string
StateKey string
ResyncInterval uint
PortBaseURL string
PortClientId string
PortClientSecret string
EventListenerType string
}
62 changes: 62 additions & 0 deletions pkg/config/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package config

import (
"flag"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
"os"
"slices"

Check failure on line 10 in pkg/config/utils.go

View workflow job for this annotation

GitHub Actions / build

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/slices)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slices package doesn't exists in go 1.19, only starting from 1.21: https://pkg.go.dev/slices?tab=versions

"strings"
)

var keys []string

func prepareEnvKey(key string) string {
newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_"))

if slices.Contains(keys, newKey) {
klog.Fatalf("Application Error : Found duplicate config key: %s", newKey)
}

keys = append(keys, newKey)
return newKey
}

func NewString(v *string, key string, defaultValue string, description string) {
value := goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue)
flag.StringVar(v, key, value, description)
}

func NewUInt(v *uint, key string, defaultValue uint, description string) {
value := uint(goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue)))
flag.UintVar(v, key, value, description)
}

type FileNotFoundError struct {
s string
}

func (e *FileNotFoundError) Error() string {
return e.s
}

func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) {
c := &port.Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
EventListenerType: eventListenerType,
}
config, err := os.ReadFile(filepath)
if err != nil {
return c, &FileNotFoundError{err.Error()}
}

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}

return c, nil
}
24 changes: 9 additions & 15 deletions pkg/event_listener/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"k8s.io/klog/v2"
"os"
"os/signal"
Expand All @@ -12,19 +13,9 @@ type Consumer struct {
client *kafka.Consumer
}

type KafkaConfiguration struct {
Brokers string
SecurityProtocol string
GroupID string
AuthenticationMechanism string
Username string
Password string
KafkaSecurityEnabled bool
}

type JsonHandler func(value []byte)

func NewConsumer(config *KafkaConfiguration) (*Consumer, error) {
func NewConsumer(config *config.KafkaConfiguration) (*Consumer, error) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": config.Brokers,
"group.id": config.GroupID,
Expand All @@ -44,14 +35,17 @@ func NewConsumer(config *KafkaConfiguration) (*Consumer, error) {

func (c *Consumer) Consume(topic string, handler JsonHandler) {
topics := []string{topic}
_ = c.client.SubscribeTopics(topics, nil)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
err := c.client.SubscribeTopics(topics, nil)
if err != nil {
klog.Fatalf("Error subscribing to topic: %s", err.Error())
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

run := true
for run {
select {
case sig := <-sigchan:
case sig := <-sigChan:
klog.Infof("Caught signal %v: terminating\n", sig)
run = false
default:
Expand Down
30 changes: 14 additions & 16 deletions pkg/event_listener/event_listener_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package event_listener
import (
"encoding/json"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
Expand Down Expand Up @@ -52,7 +52,7 @@ func NewEventListener(stateKey string, eventListenerType string, controllerHandl

func startKafkaEventListener(l *EventListener, resync func()) error {
klog.Infof("Starting Kafka event listener")
klog.Infof("Geting Consumer Information")
klog.Infof("Getting Consumer Information")
credentials, err := kafka_credentials.GetKafkaCredentials(l.portClient)
if err != nil {
return err
Expand All @@ -61,33 +61,32 @@ func startKafkaEventListener(l *EventListener, resync func()) error {
if err != nil {
return err
}
config := &consumer.KafkaConfiguration{
Brokers: goutils.GetEnvOrDefault("EVENT_LISTENER__BROKERS", "b-1-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196,b-2-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196,b-3-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196"),
SecurityProtocol: goutils.GetEnvOrDefault("EVENT_LISTENER__SECURITY_PROTOCOL", "SASL_SSL"),
AuthenticationMechanism: goutils.GetEnvOrDefault("EVENT_LISTENER__AUTHENTICATION_MECHANISM", "SCRAM-SHA-512"),
KafkaSecurityEnabled: goutils.GetBooleanEnvOrDefault("EVENT_LISTENER__KAFKA_SECURITY_ENABLED", true),

c := &config.KafkaConfiguration{
Brokers: config.KafkaConfig.Brokers,
SecurityProtocol: config.KafkaConfig.SecurityProtocol,
AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism,
Username: credentials.Username,
Password: credentials.Password,
GroupID: orgId + ".k8s." + l.stateKey,
}

topic := orgId + ".change.log"
instance, err := consumer.NewConsumer(config)
instance, err := consumer.NewConsumer(c)

if err != nil {
return err
}

klog.Infof("Starting consumer for topic %s and groupId %s", topic, config.GroupID)
klog.Infof("Starting consumer for topic %s and groupId %s", topic, c.GroupID)
instance.Consume(topic, func(value []byte) {
incomingMessage := &IncomingMessage{}
parsingError := json.Unmarshal(value, &incomingMessage)
if shouldResync(l.stateKey, incomingMessage) {
klog.Infof("Changes detected. Resyncing...")
resync()
}
if parsingError != nil {
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
utilruntime.HandleError(fmt.Errorf("error handling message: %s", parsingError.Error()))
} else if shouldResync(l.stateKey, incomingMessage) {
klog.Infof("Changes detected. Resyncing...")
resync()
}
})

Expand All @@ -96,9 +95,8 @@ func startKafkaEventListener(l *EventListener, resync func()) error {

func startPollingEventListener(l *EventListener, resync func()) {
klog.Infof("Starting polling event listener")
pollingRate := goutils.GetIntEnvOrDefault("EVENT_LISTENER__POLLING_RATE", 60)
klog.Infof("Polling rate set to %d seconds", pollingRate)
pollingHandler := polling.NewPollingHandler(pollingRate, l.stateKey, l.portClient)
klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate)
pollingHandler := polling.NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient)
pollingHandler.Run(resync)
}

Expand Down
Loading
Loading