Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor step execution logic #46

Merged
merged 7 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions backend/src/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

const (
NewServiceRequestEventName = "NewServiceRequestEvent"
StepCompletedEventName = "StepCompletedEvent"
)

type NewServiceRequestEvent struct {
Expand All @@ -25,3 +26,37 @@ func NewNewServiceRequestEvent(serviceRequest *models.ServiceRequestModel) *NewS
func (e *NewServiceRequestEvent) ServiceRequest() *models.ServiceRequestModel {
return e.serviceRequest
}

type StepCompletedEvent struct {
event.BasicEvent
completedStep *models.PipelineStepModel
serviceRequest *models.ServiceRequestModel
results interface{}
err error
}

func NewStepCompletedEvent(completedStep *models.PipelineStepModel, results interface{}, err error) *StepCompletedEvent {
e := &StepCompletedEvent{
completedStep: completedStep,
results: results,
err: err,
}
e.SetName(StepCompletedEventName)
return e
}

func (e *StepCompletedEvent) CompletedStep() *models.PipelineStepModel {
return e.completedStep
}

func (e *StepCompletedEvent) ServiceRequest() *models.ServiceRequestModel {
return e.serviceRequest
}

func (e *StepCompletedEvent) Results() interface{} {
return e.results
}

func (e *StepCompletedEvent) Err() error {
return e.err
}
116 changes: 62 additions & 54 deletions backend/src/execute/step_exec_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"github.com/joshtyf/flowforge/src/events"
"github.com/joshtyf/flowforge/src/logger"
"github.com/joshtyf/flowforge/src/util"
"go.mongodb.org/mongo-driver/mongo"
)

type ExecutionManager struct {
executors map[models.PipelineStepType]*stepExecutor
mongoClient *mongo.Client
executors map[models.PipelineStepType]*stepExecutor
}

type ExecutionManagerConfig func(*ExecutionManager)
Expand All @@ -26,8 +28,13 @@ func WithStepExecutor(step stepExecutor) ExecutionManagerConfig {
}

func NewStepExecutionManager(configs ...ExecutionManagerConfig) *ExecutionManager {
mongoClient, err := client.GetMongoClient()
if err != nil {
logger.Error("[ServiceRequestManager] Error getting mongo client", map[string]interface{}{"err": err})
}
srm := &ExecutionManager{
executors: map[models.PipelineStepType]*stepExecutor{},
executors: map[models.PipelineStepType]*stepExecutor{},
mongoClient: mongoClient,
}
for _, c := range configs {
c(srm)
Expand All @@ -38,26 +45,18 @@ func NewStepExecutionManager(configs ...ExecutionManagerConfig) *ExecutionManage
// Starts the manager by registering event listeners
func (srm *ExecutionManager) Start() {
event.On(events.NewServiceRequestEventName, event.ListenerFunc(srm.handleNewServiceRequestEvent), event.Normal)
event.On(events.StepCompletedEventName, event.ListenerFunc(srm.handleCompletedStepEvent), event.Normal)
}

func (srm *ExecutionManager) handleNewServiceRequestEvent(e event.Event) error {
logger.Info("[ServiceRequestManager] Handling new service request event", nil)
req := e.(*events.NewServiceRequestEvent).ServiceRequest()
err := srm.execute(req)
return err
}

func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel) error {
serviceRequest := e.(*events.NewServiceRequestEvent).ServiceRequest()
if serviceRequest == nil {
logger.Error("[ServiceRequestManager] Service request is nil", nil)
return fmt.Errorf("service request is nil")
}
mongoClient, err := client.GetMongoClient()
if err != nil {
logger.Error("[ServiceRequestManager] Error getting mongo client", map[string]interface{}{"err": err})
}
// Fetch the pipeline so that we know what steps to execute
pipeline, err := database.NewPipeline(mongoClient).GetById(serviceRequest.PipelineId)
pipeline, err := database.NewPipeline(srm.mongoClient).GetById(serviceRequest.PipelineId)
if err != nil {
logger.Error("[ServiceRequestManager] Error getting pipeline", map[string]interface{}{"err": err})
}
Expand All @@ -74,61 +73,70 @@ func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel)
logger.Error("[ServiceRequestManager] No executor found for first step", map[string]interface{}{"step": firstStep.StepName})
return fmt.Errorf("no executor found for first step")
}
currStep := firstStep

err = database.NewServiceRequest(mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Running)
err = database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Running)
if err != nil {
logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err})
return err
}
err = srm.execute(serviceRequest, firstStep, currExecutor)
return err
}

// Execute the pipeline step by step
for {
func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel, step *models.PipelineStepModel, executor *stepExecutor) error {
// Create an execution context with the current step and service request
executeCtx := context.WithValue(
context.WithValue(
context.Background(),
util.ServiceRequestKey,
serviceRequest),
util.StepKey,
step,
)
// Execute the current step
_, err := (*executor).execute(executeCtx)
if err != nil {
logger.Error("[ServiceRequestManager] Error executing step", map[string]interface{}{"step": (*executor).getStepType(), "err": err})
// TODO: Handle error
return err
}

// Create an execution context with the current step and service request
executeCtx := context.WithValue(
context.WithValue(
context.Background(),
util.ServiceRequestKey,
serviceRequest),
util.StepKey,
currStep,
)
// Execute the current step
_, err := (*currExecutor).execute(executeCtx)
return nil
}

func (srm *ExecutionManager) handleCompletedStepEvent(e event.Event) error {
logger.Info("[ServiceRequestManager] Handling step completed event", nil)
completedStepEvent := e.(*events.StepCompletedEvent)
completedStep := completedStepEvent.CompletedStep()
serviceRequest := completedStepEvent.ServiceRequest()
if completedStep.IsTerminalStep {
err := database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Success)
if err != nil {
logger.Error("[ServiceRequestManager] Error executing step", map[string]interface{}{"step": (*currExecutor).getStepType(), "err": err})
// TODO: Handle error
return err
}

if currStep.IsTerminalStep {
// If the current step is a terminal step, we're done
break
} else {
// Set the current executor to the next executor
nextStep := pipeline.GetPipelineStep(currStep.NextStepName)
if nextStep == nil {
logger.Error("[ServiceRequestManager] No next step found", map[string]interface{}{"step": currStep.NextStepName})
return fmt.Errorf("no next step found")
}
nextExecutor := srm.executors[nextStep.StepType]
if nextExecutor == nil {
// TODO: Handle error
logger.Error("[ServiceRequestManager] No executor found for next step", map[string]interface{}{"step": nextStep.StepName})
return fmt.Errorf("no executor found for next step")
}
currExecutor = nextExecutor
currStep = nextStep
// Need to ensure idempotency or figure out a rollback solution
logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err})
}
return nil
}

err = database.NewServiceRequest(mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Success)
pipeline, err := database.NewPipeline(srm.mongoClient).GetById(serviceRequest.PipelineId)
if err != nil {
logger.Error("[ServiceRequestManager] Error getting pipeline", map[string]interface{}{"err": err})
return err
}

// Set the current executor to the next executor
nextStep := pipeline.GetPipelineStep(completedStep.NextStepName)
if nextStep == nil {
logger.Error("[ServiceRequestManager] No next step found", map[string]interface{}{"step": completedStep.NextStepName})
return fmt.Errorf("no next step found")
}
nextExecutor := srm.executors[nextStep.StepType]
if nextExecutor == nil {
// TODO: Handle error
// Need to ensure idempotency or figure out a rollback solution
logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err})
logger.Error("[ServiceRequestManager] No executor found for next step", map[string]interface{}{"step": nextStep.StepName})
return fmt.Errorf("no executor found for next step")
}

srm.execute(serviceRequest, nextStep, nextExecutor)
return nil
}
3 changes: 3 additions & 0 deletions backend/src/execute/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"net/http"

"github.com/gookit/event"
"github.com/joshtyf/flowforge/src/database/models"
"github.com/joshtyf/flowforge/src/events"
"github.com/joshtyf/flowforge/src/logger"
"github.com/joshtyf/flowforge/src/util"
)
Expand Down Expand Up @@ -48,6 +50,7 @@ func (e *apiStepExecutor) execute(ctx context.Context) (*stepExecResult, error)
return nil, fmt.Errorf("Non-200 response")
}
logger.Info("[APIStepExecutor] Success", map[string]interface{}{"status": resp.StatusCode})
event.FireAsync(events.NewStepCompletedEvent(step, &stepExecResult{}, nil))
return &stepExecResult{}, nil
}

Expand Down
Loading