Skip to content

Commit

Permalink
agent/agent: Refactor run command
Browse files Browse the repository at this point in the history
The run command has been refactored to make it simpler. The inventory
status method does not have a loop anymore and it is called from the
agent itself.
The health check state is checked before sending update requests.

The idea is to reorganize the "inventory" and "healthcheck" into
stateless services and let the agent control everything.

This commit does not change the logic of the agent in any way; it just
tries to improve the organization and readability of the code.

Signed-off-by: Cosmin Tupangiu <[email protected]>
  • Loading branch information
tupyy authored and machacekondra committed Nov 19, 2024
1 parent d60f49d commit 17355b3
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 154 deletions.
149 changes: 105 additions & 44 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package agent

import (
"context"
"fmt"
"net"
"net/url"
"os"
"os/signal"
"syscall"
"time"

"github.com/kubev2v/migration-planner/internal/agent/client"
"github.com/kubev2v/migration-planner/pkg/log"
"github.com/lthibault/jitterbug"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

const (
// name of the file which stores the current inventory
InventoryFile = "inventory.json"
InventoryFile = "inventory.json"
defaultAgentPort = 3333
)

// This variable is set during build time.
Expand All @@ -25,14 +30,18 @@ var version string
// New creates a new agent.
func New(log *log.PrefixLogger, config *Config) *Agent {
return &Agent{
config: config,
log: log,
config: config,
log: log,
healtCheckStopCh: make(chan chan any),
}
}

type Agent struct {
config *Config
log *log.PrefixLogger
config *Config
log *log.PrefixLogger
server *Server
healtCheckStopCh chan chan any
credUrl string
}

func (a *Agent) GetLogPrefix() string {
Expand All @@ -46,64 +55,116 @@ func (a *Agent) Run(ctx context.Context) error {
a.log.Infof("Configuration: %s", a.config.String())

defer utilruntime.HandleCrash()
ctx, cancel := context.WithCancel(ctx)
shutdownSignals := []os.Signal{os.Interrupt, syscall.SIGTERM}
// handle teardown
shutdownHandler := make(chan os.Signal, 2)
signal.Notify(shutdownHandler, shutdownSignals...)
// health check closing ch
healthCheckCh := make(chan chan any)
go func(ctx context.Context) {
select {
case <-shutdownHandler:
a.log.Infof("Received SIGTERM or SIGINT signal, shutting down.")
//We must wait for the health checker to close any open requests and the log file.
c := make(chan any)
healthCheckCh <- c
<-c
a.log.Infof("Health check stopped.")

close(shutdownHandler)
cancel()
case <-ctx.Done():
a.log.Infof("Context has been cancelled, shutting down.")
//We must wait for the health checker to close any open requests and the log file.
c := make(chan any)
healthCheckCh <- c
<-c
a.log.Infof("Health check stopped.")

close(shutdownHandler)
cancel()
}
}(ctx)

StartServer(a.log, a.config)

client, err := newPlannerClient(a.config)
if err != nil {
return err
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

ctx, cancel := context.WithCancel(ctx)
a.start(ctx, client)

<-sig

a.log.Info("stopping agent...")

a.Stop()
cancel()

return nil
}

// Stop shuts down the agent's background services in order: first the
// server, then the health checker. Each step hands the service a fresh
// channel and waits on that channel before logging that the service has
// stopped, so Stop does not return until both waits have completed.
func (a *Agent) Stop() {
	// Server: pass a channel to Stop and wait for a value (or close) on it.
	serverDone := make(chan any)
	a.server.Stop(serverDone)
	<-serverDone
	a.log.Info("server stopped")

	// Health checker: send it an acknowledgement channel over the stop
	// channel and wait on that acknowledgement channel.
	healthDone := make(chan any)
	a.healtCheckStopCh <- healthDone
	<-healthDone
	a.log.Info("health check stopped")
}

func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
// start server
a.server = NewServer(defaultAgentPort, a.config.DataDir, a.config.WwwDir)
go a.server.Start(a.log)

// get the credentials url
a.initializeCredentialUrl()

// start the health check
healthChecker, err := NewHealthChecker(
a.log,
client,
plannerClient,
a.config.DataDir,
time.Duration(a.config.HealthCheckInterval*int64(time.Second)),
)
if err != nil {
return err
a.log.Fatalf("failed to start health check: %w", err)
}
healthChecker.Start(healthCheckCh)

// TODO refactor health checker to call it from the main goroutine
healthChecker.Start(a.healtCheckStopCh)

collector := NewCollector(a.log, a.config.DataDir)
collector.collect(ctx)

inventoryUpdater := NewInventoryUpdater(a.log, a.config, client)
inventoryUpdater.UpdateServiceWithInventory(ctx)
inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})

return nil
go func() {
for {
select {
case <-ctx.Done():
return
case <-updateTicker.C:
}

// check for health. Send requests only if we have connectivity
if healthChecker.State() == HealthCheckStateConsoleUnreachable {
continue
}

// set the status
inventoryUpdater.UpdateServiceWithInventory(ctx)
}
}()

}

// initializeCredentialUrl determines the URL under which this agent can be
// reached and stores it in a.credUrl.
//
// It opens a TCP connection to the configured planner service and takes the
// local IP address of that connection, combined with defaultAgentPort, as the
// agent's address. If the service URL cannot be parsed or the connection
// cannot be established, the error is logged and a.credUrl is set to "N/A".
func (a *Agent) initializeCredentialUrl() {
	// Parse the service URL
	parsedURL, err := url.Parse(a.config.PlannerService.Service.Server)
	if err != nil {
		a.log.Errorf("error parsing service URL: %v", err)
		a.credUrl = "N/A"
		return
	}

	// Use the explicit port if the URL has one; otherwise fall back to the
	// scheme, which net.Dial accepts as a service name (e.g. "https").
	port := parsedURL.Port()
	if port == "" {
		port = parsedURL.Scheme
	}

	// Connect to service. net.JoinHostPort brackets IPv6 literals, which a
	// plain "%s:%s" format would turn into an unparsable address.
	conn, err := net.Dial("tcp", net.JoinHostPort(parsedURL.Hostname(), port))
	if err != nil {
		a.log.Errorf("failed connecting to migration planner: %v", err)
		a.credUrl = "N/A"
		return
	}
	defer conn.Close()

	// The local end of the connection tells us which of our addresses is
	// routable towards the planner service.
	localAddr := conn.LocalAddr().(*net.TCPAddr)
	a.credUrl = fmt.Sprintf("http://%s", net.JoinHostPort(localAddr.IP.String(), fmt.Sprint(defaultAgentPort)))
	a.log.Infof("Discovered Agent IP address: %s", a.credUrl)
}

func newPlannerClient(cfg *Config) (client.Planner, error) {
Expand Down
4 changes: 3 additions & 1 deletion internal/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ const (
// DefaultPlannerEndpoint is the default address of the migration planner server
DefaultPlannerEndpoint = "https://localhost:7443"
// DefaultHealthCheck is the default value for health check interval in seconds.
DefaultHealthCheck = 5 * 60 // 5 min
// The default is 10s: the health check should run more often than the update period
// so that it can block updates when the console is unreachable.
DefaultHealthCheck = 10
)

type Config struct {
Expand Down
29 changes: 22 additions & 7 deletions internal/agent/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ import (
"github.com/kubev2v/migration-planner/pkg/log"
)

// AgentHealthState identifies the health-check state tracked by HealthChecker.
// Its values are HealthCheckStateConsoleUnreachable and
// HealthCheckStateConsoleReachable.
type AgentHealthState int

const (
healthCheckStateConsoleUnreachable = iota
healthCheckStateConsoleReachable
HealthCheckStateConsoleUnreachable AgentHealthState = iota
HealthCheckStateConsoleReachable
logFilename = "health.log"
defaultTimeout = 5 //seconds
)

type HealthChecker struct {
once sync.Once
state int
lock sync.Mutex
state AgentHealthState
checkInterval time.Duration
client client.Planner
logFilepath string
Expand Down Expand Up @@ -51,7 +54,7 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
return nil, fmt.Errorf("failed to open file %s for append %w", logFile, err)
}
return &HealthChecker{
state: healthCheckStateConsoleUnreachable,
state: HealthCheckStateConsoleUnreachable,
checkInterval: checkInterval,
client: client,
logFilepath: logFile,
Expand All @@ -76,6 +79,8 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
// checkInterval represents the time to wait between checks.
// closeCh is the channel used to close the goroutine.
func (h *HealthChecker) Start(closeCh chan chan any) {
h.do()

h.once.Do(func() {
go func() {
t := time.NewTicker(h.checkInterval)
Expand All @@ -100,6 +105,12 @@ func (h *HealthChecker) Start(closeCh chan chan any) {
})
}

// State returns the most recently recorded health state. The read is done
// while holding h.lock.
func (h *HealthChecker) State() AgentHealthState {
	h.lock.Lock()
	current := h.state
	h.lock.Unlock()
	return current
}

func (h *HealthChecker) do() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
defer cancel()
Expand All @@ -109,14 +120,18 @@ func (h *HealthChecker) do() {
if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is unreachable.\n", time.Now().Format(time.RFC3339)))); err != nil {
h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
}
h.state = healthCheckStateConsoleUnreachable
h.lock.Lock()
h.state = HealthCheckStateConsoleUnreachable
h.lock.Unlock()
return
}
// if state changed from unreachable to ok log the entry
if h.state == healthCheckStateConsoleUnreachable {
if h.state == HealthCheckStateConsoleUnreachable {
if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is OK.\n", time.Now().Format(time.RFC3339)))); err != nil {
h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
}
}
h.state = healthCheckStateConsoleReachable
h.lock.Lock()
h.state = HealthCheckStateConsoleReachable
h.lock.Unlock()
}
Loading

0 comments on commit 17355b3

Please sign in to comment.