agent/agent: Refactor run command #76

Merged: 1 commit, Nov 19, 2024
internal/agent/agent.go: 105 additions & 44 deletions
@@ -2,19 +2,24 @@ package agent

 import (
 	"context"
+	"fmt"
+	"net"
+	"net/url"
 	"os"
 	"os/signal"
 	"syscall"
 	"time"

 	"github.com/kubev2v/migration-planner/internal/agent/client"
 	"github.com/kubev2v/migration-planner/pkg/log"
+	"github.com/lthibault/jitterbug"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 )

 const (
 	// name of the file which stores the current inventory
-	InventoryFile = "inventory.json"
+	InventoryFile    = "inventory.json"
+	defaultAgentPort = 3333
 )

 // This variable is set during build time.

@@ -25,14 +30,18 @@ var version string
 // New creates a new agent.
 func New(log *log.PrefixLogger, config *Config) *Agent {
 	return &Agent{
-		config: config,
-		log:    log,
+		config:           config,
+		log:              log,
+		healtCheckStopCh: make(chan chan any),
 	}
 }

 type Agent struct {
-	config *Config
-	log    *log.PrefixLogger
+	config           *Config
+	log              *log.PrefixLogger
+	server           *Server
+	healtCheckStopCh chan chan any
+	credUrl          string
 }

 func (a *Agent) GetLogPrefix() string {
@@ -46,64 +55,116 @@ func (a *Agent) Run(ctx context.Context) error {
 	a.log.Infof("Configuration: %s", a.config.String())

 	defer utilruntime.HandleCrash()
-	ctx, cancel := context.WithCancel(ctx)
-	shutdownSignals := []os.Signal{os.Interrupt, syscall.SIGTERM}
-	// handle teardown
-	shutdownHandler := make(chan os.Signal, 2)
-	signal.Notify(shutdownHandler, shutdownSignals...)
-	// health check closing ch
-	healthCheckCh := make(chan chan any)
-	go func(ctx context.Context) {
-		select {
-		case <-shutdownHandler:
-			a.log.Infof("Received SIGTERM or SIGINT signal, shutting down.")
-			// We must wait for the health checker to close any open requests and the log file.
-			c := make(chan any)
-			healthCheckCh <- c
-			<-c
-			a.log.Infof("Health check stopped.")
-
-			close(shutdownHandler)
-			cancel()
-		case <-ctx.Done():
-			a.log.Infof("Context has been cancelled, shutting down.")
-			// We must wait for the health checker to close any open requests and the log file.
-			c := make(chan any)
-			healthCheckCh <- c
-			<-c
-			a.log.Infof("Health check stopped.")
-
-			close(shutdownHandler)
-			cancel()
-		}
-	}(ctx)
-
-	StartServer(a.log, a.config)
-
 	client, err := newPlannerClient(a.config)
 	if err != nil {
 		return err
 	}

+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	ctx, cancel := context.WithCancel(ctx)
+	a.start(ctx, client)
+
+	<-sig
+
+	a.log.Info("stopping agent...")
+
+	a.Stop()
+	cancel()
+
+	return nil
+}
+
+func (a *Agent) Stop() {
+	serverCh := make(chan any)
+	a.server.Stop(serverCh)
+
+	<-serverCh
+	a.log.Info("server stopped")
+
+	c := make(chan any)
+	a.healtCheckStopCh <- c
+	<-c
+	a.log.Info("health check stopped")
+}
+
+func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
+	// start server
+	a.server = NewServer(defaultAgentPort, a.config.DataDir, a.config.WwwDir)
+	go a.server.Start(a.log)
+
+	// get the credentials url
+	a.initializeCredentialUrl()
+
+	// start the health check
 	healthChecker, err := NewHealthChecker(
 		a.log,
-		client,
+		plannerClient,
 		a.config.DataDir,
 		time.Duration(a.config.HealthCheckInterval*int64(time.Second)),
 	)
 	if err != nil {
-		return err
+		a.log.Fatalf("failed to start health check: %w", err)
 	}
-	healthChecker.Start(healthCheckCh)
+
+	// TODO refactor health checker to call it from the main goroutine
+	healthChecker.Start(a.healtCheckStopCh)

 	collector := NewCollector(a.log, a.config.DataDir)
 	collector.collect(ctx)

-	inventoryUpdater := NewInventoryUpdater(a.log, a.config, client)
-	inventoryUpdater.UpdateServiceWithInventory(ctx)
+	inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
+	updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})

-	return nil
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-updateTicker.C:
+			}
+
+			// check for health. Send requests only if we have connectivity
Collaborator:
Can you please explain why the code below is not part of case <-updateTicker.C:?

Collaborator (author):
It is, but I've put it out of the case scope for better readability; the select will block until either a tick fires or cancel() is called.
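For other readers following the thread, a minimal, self-contained sketch of that loop shape (a plain time.Ticker standing in for jitterbug; the run helper is hypothetical): any statement placed below the select still executes exactly once per tick, because the select blocks until one of its cases fires.

package main

import (
	"context"
	"fmt"
	"time"
)

// run blocks on the select: either the context is cancelled (return) or a
// tick arrives (the empty case falls through). The work below the select
// therefore runs once per tick, exactly as if it lived inside the case.
func run(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		fmt.Println("per-tick work")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	run(ctx)
}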

+			if healthChecker.State() == HealthCheckStateConsoleUnreachable {
+				continue
+			}
+
+			// set the status
+			inventoryUpdater.UpdateServiceWithInventory(ctx)
+		}
+	}()
+
 }

+func (a *Agent) initializeCredentialUrl() {
+	// Parse the service URL
+	parsedURL, err := url.Parse(a.config.PlannerService.Service.Server)
+	if err != nil {
+		a.log.Errorf("error parsing service URL: %v", err)
+		a.credUrl = "N/A"
+		return
+	}
+
+	// Use either port if specified, or scheme
+	port := parsedURL.Port()
+	if port == "" {
+		port = parsedURL.Scheme
+	}
+
+	// Connect to service
+	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
+	if err != nil {
+		a.log.Errorf("failed connecting to migration planner: %v", err)
+		a.credUrl = "N/A"
+		return
+	}
+	defer conn.Close()
+
+	localAddr := conn.LocalAddr().(*net.TCPAddr)
+	a.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), defaultAgentPort)
+	a.log.Infof("Discovered Agent IP address: %s", a.credUrl)
+}

 func newPlannerClient(cfg *Config) (client.Planner, error) {
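A note on the initializeCredentialUrl change above: it relies on the standard local-address discovery trick. A self-contained sketch under assumed values (example.com:443 is a stand-in target; 3333 mirrors defaultAgentPort): dialing out and reading the connection's local address yields the source IP this host uses to reach that service, without sending any application data.

package main

import (
	"fmt"
	"net"
)

func main() {
	// Dial any reachable remote endpoint; the kernel picks the outbound
	// interface, and LocalAddr reveals the IP chosen for that route.
	conn, err := net.Dial("tcp", "example.com:443")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	localAddr := conn.LocalAddr().(*net.TCPAddr)
	// Advertise that IP with the agent's own port (3333 in this PR).
	fmt.Printf("http://%s:%d\n", localAddr.IP.String(), 3333)
}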
internal/agent/config.go: 3 additions & 1 deletion
@@ -27,7 +27,9 @@ const (
 	// DefaultPlannerEndpoint is the default address of the migration planner server
 	DefaultPlannerEndpoint = "https://localhost:7443"
 	// DefaultHealthCheck is the default value for health check interval in seconds.
-	DefaultHealthCheck = 5 * 60 // 5 min
+	// default value set to 10s; the health check should be faster than the update period
+	// in order to block updates if the console is unreachable
+	DefaultHealthCheck = 10
 )

 type Config struct {
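To see why the new default matters, here is a minimal sketch of the interaction (hypothetical intervals and stub ping/sendUpdate helpers, not the PR's types): because the checker refreshes the verdict every 10s while the updater ticks every 60s, each update decision sees a verdict at most one check interval old, so an unreachable console reliably suppresses the next upload.

package main

import (
	"sync"
	"time"
)

type state struct {
	mu        sync.Mutex
	reachable bool
}

func (s *state) set(v bool) { s.mu.Lock(); s.reachable = v; s.mu.Unlock() }
func (s *state) get() bool  { s.mu.Lock(); defer s.mu.Unlock(); return s.reachable }

func main() {
	s := &state{}

	go func() { // health checker: refreshes the verdict every 10s
		for range time.Tick(10 * time.Second) {
			s.set(ping()) // hypothetical connectivity probe
		}
	}()

	for range time.Tick(60 * time.Second) { // updater: slower than the checker
		if !s.get() {
			continue // console unreachable: skip this update
		}
		sendUpdate() // hypothetical upload
	}
}

func ping() bool  { return true } // stub
func sendUpdate() {}              // stub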
internal/agent/health.go: 22 additions & 7 deletions
@@ -12,16 +12,19 @@ import (
 	"github.com/kubev2v/migration-planner/pkg/log"
 )

+type AgentHealthState int
+
 const (
-	healthCheckStateConsoleUnreachable = iota
-	healthCheckStateConsoleReachable
+	HealthCheckStateConsoleUnreachable AgentHealthState = iota
+	HealthCheckStateConsoleReachable
 	logFilename    = "health.log"
 	defaultTimeout = 5 // seconds
 )

 type HealthChecker struct {
 	once  sync.Once
-	state int
+	lock  sync.Mutex
+	state AgentHealthState
 	checkInterval time.Duration
 	client        client.Planner
 	logFilepath   string
@@ -51,7 +54,7 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
 		return nil, fmt.Errorf("failed to open file %s for append %w", logFile, err)
 	}
 	return &HealthChecker{
-		state:         healthCheckStateConsoleUnreachable,
+		state:         HealthCheckStateConsoleUnreachable,
 		checkInterval: checkInterval,
 		client:        client,
 		logFilepath:   logFile,
@@ -76,6 +79,8 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
 // checkInterval represents the time to wait between checks.
 // closeCh is the channel used to close the goroutine.
 func (h *HealthChecker) Start(closeCh chan chan any) {
+	h.do()
+
 	h.once.Do(func() {
 		go func() {
 			t := time.NewTicker(h.checkInterval)
@@ -100,6 +105,12 @@ func (h *HealthChecker) Start(closeCh chan chan any) {
 	})
 }

+func (h *HealthChecker) State() AgentHealthState {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	return h.state
+}
+
 func (h *HealthChecker) do() {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
 	defer cancel()
@@ -109,14 +120,18 @@ func (h *HealthChecker) do() {
 		if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is unreachable.\n", time.Now().Format(time.RFC3339)))); err != nil {
 			h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
 		}
-		h.state = healthCheckStateConsoleUnreachable
+		h.lock.Lock()
+		h.state = HealthCheckStateConsoleUnreachable
+		h.lock.Unlock()
 		return
 	}
 	// if state changed from unreachable to ok log the entry
-	if h.state == healthCheckStateConsoleUnreachable {
+	if h.state == HealthCheckStateConsoleUnreachable {
 		if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is OK.\n", time.Now().Format(time.RFC3339)))); err != nil {
 			h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
 		}
 	}
-	h.state = healthCheckStateConsoleReachable
+	h.lock.Lock()
+	h.state = HealthCheckStateConsoleReachable
+	h.lock.Unlock()
 }