Skip to content

Commit

Permalink
Workflow status and Idle event support (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessepeterson authored May 30, 2024
1 parent cb732bf commit 0f82f4d
Show file tree
Hide file tree
Showing 30 changed files with 527 additions and 70 deletions.
5 changes: 4 additions & 1 deletion docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,17 @@ components:
event:
type: string
description: Event type to subscribe to.
enum: [Enrollment, Authenticate, TokenUpdate, CheckOut]
enum: [Enrollment, Authenticate, TokenUpdate, CheckOut, Idle, IdleNotStartedSince]
workflow:
type: string
description: Name of NanoCMD workflow.
example: "io.micromdm.wf.example.v1"
context:
type: string
description: Workflow-dependent context.
event_context:
type: string
description: Event-dependent context.
JSONError:
type: object
properties:
Expand Down
6 changes: 5 additions & 1 deletion docs/operations-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ Configures Event Subscriptions. Event Subscriptions start workflows for MDM even
{
"event": "Enrollment",
"workflow": "io.micromdm.wf.example.v1",
"context": "string"
"context": "string",
"event_context": "string"
}
```

Expand All @@ -164,8 +165,11 @@ The JSON keys are as follows:
* `TokenUpdate`: when an enrollment sends a TokenUpdate MDM check-in message.
* `Enrollment`: when an enrollment enrolls; i.e. the first TokenUpdate message.
* `CheckOut`: when a device sends a CheckOut MDM check-in message.
* `Idle`: when an enrollment sends an Idle command response.
* `IdleNotStartedSince`: when an enrollment sends an Idle message and the associated workflow has not been started in the given number of seconds. The seconds are provided in the `event_context` string.
* `workflow`: the name of the workflow.
* `context`: optional context to give to the workflow when it starts.
* `event_context`: optional context to give to the event.

#### FileVault profile template endpoint

Expand Down
165 changes: 142 additions & 23 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -188,6 +189,9 @@ func (e *Engine) StartWorkflow(ctx context.Context, name string, context []byte,
if err = w.Start(ctx, ss); err != nil {
return instanceID, fmt.Errorf("staring workflow: %w", err)
}
if err = e.storage.RecordWorkflowStarted(ctx, startID, name, time.Now()); err != nil {
return instanceID, fmt.Errorf("recording workflow status: %w", err)
}
logger.Debug(
logkeys.InstanceID, instanceID,
logkeys.Message, "starting workflow",
Expand Down Expand Up @@ -288,6 +292,114 @@ func logAndError(err error, logger log.Logger, msg string) error {
return fmt.Errorf("%s: %w", msg, err)
}

// MDMIdleEvent is called when an MDM Report Results has an "Idle" status.
// MDMIdleEvent will dispatch workflow "Idle" events (for workflows that are
// configured for it) and will also start workflows for the "IdleNotStartedSince"
// event subscription type.
// Note: any other event subscription type starting workflows is not supported.
func (e *Engine) MDMIdleEvent(ctx context.Context, id string, raw []byte, mdmContext *workflow.MDMContext, eventAt time.Time) error {
logger := ctxlog.Logger(ctx, e.logger).With(logkeys.EnrollmentID, id)

// dispatch the events to (only) the workflow events
event := &workflow.Event{EventFlag: workflow.EventIdle}
if err := e.dispatchEvents(ctx, id, event, mdmContext, false, true); err != nil {
logger.Info(
logkeys.Message, "idle event: dispatch workflow events",
logkeys.Event, event.EventFlag,
logkeys.Error, err,
)
}

if e.eventStorage == nil {
return nil
}

subs, err := e.eventStorage.RetrieveEventSubscriptionsByEvent(ctx, workflow.EventIdleNotStartedSince)
if err != nil {
logger.Info(
logkeys.Message, "retrieving event subscriptions",
logkeys.Event, workflow.EventIdleNotStartedSince,
logkeys.Error, err,
)
}

if len(subs) < 1 {
return nil
}

var wg sync.WaitGroup
event = &workflow.Event{EventFlag: workflow.EventIdleNotStartedSince}
for _, sub := range subs {
wg.Add(1)
go func(es *storage.EventSubscription) {
defer wg.Done()

if es == nil {
return
}

subLogger := logger.With(
logkeys.Event, workflow.EventIdleNotStartedSince,
logkeys.WorkflowName, es.Workflow,
)

// get the last time this workflow started for this
// enrollment ID for the workflow was subscribed to.
started, err := e.storage.RetrieveWorkflowStarted(ctx, id, es.Workflow)
if err != nil {
subLogger.Info(
logkeys.Message, "retrieving workflow status",
logkeys.Error, err,
)
return
}

// make sure we have a valid event context (time between runs)
if es.EventContext == "" {
subLogger.Info(
logkeys.Error, "event context is empty",
)
return
}
sinceSeconds, err := strconv.Atoi(es.EventContext)
if err != nil {
subLogger.Info(
logkeys.Message, "converting event context to integer",
logkeys.Error, err,
)
return
} else if sinceSeconds < 1 {
subLogger.Info(
logkeys.Error, "event context less than 1 second",
)
return
}

// check if we've run this workflow "recently"
if !eventAt.After(started.Add(time.Second * time.Duration(sinceSeconds))) {
// TODO: hide behind an "extra" debug flag?
// subLogger.Debug(logkeys.Message, "workflow not due yet")
return
}

if instanceID, err := e.StartWorkflow(ctx, es.Workflow, []byte(es.Context), []string{id}, event, mdmContext); err != nil {
subLogger.Info(
logkeys.Message, "start workflow",
logkeys.InstanceID, instanceID,
logkeys.Error, err,
)
} else {
subLogger.Debug(
logkeys.Message, "started workflow",
logkeys.InstanceID, instanceID,
)
}
}(sub)
}
wg.Wait()
return nil
}

// MDMCommandResponseEvent receives MDM command responses.
func (e *Engine) MDMCommandResponseEvent(ctx context.Context, id string, uuid string, raw []byte, mdmContext *workflow.MDMContext) error {
logger := ctxlog.Logger(ctx, e.logger).With(
Expand Down Expand Up @@ -374,14 +486,15 @@ func (e *Engine) MDMCommandResponseEvent(ctx context.Context, id string, uuid st

// dispatchEvents dispatches MDM check-in events.
// this includes event subscriptions (user configured) and workflow
// configurations.
func (e *Engine) dispatchEvents(ctx context.Context, id string, ev *workflow.Event, mdmCtx *workflow.MDMContext) error {
// configs. The bool subEV as true indicates to run subscription event
// workflows and wfEV indicates to run workflow-configured events.
func (e *Engine) dispatchEvents(ctx context.Context, id string, ev *workflow.Event, mdmCtx *workflow.MDMContext, subEV, wfEV bool) error {
logger := ctxlog.Logger(ctx, e.logger).With(
"event", ev.EventFlag,
logkeys.Event, ev.EventFlag,
logkeys.EnrollmentID, id,
)
var wg sync.WaitGroup
if e.eventStorage != nil {
if subEV && e.eventStorage != nil {
subs, err := e.eventStorage.RetrieveEventSubscriptionsByEvent(ctx, ev.EventFlag)
if err != nil {
logger.Info(
Expand Down Expand Up @@ -411,23 +524,25 @@ func (e *Engine) dispatchEvents(ctx context.Context, id string, ev *workflow.Eve
}
}
}
for _, w := range e.eventWorkflows(ev.EventFlag) {
wg.Add(1)
go func(w workflow.Workflow) {
defer wg.Done()
if err := w.Event(ctx, ev, id, mdmCtx); err != nil {
logger.Info(
logkeys.Message, "workflow event",
logkeys.WorkflowName, w.Name(),
logkeys.Error, err,
)
} else {
logger.Debug(
logkeys.Message, "workflow event",
logkeys.WorkflowName, w.Name(),
)
}
}(w)
if wfEV {
for _, w := range e.eventWorkflows(ev.EventFlag) {
wg.Add(1)
go func(w workflow.Workflow) {
defer wg.Done()
if err := w.Event(ctx, ev, id, mdmCtx); err != nil {
logger.Info(
logkeys.Message, "workflow event",
logkeys.WorkflowName, w.Name(),
logkeys.Error, err,
)
} else {
logger.Debug(
logkeys.Message, "workflow event",
logkeys.WorkflowName, w.Name(),
)
}
}(w)
}
}
wg.Wait()
return nil
Expand Down Expand Up @@ -485,12 +600,16 @@ func (e *Engine) MDMCheckinEvent(ctx context.Context, id string, checkin interfa
if err := e.storage.CancelSteps(ctx, id, ""); err != nil {
return logAndError(err, logger, "checkin event: cancel steps")
}
// also clear out any workflow status for an id
if err := e.storage.ClearWorkflowStatus(ctx, id); err != nil {
return logAndError(err, logger, "checkin event: clearing workflow status")
}
}
for _, event := range events {
if err := e.dispatchEvents(ctx, id, event, mdmContext); err != nil {
if err := e.dispatchEvents(ctx, id, event, mdmContext, true, true); err != nil {
logger.Info(
logkeys.Message, "checkin event: dispatch events",
"event", event.EventFlag,
logkeys.Event, event.EventFlag,
logkeys.Error, err,
)
}
Expand Down
5 changes: 5 additions & 0 deletions engine/storage/diskv/diskv.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,10 @@ func New(path string) *Diskv {
CacheSizeMax: 1024 * 1024,
})),
uuid.NewUUID(),
kvdiskv.NewBucket(diskv.New(diskv.Options{
BasePath: filepath.Join(path, "engine", "wfstatus"),
Transform: flatTransform,
CacheSizeMax: 1024 * 1024,
})),
)}
}
1 change: 1 addition & 0 deletions engine/storage/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ func New() *InMem {
kvmap.NewBucket(),
kvmap.NewBucket(),
uuid.NewUUID(),
kvmap.NewBucket(),
)}
}
31 changes: 22 additions & 9 deletions engine/storage/kv/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

const (
keySfxEventFlag = ".flag" // contains a strconv integer
keySfxEventWorkflow = ".name"
keySfxEventContext = ".ctx"
keySfxEventFlag = ".flag" // contains a strconv integer
keySfxEventWorkflow = ".name"
keySfxEventContext = ".ctx"
keySfxEventEventContext = ".evctx"
)

type kvEventSubscription struct {
Expand All @@ -37,6 +38,9 @@ func (es *kvEventSubscription) set(ctx context.Context, b kv.Bucket, name string
if len(es.Context) > 0 {
esMap[name+keySfxEventContext] = []byte(es.Context)
}
if len(es.EventContext) > 0 {
esMap[name+keySfxEventEventContext] = []byte(es.EventContext)
}
return kv.SetMap(ctx, b, esMap)
}

Expand All @@ -62,13 +66,21 @@ func (es *kvEventSubscription) get(ctx context.Context, b kv.Bucket, name string
es.Event = workflow.EventFlag(eventFlag).String()
if ok, err := b.Has(ctx, name+keySfxEventContext); err != nil {
return fmt.Errorf("checking event context: %w", err)
} else if !ok {
return nil
} else if ok {
if ctxBytes, err := b.Get(ctx, name+keySfxEventContext); err != nil {
return fmt.Errorf("getting event context: %w", err)
} else {
es.Context = string(ctxBytes)
}
}
if ctxBytes, err := b.Get(ctx, name+keySfxEventContext); err != nil {
return fmt.Errorf("getting event context: %w", err)
} else {
es.Context = string(ctxBytes)
if ok, err := b.Has(ctx, name+keySfxEventEventContext); err != nil {
return fmt.Errorf("checking event event_context: %w", err)
} else if ok {
if evCtxBytes, err := b.Get(ctx, name+keySfxEventEventContext); err != nil {
return fmt.Errorf("getting event event_context: %w", err)
} else {
es.EventContext = string(evCtxBytes)
}
}
return nil
}
Expand Down Expand Up @@ -145,5 +157,6 @@ func (s *KV) DeleteEventSubscription(ctx context.Context, name string) error {
name + keySfxEventFlag,
name + keySfxEventWorkflow,
name + keySfxEventContext,
name + keySfxEventEventContext,
})
}
Loading

0 comments on commit 0f82f4d

Please sign in to comment.