Skip to content

Commit

Permalink
agent/multi-source: Implementation of multi-source model in the agent
Browse files Browse the repository at this point in the history
This commit is a breaking-change. It implementing the multi-source model
into agent.

The agent is nolonger bound to a specific sourceID. The sourceID is
computed from vCenter ID. The agents sends requests to two endpoints:
`/agents/status` and `/sources/status`. The agent endpoint is used to
send the status of the agent which is actually the status of the source
in the old model.
The `source` endpoint is used to send the inventory of the source only
when the inventory is up-to-date.
The flow is:
- agent starts up and is waiting for credentials. It sends a request to
  /agents endpoint with the new state.
- user enters the credentials and if agent obtained the inventory, it
  sends it to /source endpoint.

Signed-off-by: Cosmin Tupangiu <[email protected]>
  • Loading branch information
tupyy committed Nov 26, 2024
1 parent 439c179 commit ff185e2
Show file tree
Hide file tree
Showing 19 changed files with 737 additions and 133 deletions.
5 changes: 3 additions & 2 deletions api/v1alpha1/agent/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ components:
type: string
inventory:
$ref: '../openapi.yaml#/components/schemas/Inventory'
credentialUrl:
agentId:
type: string
format: uuid
required:
- status
- statusInfo
- credentialUrl
- agentId
AgentStatusUpdate:
type: object
properties:
Expand Down
43 changes: 22 additions & 21 deletions api/v1alpha1/agent/spec.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions api/v1alpha1/agent/types.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions api/v1alpha1/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ components:
type: string
inventory:
$ref: '#/components/schemas/Inventory'
credentialUrl:
type: string
createdAt:
type: string
format: date-time
Expand All @@ -280,6 +278,10 @@ components:
format: date-time
sshKey:
type: string
agents:
type: array
items:
$ref: '#/components/schemas/SourceAgentItem'
required:
- id
- name
Expand All @@ -302,6 +304,18 @@ components:
type: array
items:
$ref: '#/components/schemas/Source'

SourceAgentItem:
type: object
properties:
id:
type: string
format: uuid
associated:
type: boolean
required:
- id
- associated

Error:
properties:
Expand Down
61 changes: 31 additions & 30 deletions api/v1alpha1/spec.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions api/v1alpha1/types.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion cmd/planner-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"fmt"
"os"

"github.com/google/uuid"
"github.com/kubev2v/migration-planner/internal/agent"
"github.com/kubev2v/migration-planner/pkg/log"
)

var (
agentID string
)

func main() {
command := NewAgentCommand()
if err := command.Execute(); err != nil {
Expand All @@ -30,6 +35,7 @@ func NewAgentCommand() *agentCmd {
}

flag.StringVar(&a.configFile, "config", agent.DefaultConfigFile, "Path to the agent's configuration file.")
flag.StringVar(&agentID, "id", os.Getenv("AGENT_ID"), "ID of the agent")

flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
Expand All @@ -52,7 +58,7 @@ func NewAgentCommand() *agentCmd {
}

func (a *agentCmd) Execute() error {
agentInstance := agent.New(a.log, a.config)
agentInstance := agent.New(uuid.MustParse(agentID), a.log, a.config)
if err := agentInstance.Run(context.Background()); err != nil {
a.log.Fatalf("running device agent: %v", err)
}
Expand Down
43 changes: 37 additions & 6 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"errors"
"fmt"
"net"
"net/url"
Expand All @@ -10,6 +11,8 @@ import (
"syscall"
"time"

"github.com/google/uuid"
api "github.com/kubev2v/migration-planner/api/v1alpha1"
"github.com/kubev2v/migration-planner/internal/agent/client"
"github.com/kubev2v/migration-planner/pkg/log"
"github.com/lthibault/jitterbug"
Expand All @@ -28,11 +31,12 @@ const (
var version string

// New creates a new agent.
func New(log *log.PrefixLogger, config *Config) *Agent {
func New(id uuid.UUID, log *log.PrefixLogger, config *Config) *Agent {
return &Agent{
config: config,
log: log,
healtCheckStopCh: make(chan chan any),
id: id,
}
}

Expand All @@ -42,6 +46,7 @@ type Agent struct {
server *Server
healtCheckStopCh chan chan any
credUrl string
id uuid.UUID
}

func (a *Agent) GetLogPrefix() string {
Expand All @@ -67,7 +72,10 @@ func (a *Agent) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
a.start(ctx, client)

<-sig
select {
case <-sig:
case <-ctx.Done():
}

a.log.Info("stopping agent...")

Expand Down Expand Up @@ -115,9 +123,18 @@ func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
collector := NewCollector(a.log, a.config.DataDir)
collector.collect(ctx)

inventoryUpdater := NewInventoryUpdater(a.log, a.config, a.credUrl, plannerClient)
inventoryUpdater := NewInventoryUpdater(a.log, a.id, plannerClient)
statusUpdater := NewStatusUpdater(a.log, a.id, version, a.credUrl, a.config, plannerClient)

updateTicker := jitterbug.New(time.Duration(a.config.UpdateInterval.Duration), &jitterbug.Norm{Stdev: 30 * time.Millisecond, Mean: 0})

/*
Main loop
The status of agent is always computed even if we don't have connectivity with the backend.
If we're connected to the backend, the agent sends its status and if the status is UpToDate,
it sends the inventory.
In case of "source gone", it stops everything and break from the loop.
*/
go func() {
for {
select {
Expand All @@ -126,16 +143,30 @@ func (a *Agent) start(ctx context.Context, plannerClient client.Planner) {
case <-updateTicker.C:
}

// calculate status regardless if we have connectivity withe the backend.
status, statusInfo, inventory := statusUpdater.CalculateStatus()

// check for health. Send requests only if we have connectivity
if healthChecker.State() == HealthCheckStateConsoleUnreachable {
continue
}

// set the status
inventoryUpdater.UpdateServiceWithInventory(ctx)
if err := statusUpdater.UpdateStatus(ctx, status, statusInfo); err != nil {
if errors.Is(err, client.ErrSourceGone) {
a.log.Info("Source is gone..Stop sending requests")
// stop the server and the healthchecker
a.Stop()
break
}
a.log.Errorf("unable to update agent status: %s", err)
continue // skip inventory update if we cannot update agent's state.
}

if status == api.AgentStatusUpToDate {
inventoryUpdater.UpdateServiceWithInventory(ctx, api.SourceStatusUpToDate, "Inventory collected with success", inventory)
}
}
}()

}

func (a *Agent) initializeCredentialUrl() {
Expand Down
Loading

0 comments on commit ff185e2

Please sign in to comment.