agent/agent: Refactor run command
The run command has been refactored to make it simpler. The inventory
status method no longer loops on its own; it is called from the agent
itself. The health check state is checked before sending update
requests.

The idea is to reorganize the "inventory" and "healthcheck" into
stateless services and let the agent control everything.

This commit does not change the logic of the agent in any way; it only
improves the organization and readability of the code.

Signed-off-by: Cosmin Tupangiu <[email protected]>
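In outline, the loop described above — owned by the agent and gated on the health state — looks like this (a paraphrase of the new Run/start code in agent.go below, not an exact excerpt):

    for {
        select {
        case <-ctx.Done():
            return
        case <-updateTicker.C:
        }
        // send the inventory only while the console is reachable
        if healthChecker.State() == HealthCheckStateConsoleUnreachable {
            continue
        }
        inventoryUpdater.UpdateServiceWithInventory(ctx)
    }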
tupyy committed Nov 18, 2024
1 parent 6ac0f8d commit cbd0872
Showing 4 changed files with 172 additions and 135 deletions.
149 changes: 105 additions & 44 deletions internal/agent/agent.go
@@ -2,32 +2,41 @@ package agent

import (
"context"
"fmt"
"net"
"net/url"
"os"
"os/signal"
"syscall"
"time"

"github.com/kubev2v/migration-planner/internal/agent/client"
"github.com/kubev2v/migration-planner/pkg/log"
"github.com/lthibault/jitterbug"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

const (
// name of the file which stores the current inventory
InventoryFile = "inventory.json"
InventoryFile = "inventory.json"
defaultAgentPort = 3333
)

// New creates a new agent.
func New(log *log.PrefixLogger, config *Config) *Agent {
return &Agent{
config: config,
log: log,
config: config,
log: log,
healtCheckStopCh: make(chan chan any),
}
}

type Agent struct {
config *Config
log *log.PrefixLogger
config *Config
log *log.PrefixLogger
server *Server
healtCheckStopCh chan chan any
credUrl string
}

func (a *Agent) GetLogPrefix() string {
@@ -41,64 +50,116 @@ func (a *Agent) Run(ctx context.Context) error {
a.log.Infof("Configuration: %s", a.config.String())

defer utilruntime.HandleCrash()
ctx, cancel := context.WithCancel(ctx)
shutdownSignals := []os.Signal{os.Interrupt, syscall.SIGTERM}
// handle teardown
shutdownHandler := make(chan os.Signal, 2)
signal.Notify(shutdownHandler, shutdownSignals...)
// health check closing ch
healthCheckCh := make(chan chan any)
go func(ctx context.Context) {
select {
case <-shutdownHandler:
a.log.Infof("Received SIGTERM or SIGINT signal, shutting down.")
//We must wait for the health checker to close any open requests and the log file.
c := make(chan any)
healthCheckCh <- c
<-c
a.log.Infof("Health check stopped.")

close(shutdownHandler)
cancel()
case <-ctx.Done():
a.log.Infof("Context has been cancelled, shutting down.")
//We must wait for the health checker to close any open requests and the log file.
c := make(chan any)
healthCheckCh <- c
<-c
a.log.Infof("Health check stopped.")

close(shutdownHandler)
cancel()
}
}(ctx)

StartServer(a.log, a.config)

client, err := newPlannerClient(a.config)
if err != nil {
return err
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

ctx, cancel := context.WithCancel(ctx)
a.start(ctx, client)

<-sig

a.log.Info("stopping agent...")

a.Stop()
cancel()

return nil
}

func (a *Agent) Stop() {
serverCh := make(chan any)
a.server.Stop(serverCh)

<-serverCh
a.log.Info("server stopped")

c := make(chan any)
a.healtCheckStopCh <- c
<-c
a.log.Info("health check stopped")
}

func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
// start server
a.server = NewServer(defaultAgentPort, a.config.DataDir, a.config.WwwDir)
go a.server.Start(a.log)

// get the credentials url
a.initializeCredentialUrl()

// start the health check
healthChecker, err := NewHealthChecker(
a.log,
client,
plannerClient,
a.config.DataDir,
time.Duration(a.config.HealthCheckInterval*int64(time.Second)),
)
if err != nil {
return err
a.log.Fatalf("failed to start health check: %v", err)
}
healthChecker.Start(healthCheckCh)

// TODO refactor health checker to call it from the main goroutine
healthChecker.Start(a.healtCheckStopCh)

collector := NewCollector(a.log, a.config.DataDir)
collector.collect(ctx)

inventoryUpdater := NewInventoryUpdater(a.log, a.config, client)
inventoryUpdater.UpdateServiceWithInventory(ctx)
inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})

return nil
go func() {
for {
select {
case <-ctx.Done():
return
case <-updateTicker.C:
}

// check for health. Send requests only if we have connectivity
if healthChecker.State() == HealthCheckStateConsoleUnreachable {
continue
}

// set the status
inventoryUpdater.UpdateServiceWithInventory(ctx)
}
}()

}

func (a *Agent) initializeCredentialUrl() {
// Parse the service URL
parsedURL, err := url.Parse(a.config.PlannerService.Service.Server)
if err != nil {
a.log.Errorf("error parsing service URL: %v", err)
a.credUrl = "N/A"
return
}

// Use either port if specified, or scheme
port := parsedURL.Port()
if port == "" {
port = parsedURL.Scheme
}

// Connect to service
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
if err != nil {
a.log.Errorf("failed connecting to migration planner: %v", err)
a.credUrl = "N/A"
return
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.TCPAddr)
a.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), defaultAgentPort)
a.log.Infof("Discovered Agent IP address: %s", a.credUrl)
}

func newPlannerClient(cfg *Config) (client.Planner, error) {
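A detail of the new agent.go worth noting: the server and the health checker are both stopped through a channel-of-channels handshake. The caller sends a fresh acknowledgement channel over a chan chan any and blocks until the worker closes it once cleanup is done. A self-contained sketch of the idiom (the worker here is illustrative, not code from this commit):

    package main

    import "fmt"

    func main() {
        stopCh := make(chan chan any)

        go func() {
            // ... periodic work ...
            ack := <-stopCh // stop requested
            // close log files and in-flight requests here,
            // then acknowledge so the caller may proceed
            close(ack)
        }()

        ack := make(chan any)
        stopCh <- ack // ask the worker to stop
        <-ack         // returns only after cleanup has finished
        fmt.Println("worker stopped")
    }

This is why Run can safely cancel the context only after Stop returns: by then both workers have confirmed they released their resources.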
27 changes: 20 additions & 7 deletions internal/agent/health.go
@@ -12,16 +12,19 @@ import (
"github.com/kubev2v/migration-planner/pkg/log"
)

type AgentHealthState int

const (
healthCheckStateConsoleUnreachable = iota
healthCheckStateConsoleReachable
HealthCheckStateConsoleUnreachable AgentHealthState = iota
HealthCheckStateConsoleReachable
logFilename = "health.log"
defaultTimeout = 5 //seconds
)

type HealthChecker struct {
once sync.Once
state int
lock sync.Mutex
state AgentHealthState
checkInterval time.Duration
client client.Planner
logFilepath string
@@ -51,7 +54,7 @@ func NewHealthChecker(log *log.PrefixLogger, client client.Planner, logFolder st
return nil, fmt.Errorf("failed to open file %s for append %w", logFile, err)
}
return &HealthChecker{
state: healthCheckStateConsoleUnreachable,
state: HealthCheckStateConsoleUnreachable,
checkInterval: checkInterval,
client: client,
logFilepath: logFile,
@@ -100,6 +103,12 @@ func (h *HealthChecker) Start(closeCh chan chan any) {
})
}

func (h *HealthChecker) State() AgentHealthState {
h.lock.Lock()
defer h.lock.Unlock()
return h.state
}

func (h *HealthChecker) do() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
defer cancel()
@@ -109,14 +118,18 @@ func (h *HealthChecker) do() {
if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is unreachable.\n", time.Now().Format(time.RFC3339)))); err != nil {
h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
}
h.state = healthCheckStateConsoleUnreachable
h.lock.Lock()
h.state = HealthCheckStateConsoleUnreachable
h.lock.Unlock()
return
}
// if state changed from unreachable to ok log the entry
if h.state == healthCheckStateConsoleUnreachable {
if h.state == HealthCheckStateConsoleUnreachable {
if _, err := h.logFile.Write([]byte(fmt.Sprintf("[%s] console.redhat.com is OK.\n", time.Now().Format(time.RFC3339)))); err != nil {
h.logger.Errorf("failed to write to log file %s %w", h.logFilepath, err)
}
}
h.state = healthCheckStateConsoleReachable
h.lock.Lock()
h.state = HealthCheckStateConsoleReachable
h.lock.Unlock()
}
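The new State() accessor pairs with the locked writes in do() to form the standard guarded-field idiom. In isolation it looks like this (a sketch; setState is a hypothetical helper, which would also cover the read of h.state that do() still performs outside the lock when logging the unreachable-to-OK transition):

    package health

    import "sync"

    type AgentHealthState int

    type guarded struct {
        mu    sync.Mutex
        state AgentHealthState
    }

    // setState serializes writers against readers of State.
    func (g *guarded) setState(s AgentHealthState) {
        g.mu.Lock()
        defer g.mu.Unlock()
        g.state = s
    }

    // State can be called from the agent's update loop
    // without racing the checker goroutine.
    func (g *guarded) State() AgentHealthState {
        g.mu.Lock()
        defer g.mu.Unlock()
        return g.state
    }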
52 changes: 4 additions & 48 deletions internal/agent/inventory.go
@@ -5,18 +5,14 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"path/filepath"
"time"

"github.com/google/uuid"
api "github.com/kubev2v/migration-planner/api/v1alpha1"
agentapi "github.com/kubev2v/migration-planner/api/v1alpha1/agent"
"github.com/kubev2v/migration-planner/internal/agent/client"
"github.com/kubev2v/migration-planner/internal/agent/fileio"
"github.com/kubev2v/migration-planner/pkg/log"
"github.com/lthibault/jitterbug"
)

type InventoryUpdater struct {
@@ -32,59 +28,19 @@ type InventoryData struct {
Error string `json:"error"`
}

func NewInventoryUpdater(log *log.PrefixLogger, config *Config, client client.Planner) *InventoryUpdater {
func NewInventoryUpdater(log *log.PrefixLogger, config *Config, credUrl string, client client.Planner) *InventoryUpdater {
return &InventoryUpdater{
log: log,
config: config,
client: client,
prevStatus: []byte{},
credUrl: credUrl,
}
}

func (u *InventoryUpdater) UpdateServiceWithInventory(ctx context.Context) {
updateTicker := jitterbug.New(time.Duration(u.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})
defer updateTicker.Stop()

u.initializeCredentialUrl()

for {
select {
case <-ctx.Done():
return
case <-updateTicker.C:
status, statusInfo, inventory := calculateStatus(u.config.DataDir)
u.updateSourceStatus(ctx, status, statusInfo, inventory)
}
}
}

func (u *InventoryUpdater) initializeCredentialUrl() {
// Parse the service URL
parsedURL, err := url.Parse(u.config.PlannerService.Service.Server)
if err != nil {
u.log.Errorf("error parsing service URL: %v", err)
u.credUrl = "N/A"
return
}

// Use either port if specified, or scheme
port := parsedURL.Port()
if port == "" {
port = parsedURL.Scheme
}

// Connect to service
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", parsedURL.Hostname(), port))
if err != nil {
u.log.Errorf("failed connecting to migration planner: %v", err)
u.credUrl = "N/A"
return
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.TCPAddr)
u.credUrl = fmt.Sprintf("http://%s:%d", localAddr.IP.String(), agentPort)
u.log.Infof("Discovered Agent IP address: %s", u.credUrl)
status, statusInfo, inventory := calculateStatus(u.config.DataDir)
u.updateSourceStatus(ctx, status, statusInfo, inventory)
}

func calculateStatus(dataDir string) (api.SourceStatus, string, *api.Inventory) {
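The credential-URL discovery deleted here moved, essentially verbatim, into agent.go above. The underlying idiom: dial the planner service and read the connection's LocalAddr to learn which local IP the kernel routes through; that IP is then advertised on the agent's own port. Stripped to its essentials (the target address is illustrative):

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        // Dial any reachable host; no application data is exchanged.
        // net.Dial also accepts service names ("https") in place of a
        // numeric port, which is why the original code can fall back to
        // the URL scheme when the service URL carries no explicit port.
        conn, err := net.Dial("tcp", "example.com:https")
        if err != nil {
            fmt.Println("unreachable:", err)
            return
        }
        defer conn.Close()

        local := conn.LocalAddr().(*net.TCPAddr)
        fmt.Printf("outbound IP: %s\n", local.IP)
    }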
