Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/persisting generated log files and updating SR cancellation endpoint #149

Merged
merged 14 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ Run the following command to seed the database wth sample data
```bash
docker compose --profile be-seed -p flowforge up --build
```
You may specify the following arguments in front of the above command to seed your own files:
create_users -> specifies whether to create user in auth0, default is "false"
users -> specify user csv file name, default is "user.csv"
orgs -> specify org csv file name, default is "org.csv"
You may specify the following arguments in front of the above command to seed your own files:<br>
create_users -> specifies whether to create user in auth0, default is "false"<br>
users -> specify user csv file name, default is "user.csv"<br>
orgs -> specify org csv file name, default is "org.csv"<br>
memberships -> specify membership csv file name, default is "membership.csv"

Example:
Expand All @@ -44,8 +44,9 @@ docker compose --profile be -p flowforge up --build
Run the following command to delete container

```bash
docker compose -p flowforge down -v --rmi local
docker compose -p flowforge down --rmi local
```
**Note: To remove all volumes associated with the service, add -v flag after down**

## Setup Frontend Development

Expand Down
11 changes: 0 additions & 11 deletions backend/database/mongo_seed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"os"
"time"

"github.com/gookit/event"
"github.com/joshtyf/flowforge/src/database"
"github.com/joshtyf/flowforge/src/database/client"
"github.com/joshtyf/flowforge/src/database/models"
"github.com/joshtyf/flowforge/src/events"
"github.com/joshtyf/flowforge/src/logger"
"go.mongodb.org/mongo-driver/bson/primitive"
)
Expand Down Expand Up @@ -601,13 +599,4 @@ func main() {
panic("Inserted ID is not an ObjectID")
}
}

// start SRs
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest1))
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest7))
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest3))
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest6))
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest9))
event.FireAsync(events.NewNewServiceRequestEvent(&serviceRequest12))

}
13 changes: 13 additions & 0 deletions backend/src/database/models/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ const (
WaitForApprovalStep PipelineStepType = "WAIT_FOR_APPROVAL"
)

var cancellableStepTypes = []PipelineStepType{
WaitForApprovalStep,
}

func IsCancellablePipelineStepType(stepType PipelineStepType) bool {
for _, cancellableStepType := range cancellableStepTypes {
if stepType == cancellableStepType {
return true
}
}
return false
}

var allPipelineStepTypes = []PipelineStepType{APIStep, WaitForApprovalStep}

func IsValidPipelineStepType(stepType PipelineStepType) bool {
Expand Down
26 changes: 13 additions & 13 deletions backend/src/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ func (e *NewServiceRequestEvent) ServiceRequest() *models.ServiceRequestModel {

type StepCompletedEvent struct {
event.BasicEvent
completedStep string
serviceRequest *models.ServiceRequestModel
createdBy string
results interface{}
err error
completedStep string
serviceRequestId string
createdBy string
results interface{}
err error
}

func NewStepCompletedEvent(completedStep string, serviceRequest *models.ServiceRequestModel, createdBy string, results interface{}, err error) *StepCompletedEvent {
func NewStepCompletedEvent(completedStep string, serviceRequestId string, createdBy string, results interface{}, err error) *StepCompletedEvent {
e := &StepCompletedEvent{
completedStep: completedStep,
serviceRequest: serviceRequest,
createdBy: createdBy,
results: results,
err: err,
completedStep: completedStep,
serviceRequestId: serviceRequestId,
createdBy: createdBy,
results: results,
err: err,
}
e.SetName(StepCompletedEventName)
return e
Expand All @@ -53,8 +53,8 @@ func (e *StepCompletedEvent) CompletedStep() string {
return e.completedStep
}

func (e *StepCompletedEvent) ServiceRequest() *models.ServiceRequestModel {
return e.serviceRequest
func (e *StepCompletedEvent) ServiceRequestId() string {
return e.serviceRequestId
}

func (e *StepCompletedEvent) CreatedBy() string {
Expand Down
19 changes: 15 additions & 4 deletions backend/src/execute/step_exec_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel,
}

// Create a log file for the current step
// TODO: file is not persisted in Docker container
f, err := os.OpenFile(
logger.CreateExecutorLogFilePath(serviceRequest.Id.Hex(), step.StepName),
os.O_RDWR|os.O_CREATE|os.O_APPEND,
Expand Down Expand Up @@ -177,13 +176,19 @@ func (srm *ExecutionManager) handleCompletedStepEvent(e event.Event) error {
srm.logger.Error(fmt.Sprintf("event %s missing data: %s", e.Name(), "completed step"))
return fmt.Errorf("completed step is not provided")
}
serviceRequest := completedStepEvent.ServiceRequest()
if serviceRequest == nil {
serviceRequestId := completedStepEvent.ServiceRequestId()
if serviceRequestId == "" {
srm.logger.Error(fmt.Sprintf("event %s missing data: %s", e.Name(), "service request"))
return fmt.Errorf("service request is nil")
}

pipeline, err := database.NewPipeline(srm.mongoClient).GetById(serviceRequest.PipelineId)
serviceRequest, err := database.NewServiceRequest(srm.mongoClient).GetById(serviceRequestId)
if err != nil {
srm.logger.Error(fmt.Sprintf("error encounter while verifying sr status: %s", err))
return err
}

pipeline, err := database.NewPipeline(srm.mongoClient).GetById(serviceRequestId)
if errors.Is(err, mongo.ErrNoDocuments) {
srm.logger.Error(fmt.Sprintf("pipeline not found: %s", serviceRequest.PipelineId))
return err
Expand Down Expand Up @@ -220,6 +225,12 @@ func (srm *ExecutionManager) handleCompletedStepEvent(e event.Event) error {
return nil
}

// Check if SR has been cancelled
if serviceRequest.Status == models.CANCELLED {
srm.logger.Info(fmt.Sprintf("service request %s has been cancelled. Will not proceed to execute next step", serviceRequestId))
return nil
}

// Set the current executor to the next executor
nextStep := pipeline.GetPipelineStep(completedStepModel.NextStepName)
if nextStep == nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/execute/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (e *apiStepExecutor) execute(ctx context.Context, l *logger.ExecutorLogger)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("non-200 response")
}
event.FireAsync(events.NewStepCompletedEvent(step.StepName, serviceRequest, "", &stepExecResult{}, nil))
event.FireAsync(events.NewStepCompletedEvent(step.StepName, serviceRequest.Id.Hex(), "", &stepExecResult{}, nil))
return &stepExecResult{}, nil
}

Expand Down
35 changes: 26 additions & 9 deletions backend/src/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *ServerHandler) registerRoutes(r *mux.Router) {
r.Handle("/api/service_request/{requestId}", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgMember(s.psqlClient, handleGetServiceRequest(s.logger, s.mongoClient, s.psqlClient), s.logger), s.logger), s.logger)).Methods("GET")
r.Handle("/api/service_request", isAuthenticated(getOrgIdFromRequestBody(isOrgMember(s.psqlClient, handleCreateServiceRequest(s.logger, s.mongoClient, s.psqlClient), s.logger), s.logger), s.logger)).Methods("POST").Headers("Content-Type", "application/json")
r.Handle("/api/service_request/{requestId}", isAuthenticated(getOrgIdFromRequestBody(isOrgMember(s.psqlClient, handleUpdateServiceRequest(s.logger, s.mongoClient), s.logger), s.logger), s.logger)).Methods("PATCH").Headers("Content-Type", "application/json")
r.Handle("/api/service_request/{requestId}/cancel", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgMember(s.psqlClient, handleCancelStartedServiceRequest(s.logger, s.mongoClient), s.logger), s.logger), s.logger)).Methods("PUT")
r.Handle("/api/service_request/{requestId}/cancel", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgMember(s.psqlClient, handleCancelStartedServiceRequest(s.logger, s.mongoClient, s.psqlClient), s.logger), s.logger), s.logger)).Methods("PUT")
r.Handle("/api/service_request/{requestId}/start", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgMember(s.psqlClient, handleStartServiceRequest(s.logger, s.mongoClient), s.logger), s.logger), s.logger)).Methods("PUT")
r.Handle("/api/service_request/{requestId}/approve", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgAdmin(s.psqlClient, handleApproveServiceRequest(s.logger, s.mongoClient, s.psqlClient), s.logger), s.logger), s.logger)).Methods("PUT")
r.Handle("/api/service_request/{requestId}/reject", isAuthenticated(getOrgIdUsingSrId(s.mongoClient, isOrgAdmin(s.psqlClient, handleRejectServiceRequest(s.logger, s.mongoClient, s.psqlClient), s.logger), s.logger), s.logger)).Methods("PUT")
Expand Down Expand Up @@ -287,7 +287,7 @@ func handleCreateServiceRequest(logger logger.ServerLogger, mongoClient *mongo.C
})
}

func handleCancelStartedServiceRequest(logger logger.ServerLogger, client *mongo.Client) http.Handler {
func handleCancelStartedServiceRequest(logger logger.ServerLogger, client *mongo.Client, psqlClient *sql.DB) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
requestId := vars["requestId"]
Expand All @@ -298,20 +298,19 @@ func handleCancelStartedServiceRequest(logger logger.ServerLogger, client *mongo
return
}
status := sr.Status
if status != models.PENDING && status != models.RUNNING {
if status == models.COMPLETED {
Zheng-Zhi-Qiang marked this conversation as resolved.
Show resolved Hide resolved
logger.Error(fmt.Sprintf("failed to %s service request %s: %s", "cancel", requestId, "execution has been completed"))
encode(w, r, http.StatusBadRequest, newHandlerError(ErrServiceRequestAlreadyCompleted, http.StatusBadRequest))
return
}

if status == models.NOT_STARTED {
logger.Error(fmt.Sprintf("failed to %s service request %s: %s", "cancel", requestId, "execution has not been started"))
encode(w, r, http.StatusBadRequest, newHandlerError(ErrServiceRequestNotStarted, http.StatusBadRequest))
sre, err := database.NewServiceRequestEvent(psqlClient).GetLatestStepEvent(requestId)
if err != nil {
logger.Error(fmt.Sprintf("error encountered while handling API request: %s", err))
encode(w, r, http.StatusInternalServerError, newHandlerError(ErrInternalServerError, http.StatusInternalServerError))
return
}

// TODO: implement cancellation of sr

err = database.NewServiceRequest(client).UpdateStatus(requestId, models.CANCELLED)

// TODO: discuss how to handle this error
Expand All @@ -320,6 +319,24 @@ func handleCancelStartedServiceRequest(logger logger.ServerLogger, client *mongo
encode(w, r, http.StatusInternalServerError, newHandlerError(ErrInternalServerError, http.StatusInternalServerError))
return
}

if models.IsCancellablePipelineStepType(sre.StepType) {
userId := r.Context().Value(jwtmiddleware.ContextKey{}).(*validator.ValidatedClaims).RegisteredClaims.Subject
// Log step cancelled event
serviceRequestEvent := database.NewServiceRequestEvent(psqlClient)
err = serviceRequestEvent.Create(&models.ServiceRequestEventModel{
EventType: models.STEP_COMPLETED,
ServiceRequestId: sr.Id.Hex(),
StepName: sre.StepName,
CreatedBy: userId,
StepType: sre.StepType,
})
if err != nil {
encode(w, r, http.StatusInternalServerError, newHandlerError(ErrInternalServerError, http.StatusInternalServerError))
return
}
}

encode[any](w, r, http.StatusOK, nil)
})
}
Expand Down Expand Up @@ -422,7 +439,7 @@ func handleApproveServiceRequest(logger logger.ServerLogger, client *mongo.Clien

logger.Info(fmt.Sprintf("approving service request \"%s\" at step \"%s\", performed by %s", serviceRequestId, latestStep.StepName, user.Name))
// TODO: figure out how to pass the step result prior to the approval to the next step
event.FireAsync(events.NewStepCompletedEvent(latestStep.StepName, serviceRequest, userId, nil, nil))
event.FireAsync(events.NewStepCompletedEvent(latestStep.StepName, serviceRequest.Id.Hex(), userId, nil, nil))
encode[any](w, r, http.StatusOK, nil)
})
}
Expand Down
5 changes: 5 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ services:
- be
build:
context: backend
volumes:
- flowforge_executor_logs:/backend/executor_logs
environment:
- ENV=${ENV}
- MONGO_URI=${MONGO_URI}
Expand Down Expand Up @@ -123,3 +125,6 @@ services:

environment:
- MONGO_URI=${MONGO_URI}

volumes:
flowforge_executor_logs:
Loading