Skip to content

Commit

Permalink
PR feedback: add registrationID lock, and fix error handling flow whe…
Browse files Browse the repository at this point in the history
…n reload is required
  • Loading branch information
zackattack01 committed Dec 20, 2024
1 parent db0b434 commit 81d84d9
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions pkg/osquery/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (

type Runner struct {
registrationIds []string // we expect to run one instance per registration ID
regIDLock sync.Mutex // locks access to registrationIds
instances map[string]*OsqueryInstance // maps registration ID to currently-running instance
instanceLock sync.Mutex // locks access to `instances` to avoid e.g. restarting an instance that isn't running yet
slogger *slog.Logger
Expand Down Expand Up @@ -54,9 +55,13 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI

func (r *Runner) Run() error {
for {
// if our instances ever exit unexpectedly, return immediately
if err := r.runRegisteredInstances(); err != nil {
return err
err := r.runRegisteredInstances()
if err != nil {
// log any errors but continue, in case we intend to reload
r.slogger.Log(context.TODO(), slog.LevelWarn,
"runRegisteredInstances terminated with error",
"err", err,
)
}

// if we're in a state that required re-running all registered instances,
Expand All @@ -66,8 +71,7 @@ func (r *Runner) Run() error {
continue
}

// otherwise, exit cleanly
return nil
return err
}
}

Expand All @@ -82,7 +86,11 @@ func (r *Runner) runRegisteredInstances() error {
wg, ctx := errgroup.WithContext(context.Background())

// Start each worker for each instance
for _, registrationId := range r.registrationIds {
r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()

for _, registrationId := range regIDs {
id := registrationId
wg.Go(func() error {
if err := r.runInstance(id); err != nil {
Expand Down Expand Up @@ -327,7 +335,11 @@ func (r *Runner) Healthy() error {
defer r.instanceLock.Unlock()

healthcheckErrs := make([]error, 0)
for _, registrationId := range r.registrationIds {
r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()

for _, registrationId := range regIDs {
instance, ok := r.instances[registrationId]
if !ok {
healthcheckErrs = append(healthcheckErrs, fmt.Errorf("running instance does not exist for %s", registrationId))
Expand All @@ -350,8 +362,11 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {
r.instanceLock.Lock()
defer r.instanceLock.Unlock()

r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()
instanceStatuses := make(map[string]types.InstanceStatus)
for _, registrationId := range r.registrationIds {
for _, registrationId := range regIDs {
instance, ok := r.instances[registrationId]
if !ok {
instanceStatuses[registrationId] = types.InstanceStatusNotStarted
Expand All @@ -373,7 +388,10 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {
// and resets the runner instances for the new registrationIDs if required
func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
slices.Sort(newRegistrationIDs)

r.regIDLock.Lock()
existingRegistrationIDs := r.registrationIds
r.regIDLock.Unlock()
slices.Sort(existingRegistrationIDs)

if slices.Equal(newRegistrationIDs, existingRegistrationIDs) {
Expand All @@ -391,7 +409,9 @@ func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
)

// we know there are changes, safe to update the internal registrationIDs now
r.regIDLock.Lock()
r.registrationIds = newRegistrationIDs
r.regIDLock.Unlock()
// mark rerun as required so that we can safely shutdown all workers and have the changes
// picked back up from within the main Run function
r.rerunRequired.Store(true)
Expand Down

0 comments on commit 81d84d9

Please sign in to comment.