From cbd087226b0bf6ea2782a6ab45b52b149795cc9c Mon Sep 17 00:00:00 2001
From: Cosmin Tupangiu
Date: Mon, 18 Nov 2024 11:24:40 +0100
Subject: [PATCH] agent/agent: Refactor run command

The run command has been refactored to make it simpler. The inventory
status method no longer runs its own loop; it is now called from the
agent itself. The health check state is checked before sending update
requests.

The idea is to reorganize "inventory" and "healthcheck" into stateless
services and let the agent control everything.

This commit does not change the logic of the agent in any way; it only
improves the organization and readability of the code.

Signed-off-by: Cosmin Tupangiu
---
 internal/agent/agent.go     | 149 +++++++++++++++++++++++++-----------
 internal/agent/health.go    |  27 +++++--
 internal/agent/inventory.go |  52 +------------
 internal/agent/server.go    |  79 ++++++++++---------
 4 files changed, 172 insertions(+), 135 deletions(-)

diff --git a/internal/agent/agent.go b/internal/agent/agent.go
index 2fad1f0..fca20f1 100644
--- a/internal/agent/agent.go
+++ b/internal/agent/agent.go
@@ -2,6 +2,9 @@ package agent
 
 import (
 	"context"
+	"fmt"
+	"net"
+	"net/url"
 	"os"
 	"os/signal"
 	"syscall"
@@ -9,25 +12,31 @@ import (
 
 	"github.com/kubev2v/migration-planner/internal/agent/client"
 	"github.com/kubev2v/migration-planner/pkg/log"
+	"github.com/lthibault/jitterbug"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 )
 
 const (
 	// name of the file which stores the current inventory
-	InventoryFile = "inventory.json"
+	InventoryFile    = "inventory.json"
+	defaultAgentPort = 3333
 )
 
 // New creates a new agent.
 func New(log *log.PrefixLogger, config *Config) *Agent {
 	return &Agent{
-		config: config,
-		log:    log,
+		config:           config,
+		log:              log,
+		healtCheckStopCh: make(chan chan any),
 	}
 }
 
 type Agent struct {
-	config *Config
-	log    *log.PrefixLogger
+	config           *Config
+	log              *log.PrefixLogger
+	server           *Server
+	healtCheckStopCh chan chan any
+	credUrl          string
 }
 
 func (a *Agent) GetLogPrefix() string {
@@ -41,64 +50,116 @@ func (a *Agent) Run(ctx context.Context) error {
 	a.log.Infof("Configuration: %s", a.config.String())
 
 	defer utilruntime.HandleCrash()
-	ctx, cancel := context.WithCancel(ctx)
-	shutdownSignals := []os.Signal{os.Interrupt, syscall.SIGTERM}
-	// handle teardown
-	shutdownHandler := make(chan os.Signal, 2)
-	signal.Notify(shutdownHandler, shutdownSignals...)
-	// health check closing ch
-	healthCheckCh := make(chan chan any)
-	go func(ctx context.Context) {
-		select {
-		case <-shutdownHandler:
-			a.log.Infof("Received SIGTERM or SIGINT signal, shutting down.")
-			//We must wait for the health checker to close any open requests and the log file.
-			c := make(chan any)
-			healthCheckCh <- c
-			<-c
-			a.log.Infof("Health check stopped.")
-
-			close(shutdownHandler)
-			cancel()
-		case <-ctx.Done():
-			a.log.Infof("Context has been cancelled, shutting down.")
-			//We must wait for the health checker to close any open requests and the log file.
-			c := make(chan any)
-			healthCheckCh <- c
-			<-c
-			a.log.Infof("Health check stopped.")
-
-			close(shutdownHandler)
-			cancel()
-		}
-	}(ctx)
-
-	StartServer(a.log, a.config)
 	client, err := newPlannerClient(a.config)
 	if err != nil {
 		return err
 	}
 
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	ctx, cancel := context.WithCancel(ctx)
+	a.start(ctx, client)
+
+	<-sig
+
+	a.log.Info("stopping agent...")
+
+	a.Stop()
+	cancel()
+
+	return nil
+}
+
+func (a *Agent) Stop() {
+	serverCh := make(chan any)
+	a.server.Stop(serverCh)
+
+	<-serverCh
+	a.log.Info("server stopped")
+
+	c := make(chan any)
+	a.healtCheckStopCh <- c
+	<-c
+	a.log.Info("health check stopped")
+}
+
+func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
+	// start server
+	a.server = NewServer(defaultAgentPort, a.config.DataDir, a.config.WwwDir)
+	go a.server.Start(a.log)
+
+	// get the credentials url
+	a.initializeCredentialUrl()
+	// start the health check
 	healthChecker, err := NewHealthChecker(
 		a.log,
-		client,
+		plannerClient,
 		a.config.DataDir,
 		time.Duration(a.config.HealthCheckInterval*int64(time.Second)),
 	)
 	if err != nil {
-		return err
+		a.log.Fatalf("failed to start health check: %w", err)
 	}
-	healthChecker.Start(healthCheckCh)
+
+	// TODO refactor health checker to call it from the main goroutine
+	healthChecker.Start(a.healtCheckStopCh)
 
 	collector := NewCollector(a.log, a.config.DataDir)
 	collector.collect(ctx)
 
-	inventoryUpdater := NewInventoryUpdater(a.log, a.config, client)
-	inventoryUpdater.UpdateServiceWithInventory(ctx)
+	inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
+	updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})
 
-	return nil
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-updateTicker.C:
+			}
+
+			// check for health. Send requests only if we have connectivity
+			if healthChecker.State() == HealthCheckStateConsoleUnreachable {
+				continue
+			}
+
+			// set the status
+			inventoryUpdater.UpdateServiceWithInventory(ctx)
+		}
+	}()
+
+}
+
+func (a *Agent) initializeCredentialUrl() {
+	// Parse the service URL
+	parsedURL, err := url.Parse(a.config.PlannerService.Service.Server)
+	if err != nil {
+		a.log.Errorf("error parsing service URL: %v", err)
+		a.credUrl = "N/A"
+		return
+	}
+
+	// Use either port if specified, or scheme
+	port := parsedURL.Port()
+	if port == "" {
+		port = parsedURL.Scheme
+	}
+
+	// Connect to service
+	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
+	if err != nil {
+		a.log.Errorf("failed connecting to migration planner: %v", err)
+		a.credUrl = "N/A"
+		return
+	}
+	defer conn.Close()
+
+	localAddr := conn.LocalAddr().(*net.TCPAddr)
+	a.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), defaultAgentPort)
+	a.log.Infof("Discovered Agent IP address: %s", a.credUrl)
 }
 
 func newPlannerClient(cfg *Config) (client.Planner, error) {
diff --git a/internal/agent/health.go b/internal/agent/health.go
index 5ffb228..132d8db 100644
--- a/internal/agent/health.go
+++ b/internal/agent/health.go
@@ -12,16 +12,19 @@ import (
 	"github.com/kubev2v/migration-planner/pkg/log"
 )
 
+type AgentHealthState int
+
 const (
-	healthCheckStateConsoleUnreachable = iota
-	healthCheckStateConsoleReachable
+	HealthCheckStateConsoleUnreachable AgentHealthState = iota
+	HealthCheckStateConsoleReachable
 
 	logFilename    = "health.log"
 	defaultTimeout = 5 //seconds
 )
 
 type HealthChecker struct {
 	once          sync.Once
-	state         int
+	lock          sync.Mutex
+	state         AgentHealthState
 	checkInterval time.Duration
 	client        client.Planner
 	logFilepath   string
@@ -51,7 +54,7 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
 		return nil, fmt.Errorf("failed to open file %s for append %w", logFile, err)
 	}
 	return &HealthChecker{
-		state:         healthCheckStateConsoleUnreachable,
+		state:         HealthCheckStateConsoleUnreachable,
 		checkInterval: checkInterval,
 		client:        client,
 		logFilepath:   logFile,
@@ -100,6 +103,12 @@ func (h *HealthChecker) Start(closeCh chan chan any) {
 	})
 }
 
+func (h *HealthChecker) State() AgentHealthState {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	return h.state
+}
+
 func (h *HealthChecker) do() {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
 	defer cancel()
@@ -109,14 +118,18 @@ func (h *HealthChecker) do() {
 		if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is unreachable.\n", time.Now().Format(time.RFC3339)))); err != nil {
 			h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
 		}
-		h.state = healthCheckStateConsoleUnreachable
+		h.lock.Lock()
+		h.state = HealthCheckStateConsoleUnreachable
+		h.lock.Unlock()
 		return
 	}
 	// if state changed from unreachable to ok log the entry
-	if h.state == healthCheckStateConsoleUnreachable {
+	if h.state == HealthCheckStateConsoleUnreachable {
 		if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is OK.\n", time.Now().Format(time.RFC3339)))); err != nil {
 			h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
 		}
 	}
-	h.state = healthCheckStateConsoleReachable
+	h.lock.Lock()
+	h.state = HealthCheckStateConsoleReachable
+	h.lock.Unlock()
 }
diff --git a/internal/agent/inventory.go b/internal/agent/inventory.go
index b67dce4..548c7ed 100644
--- a/internal/agent/inventory.go
+++ b/internal/agent/inventory.go
@@ -5,10 +5,7 @@ import (
 	"context"
"encoding/json" "fmt" - "net" - "net/url" "path/filepath" - "time" "github.com/google/uuid" api "github.com/kubev2v/migration-planner/api/v1alpha1" @@ -16,7 +13,6 @@ import ( "github.com/kubev2v/migration-planner/internal/agent/client" "github.com/kubev2v/migration-planner/internal/agent/fileio" "github.com/kubev2v/migration-planner/pkg/log" - "github.com/lthibault/jitterbug" ) type InventoryUpdater struct { @@ -32,59 +28,19 @@ type InventoryData struct { Error string `json:"error"` } -func NewInventoryUpdater(log *log.PrefixLogger, config *Config, client client.Planner) *InventoryUpdater { +func NewInventoryUpdater(log *log.PrefixLogger, config *Config, credUrl string, client client.Planner) *InventoryUpdater { return &InventoryUpdater{ log: log, config: config, client: client, prevStatus: []byte{}, + credUrl: credUrl, } } func (u *InventoryUpdater) UpdateServiceWithInventory(ctx context.Context) { - updateTicker := jitterbug.New(time.Duration(u.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0}) - defer updateTicker.Stop() - - u.initializeCredentialUrl() - - for { - select { - case <-ctx.Done(): - return - case <-updateTicker.C: - status, statusInfo, inventory := calculateStatus(u.config.DataDir) - u.updateSourceStatus(ctx, status, statusInfo, inventory) - } - } -} - -func (u *InventoryUpdater) initializeCredentialUrl() { - // Parse the service URL - parsedURL, err := url.Parse(u.config.PlannerService.Service.Server) - if err != nil { - u.log.Errorf("error parsing service URL: %v", err) - u.credUrl = "N/A" - return - } - - // Use either port if specified, or scheme - port := parsedURL.Port() - if port == "" { - port = parsedURL.Scheme - } - - // Connect to service - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port)) - if err != nil { - u.log.Errorf("failed connecting to migration planner: %v", err) - u.credUrl = "N/A" - return - } - defer conn.Close() - - localAddr := conn.LocalAddr().(*net.TCPAddr) - u.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), agentPort) - u.log.Infof("Discovered Agent IP address: %s", u.credUrl) + status, statusInfo, inventory := calculateStatus(u.config.DataDir) + u.updateSourceStatus(ctx, status, statusInfo, inventory) } func calculateStatus(dataDir string) (api.SourceStatus, string, *api.Inventory) { diff --git a/internal/agent/server.go b/internal/agent/server.go index d326b6c..5401ebe 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -4,9 +4,6 @@ import ( "context" "fmt" "net/http" - "os" - "os/signal" - "syscall" "time" "github.com/go-chi/chi" @@ -14,49 +11,59 @@ import ( "github.com/kubev2v/migration-planner/pkg/log" ) -const ( - agentPort = 3333 -) +/* +Server serves 3 endpoints: +- /login serves the credentials login form +- /api/v1/credentials called by the agent ui to pass the credentials entered by the user +- /api/v1/status return the status of the agent. 
+*/ +type Server struct { + port int + dataFolder string + wwwFolder string + restServer *http.Server + log *log.PrefixLogger +} + +func NewServer(port int, dataFolder, wwwFolder string) *Server { + return &Server{ + port: port, + dataFolder: dataFolder, + wwwFolder: wwwFolder, + } +} -func StartServer(log *log.PrefixLogger, config *Config) { +func (s *Server) Start(log *log.PrefixLogger) { router := chi.NewRouter() router.Use(middleware.RequestID) router.Use(middleware.Logger) - RegisterFileServer(router, log, config.WwwDir) - RegisterApi(router, log, config.DataDir) + RegisterFileServer(router, log, s.wwwFolder) + RegisterApi(router, log, s.dataFolder) + + s.restServer = &http.Server{Addr: fmt.Sprintf("0.0.0.0:%d", s.port), Handler: router} + + // Run the server + s.log = log + err := s.restServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + s.log.Fatalf("failed to start server: %w", err) + } +} + +func (s *Server) Stop(stopCh chan any) { + shutdownCtx, _ := context.WithTimeout(context.Background(), 10*time.Second) // nolint:govet + doneCh := make(chan any) - server := &http.Server{Addr: fmt.Sprintf("0.0.0.0:%d", agentPort), Handler: router} - serverCtx, serverStopCtx := context.WithCancel(context.Background()) - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func() { - <-sig - shutdownCtx, _ := context.WithTimeout(serverCtx, 30*time.Second) // nolint:govet - - go func() { - <-shutdownCtx.Done() - if shutdownCtx.Err() == context.DeadlineExceeded { - log.Fatal("graceful shutdown timed out.. forcing exit.") - } - }() - - // Trigger graceful shutdown - err := server.Shutdown(shutdownCtx) + err := s.restServer.Shutdown(shutdownCtx) if err != nil { - log.Fatal(err) + s.log.Errorf("failed to graceful shutdown the server: %s", err) } - serverStopCtx() + close(doneCh) }() - go func() { - // Run the server - err := server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - log.Fatal(err.Error()) - } + <-doneCh - // Wait for server context to be stopped - <-serverCtx.Done() - }() + close(stopCh) }