diff --git a/internal/agent/agent.go b/internal/agent/agent.go
index ed8f64d..d1bd3f9 100644
--- a/internal/agent/agent.go
+++ b/internal/agent/agent.go
@@ -2,6 +2,9 @@ package agent
 
 import (
     "context"
+    "fmt"
+    "net"
+    "net/url"
     "os"
     "os/signal"
     "syscall"
@@ -9,12 +12,14 @@ import (
 
     "github.com/kubev2v/migration-planner/internal/agent/client"
     "github.com/kubev2v/migration-planner/pkg/log"
+    "github.com/lthibault/jitterbug"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 )
 
 const (
     // name of the file which stores the current inventory
-    InventoryFile = "inventory.json"
+    InventoryFile    = "inventory.json"
+    defaultAgentPort = 3333
 )
 
 // This varible is set during build time.
@@ -25,14 +30,18 @@ var version string
 // New creates a new agent.
 func New(log *log.PrefixLogger, config *Config) *Agent {
     return &Agent{
-        config: config,
-        log:    log,
+        config:            config,
+        log:               log,
+        healthCheckStopCh: make(chan chan any),
     }
 }
 
 type Agent struct {
-    config *Config
-    log    *log.PrefixLogger
+    config            *Config
+    log               *log.PrefixLogger
+    server            *Server
+    healthCheckStopCh chan chan any
+    credUrl           string
 }
 
 func (a *Agent) GetLogPrefix() string {
@@ -46,64 +55,116 @@ func (a *Agent) Run(ctx context.Context) error {
     a.log.Infof("Configuration: %s", a.config.String())
 
     defer utilruntime.HandleCrash()
 
-    ctx, cancel := context.WithCancel(ctx)
-    shutdownSignals := []os.Signal{os.Interrupt, syscall.SIGTERM}
-    // handle teardown
-    shutdownHandler := make(chan os.Signal, 2)
-    signal.Notify(shutdownHandler, shutdownSignals...)
-    // health check closing ch
-    healthCheckCh := make(chan chan any)
-    go func(ctx context.Context) {
-        select {
-        case <-shutdownHandler:
-            a.log.Infof("Received SIGTERM or SIGINT signal, shutting down.")
-            //We must wait for the health checker to close any open requests and the log file.
-            c := make(chan any)
-            healthCheckCh <- c
-            <-c
-            a.log.Infof("Health check stopped.")
-
-            close(shutdownHandler)
-            cancel()
-        case <-ctx.Done():
-            a.log.Infof("Context has been cancelled, shutting down.")
-            //We must wait for the health checker to close any open requests and the log file.
-            c := make(chan any)
-            healthCheckCh <- c
-            <-c
-            a.log.Infof("Health check stopped.")
-
-            close(shutdownHandler)
-            cancel()
-        }
-    }(ctx)
-
-    StartServer(a.log, a.config)
     client, err := newPlannerClient(a.config)
     if err != nil {
         return err
     }
+    sig := make(chan os.Signal, 1)
+    signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+    ctx, cancel := context.WithCancel(ctx)
+    a.start(ctx, client)
+
+    <-sig
+
+    a.log.Info("stopping agent...")
+
+    a.Stop()
+    cancel()
+
+    return nil
+}
+
+func (a *Agent) Stop() {
+    serverCh := make(chan any)
+    a.server.Stop(serverCh)
+
+    <-serverCh
+    a.log.Info("server stopped")
+
+    c := make(chan any)
+    a.healthCheckStopCh <- c
+    <-c
+    a.log.Info("health check stopped")
+}
+
+func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
+    // start server
+    a.server = NewServer(defaultAgentPort, a.config.DataDir, a.config.WwwDir)
+    go a.server.Start(a.log)
+
+    // get the credentials url
+    a.initializeCredentialUrl()
+
     // start the health check
     healthChecker, err := NewHealthChecker(
         a.log,
-        client,
+        plannerClient,
         a.config.DataDir,
         time.Duration(a.config.HealthCheckInterval*int64(time.Second)),
     )
     if err != nil {
-        return err
+        a.log.Fatalf("failed to start health check: %v", err)
     }
-    healthChecker.Start(healthCheckCh)
+
+    // TODO refactor health checker to call it from the main goroutine
+    healthChecker.Start(a.healthCheckStopCh)
 
     collector := NewCollector(a.log, a.config.DataDir)
     collector.collect(ctx)
 
-    inventoryUpdater := NewInventoryUpdater(a.log, a.config, client)
-    inventoryUpdater.UpdateServiceWithInventory(ctx)
+    inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
+    updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})
 
-    return nil
+    go func() {
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-updateTicker.C:
+            }
+
+            // check for health. Send requests only if we have connectivity
+            if healthChecker.State() == HealthCheckStateConsoleUnreachable {
+                continue
+            }
+
+            // set the status
+            inventoryUpdater.UpdateServiceWithInventory(ctx)
+        }
+    }()
+
+}
+
+func (a *Agent) initializeCredentialUrl() {
+    // Parse the service URL
+    parsedURL, err := url.Parse(a.config.PlannerService.Service.Server)
+    if err != nil {
+        a.log.Errorf("error parsing service URL: %v", err)
+        a.credUrl = "N/A"
+        return
+    }
+
+    // Use either port if specified, or scheme
+    port := parsedURL.Port()
+    if port == "" {
+        port = parsedURL.Scheme
+    }
+
+    // Connect to service
+    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
+    if err != nil {
+        a.log.Errorf("failed connecting to migration planner: %v", err)
+        a.credUrl = "N/A"
+        return
+    }
+    defer conn.Close()
+
+    localAddr := conn.LocalAddr().(*net.TCPAddr)
+    a.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), defaultAgentPort)
+    a.log.Infof("Discovered Agent IP address: %s", a.credUrl)
 }
 
 func newPlannerClient(cfg *Config) (client.Planner, error) {
diff --git a/internal/agent/config.go b/internal/agent/config.go
index f13ea2c..9c6f6be 100644
--- a/internal/agent/config.go
+++ b/internal/agent/config.go
@@ -27,7 +27,9 @@ const (
     // DefaultPlannerEndpoint is the default address of the migration planner server
     DefaultPlannerEndpoint = "https://localhost:7443"
     // DefaultHealthCheck is the default value for health check interval in seconds.
-    DefaultHealthCheck = 5 * 60 // 5 min
+    // The default is 10s. The health check must run more often than the update period
+    // so that updates can be blocked while the console is unreachable.
+    DefaultHealthCheck = 10
 )
 
 type Config struct {
diff --git a/internal/agent/health.go b/internal/agent/health.go
index 5ffb228..1e879ca 100644
--- a/internal/agent/health.go
+++ b/internal/agent/health.go
@@ -12,16 +12,19 @@ import (
     "github.com/kubev2v/migration-planner/pkg/log"
 )
 
+type AgentHealthState int
+
 const (
-    healthCheckStateConsoleUnreachable = iota
-    healthCheckStateConsoleReachable
+    HealthCheckStateConsoleUnreachable AgentHealthState = iota
+    HealthCheckStateConsoleReachable
     logFilename    = "health.log"
     defaultTimeout = 5 //seconds
 )
 
 type HealthChecker struct {
     once          sync.Once
-    state         int
+    lock          sync.Mutex
+    state         AgentHealthState
     checkInterval time.Duration
     client        client.Planner
     logFilepath   string
@@ -51,7 +54,7 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
         return nil, fmt.Errorf("failed to open file %s for append %w", logFile, err)
     }
     return &HealthChecker{
-        state:         healthCheckStateConsoleUnreachable,
+        state:         HealthCheckStateConsoleUnreachable,
         checkInterval: checkInterval,
         client:        client,
         logFilepath:   logFile,
@@ -76,6 +79,8 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
 // checkInterval represents the time to wait between checks.
 // closeCh is the channel used to close the goroutine.
 func (h *HealthChecker) Start(closeCh chan chan any) {
+    h.do()
+
     h.once.Do(func() {
         go func() {
             t := time.NewTicker(h.checkInterval)
@@ -100,6 +105,12 @@ func (h *HealthChecker) Start(closeCh chan chan any) {
     })
 }
 
+func (h *HealthChecker) State() AgentHealthState {
+    h.lock.Lock()
+    defer h.lock.Unlock()
+    return h.state
+}
+
 func (h *HealthChecker) do() {
     ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
     defer cancel()
@@ -109,14 +120,18 @@ func (h *HealthChecker) do() {
         if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is unreachable.\n", time.Now().Format(time.RFC3339)))); err != nil {
             h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
         }
-        h.state = healthCheckStateConsoleUnreachable
+        h.lock.Lock()
+        h.state = HealthCheckStateConsoleUnreachable
+        h.lock.Unlock()
         return
     }
     // if state changed from unreachable to ok log the entry
-    if h.state == healthCheckStateConsoleUnreachable {
+    if h.state == HealthCheckStateConsoleUnreachable {
         if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is OK.\n", time.Now().Format(time.RFC3339)))); err != nil {
             h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
         }
     }
-    h.state = healthCheckStateConsoleReachable
+    h.lock.Lock()
+    h.state = HealthCheckStateConsoleReachable
+    h.lock.Unlock()
 }
diff --git a/internal/agent/inventory.go b/internal/agent/inventory.go
index 3afc563..bf7299d 100644
--- a/internal/agent/inventory.go
+++ b/internal/agent/inventory.go
@@ -5,10 +5,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "net"
-    "net/url"
     "path/filepath"
-    "time"
 
     "github.com/google/uuid"
     api "github.com/kubev2v/migration-planner/api/v1alpha1"
@@ -16,7 +13,6 @@ import (
     "github.com/kubev2v/migration-planner/internal/agent/client"
     "github.com/kubev2v/migration-planner/internal/agent/fileio"
     "github.com/kubev2v/migration-planner/pkg/log"
-    "github.com/lthibault/jitterbug"
 )
 
 type InventoryUpdater struct {
@@ -32,59 +28,48 @@ type InventoryData struct {
     Error string `json:"error"`
 }
 
-func NewInventoryUpdater(log *log.PrefixLogger, config *Config, client client.Planner) *InventoryUpdater {
+func NewInventoryUpdater(log *log.PrefixLogger, config *Config, credUrl string, client client.Planner) *InventoryUpdater {
     return &InventoryUpdater{
         log:        log,
         config:     config,
         client:     client,
         prevStatus: []byte{},
+        credUrl:    credUrl,
     }
 }
 
 func (u *InventoryUpdater) UpdateServiceWithInventory(ctx context.Context) {
-    updateTicker := jitterbug.New(time.Duration(u.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})
-    defer updateTicker.Stop()
-
-    u.initializeCredentialUrl()
+    status, statusInfo, inventory := calculateStatus(u.config.DataDir)
+    u.updateSourceStatus(ctx, status, statusInfo, inventory)
+}
 
-    for {
-        select {
-        case <-ctx.Done():
-            return
-        case <-updateTicker.C:
-            status, statusInfo, inventory := calculateStatus(u.config.DataDir)
-            u.updateSourceStatus(ctx, status, statusInfo, inventory)
-        }
+func (u *InventoryUpdater) updateSourceStatus(ctx context.Context, status api.SourceStatus, statusInfo string, inventory *api.Inventory) {
+    update := agentapi.SourceStatusUpdate{
+        Status:        string(status),
+        StatusInfo:    statusInfo,
+        Inventory:     inventory,
+        CredentialUrl: u.credUrl,
+        // TODO: when moving to AgentStatusUpdate put this:
+        //Version: version,
     }
-}
 
-func (u *InventoryUpdater) initializeCredentialUrl() {
-    // Parse the service URL
-    parsedURL, err := url.Parse(u.config.PlannerService.Service.Server)
+    newContents, err := json.Marshal(update)
     if err != nil {
-        u.log.Errorf("error parsing service URL: %v", err)
-        u.credUrl = "N/A"
-        return
+        u.log.Errorf("failed marshalling new status: %v", err)
     }
-
-    // Use either port if specified, or scheme
-    port := parsedURL.Port()
-    if port == "" {
-        port = parsedURL.Scheme
+    if bytes.Equal(u.prevStatus, newContents) {
+        u.log.Debug("Local status did not change, skipping service update")
+        return
     }
 
-    // Connect to service
-    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
+    u.log.Debugf("Updating status to %s: %s", string(status), statusInfo)
+    err = u.client.UpdateSourceStatus(ctx, uuid.MustParse(u.config.SourceID), update)
     if err != nil {
-        u.log.Errorf("failed connecting to migration planner: %v", err)
-        u.credUrl = "N/A"
+        u.log.Errorf("failed updating status: %v", err)
         return
     }
-    defer conn.Close()
 
-    localAddr := conn.LocalAddr().(*net.TCPAddr)
-    u.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), agentPort)
-    u.log.Infof("Discovered Agent IP address: %s", u.credUrl)
+    u.prevStatus = newContents
 }
 
 func calculateStatus(dataDir string) (api.SourceStatus, string, *api.Inventory) {
@@ -114,32 +99,3 @@ func calculateStatus(dataDir string) (api.SourceStatus, string, *api.Inventory)
     }
     return api.SourceStatusUpToDate, "Inventory successfully collected", &inventory.Inventory
 }
-
-func (u *InventoryUpdater) updateSourceStatus(ctx context.Context, status api.SourceStatus, statusInfo string, inventory *api.Inventory) {
-    update := agentapi.SourceStatusUpdate{
-        Status:        string(status),
-        StatusInfo:    statusInfo,
-        Inventory:     inventory,
-        CredentialUrl: u.credUrl,
-        // TODO: when moving to AgentStatusUpdate put this:
-        //Version: version,
-    }
-
-    newContents, err := json.Marshal(update)
-    if err != nil {
-        u.log.Errorf("failed marshalling new status: %v", err)
-    }
-    if bytes.Equal(u.prevStatus, newContents) {
-        u.log.Debug("Local status did not change, skipping service update")
-        return
-    }
-
-    u.log.Debugf("Updating status to %s: %s", string(status), statusInfo)
-    err = u.client.UpdateSourceStatus(ctx, uuid.MustParse(u.config.SourceID), update)
-    if err != nil {
-        u.log.Errorf("failed updating status: %v", err)
-        return
-    }
-
-    u.prevStatus = newContents
-}
diff --git a/internal/agent/server.go b/internal/agent/server.go
index d326b6c..5401ebe 100644
--- a/internal/agent/server.go
+++ b/internal/agent/server.go
@@ -4,9 +4,6 @@ import (
     "context"
     "fmt"
     "net/http"
-    "os"
-    "os/signal"
-    "syscall"
     "time"
 
     "github.com/go-chi/chi"
@@ -14,49 +11,59 @@ import (
     "github.com/kubev2v/migration-planner/pkg/log"
 )
 
-const (
-    agentPort = 3333
-)
+/*
+Server serves 3 endpoints:
+- /login serves the credentials login form
+- /api/v1/credentials called by the agent ui to pass the credentials entered by the user
+- /api/v1/status return the status of the agent.
+*/
+type Server struct {
+    port       int
+    dataFolder string
+    wwwFolder  string
+    restServer *http.Server
+    log        *log.PrefixLogger
+}
+
+func NewServer(port int, dataFolder, wwwFolder string) *Server {
+    return &Server{
+        port:       port,
+        dataFolder: dataFolder,
+        wwwFolder:  wwwFolder,
+    }
+}
 
-func StartServer(log *log.PrefixLogger, config *Config) {
+func (s *Server) Start(log *log.PrefixLogger) {
     router := chi.NewRouter()
     router.Use(middleware.RequestID)
     router.Use(middleware.Logger)
 
-    RegisterFileServer(router, log, config.WwwDir)
-    RegisterApi(router, log, config.DataDir)
+    RegisterFileServer(router, log, s.wwwFolder)
+    RegisterApi(router, log, s.dataFolder)
+
+    s.restServer = &http.Server{Addr: fmt.Sprintf("0.0.0.0:%d", s.port), Handler: router}
+
+    // Run the server
+    s.log = log
+    err := s.restServer.ListenAndServe()
+    if err != nil && err != http.ErrServerClosed {
+        s.log.Fatalf("failed to start server: %v", err)
+    }
+}
+
+func (s *Server) Stop(stopCh chan any) {
+    shutdownCtx, _ := context.WithTimeout(context.Background(), 10*time.Second) // nolint:govet
+    doneCh := make(chan any)
 
-    server := &http.Server{Addr: fmt.Sprintf("0.0.0.0:%d", agentPort), Handler: router}
-    serverCtx, serverStopCtx := context.WithCancel(context.Background())
-    sig := make(chan os.Signal, 1)
-    signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
     go func() {
-        <-sig
-        shutdownCtx, _ := context.WithTimeout(serverCtx, 30*time.Second) // nolint:govet
-
-        go func() {
-            <-shutdownCtx.Done()
-            if shutdownCtx.Err() == context.DeadlineExceeded {
-                log.Fatal("graceful shutdown timed out.. forcing exit.")
-            }
-        }()
-
-        // Trigger graceful shutdown
-        err := server.Shutdown(shutdownCtx)
+        err := s.restServer.Shutdown(shutdownCtx)
         if err != nil {
-            log.Fatal(err)
+            s.log.Errorf("failed to gracefully shut down the server: %s", err)
         }
-        serverStopCtx()
+        close(doneCh)
     }()
 
-    go func() {
-        // Run the server
-        err := server.ListenAndServe()
-        if err != nil && err != http.ErrServerClosed {
-            log.Fatal(err.Error())
-        }
+    <-doneCh
 
-        // Wait for server context to be stopped
-        <-serverCtx.Done()
-    }()
+    close(stopCh)
 }