Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial event logging for step execution #74

Merged
merged 11 commits into from
Mar 7, 2024
37 changes: 19 additions & 18 deletions backend/database/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
--

-- Dumped from database version 16.1
-- Dumped by pg_dump version 16.1 (Homebrew)
-- Dumped by pg_dump version 16.2 (Homebrew)

SET statement_timeout = 0;
SET lock_timeout = 0;
Expand All @@ -21,25 +21,26 @@ SET default_tablespace = '';
SET default_table_access_method = heap;

--
-- Name: service_request_actions; Type: TABLE; Schema: public; Owner: postgres
-- Name: service_request_event; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.service_request_actions (
action_id integer NOT NULL,
step_name character varying(50) NOT NULL,
action_name character varying(50) NOT NULL,
approved_by character varying(50),
CREATE TABLE public.service_request_event (
event_id integer NOT NULL,
event_type character varying NOT NULL,
service_request_id character varying NOT NULL,
step_name character varying NOT NULL,
approved_by character varying,
created_at timestamp without time zone DEFAULT now()
);


ALTER TABLE public.service_request_actions OWNER TO postgres;
ALTER TABLE public.service_request_event OWNER TO postgres;

--
-- Name: service_request_actions_action_id_seq; Type: SEQUENCE; Schema: public; Owner: postgres
-- Name: service_request_events_event_id_seq; Type: SEQUENCE; Schema: public; Owner: postgres
--

CREATE SEQUENCE public.service_request_actions_action_id_seq
CREATE SEQUENCE public.service_request_events_event_id_seq
AS integer
START WITH 1
INCREMENT BY 1
Expand All @@ -48,28 +49,28 @@ CREATE SEQUENCE public.service_request_actions_action_id_seq
CACHE 1;


ALTER SEQUENCE public.service_request_actions_action_id_seq OWNER TO postgres;
ALTER SEQUENCE public.service_request_events_event_id_seq OWNER TO postgres;

--
-- Name: service_request_actions_action_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: postgres
-- Name: service_request_events_event_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: postgres
--

ALTER SEQUENCE public.service_request_actions_action_id_seq OWNED BY public.service_request_actions.action_id;
ALTER SEQUENCE public.service_request_events_event_id_seq OWNED BY public.service_request_event.event_id;


--
-- Name: service_request_actions action_id; Type: DEFAULT; Schema: public; Owner: postgres
-- Name: service_request_event event_id; Type: DEFAULT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.service_request_actions ALTER COLUMN action_id SET DEFAULT nextval('public.service_request_actions_action_id_seq'::regclass);
ALTER TABLE ONLY public.service_request_event ALTER COLUMN event_id SET DEFAULT nextval('public.service_request_events_event_id_seq'::regclass);


--
-- Name: service_request_actions service_request_actions_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
-- Name: service_request_event service_request_events_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.service_request_actions
ADD CONSTRAINT service_request_actions_pkey PRIMARY KEY (action_id);
ALTER TABLE ONLY public.service_request_event
ADD CONSTRAINT service_request_events_pkey PRIMARY KEY (event_id);


--
Expand Down
55 changes: 29 additions & 26 deletions backend/database/mongo_seed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,14 @@ func main() {
panic(err)
}

serviceReqId, err := primitive.ObjectIDFromHex("F2D8E1A73B964C5E7A0F81D9")
if err != nil {
panic(err)
}
serviceRequest := models.ServiceRequestModel{
Id: serviceReqId,
PipelineId: "1",
PipelineVersion: 1,
Status: models.Success,
Remarks: "This is a test service request.",
CreatedOn: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC),
LastUpdated: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC),
}

res, err := database.NewServiceRequest(c).Create(&serviceRequest)
if err != nil {
panic(err)
}
if oid, ok := res.InsertedID.(primitive.ObjectID); ok {
logger.Info("Inserted service request", map[string]interface{}{"id": oid.String()})
} else {
panic("Inserted ID is not an ObjectID")
}
pipelineUuid, err := primitive.ObjectIDFromHex("8A7F3EBCD951246A5F0E9B87")
pipelineIdInHex := "8A7F3EBCD951246A5F0E9B87"
pipelineId, err := primitive.ObjectIDFromHex(pipelineIdInHex)
if err != nil {
panic(err)
}
pipeline := models.PipelineModel{
PipelineName: "Test Pipeline",
Id: pipelineUuid,
Id: pipelineId,
Version: 1,
FirstStepName: "step1",
Steps: []models.PipelineStepModel{
Expand All @@ -71,7 +49,7 @@ func main() {
},
}

res, err = database.NewPipeline(c).Create(&pipeline)
res, err := database.NewPipeline(c).Create(&pipeline)
if err != nil {
panic(err)
}
Expand All @@ -80,4 +58,29 @@ func main() {
} else {
panic("Inserted ID is not an ObjectID")
}

serviceReqIdInHex := "F2D8E1A73B964C5E7A0F81D9"
serviceReqId, err := primitive.ObjectIDFromHex(serviceReqIdInHex)
if err != nil {
panic(err)
}
serviceRequest := models.ServiceRequestModel{
Id: serviceReqId,
PipelineId: pipelineIdInHex,
PipelineVersion: 1,
Status: models.Success,
Remarks: "This is a test service request.",
CreatedOn: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC),
LastUpdated: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC),
}

res, err = database.NewServiceRequest(c).Create(&serviceRequest)
if err != nil {
panic(err)
}
if oid, ok := res.InsertedID.(primitive.ObjectID); ok {
logger.Info("Inserted service request", map[string]interface{}{"id": oid.String()})
} else {
panic("Inserted ID is not an ObjectID")
}
}
1 change: 1 addition & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gookit/event v1.1.1
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/lib/pq v1.10.9
go.mongodb.org/mongo-driver v1.13.1
)

Expand Down
2 changes: 2 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
26 changes: 26 additions & 0 deletions backend/src/database/client/psql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package client

import (
"database/sql"
"fmt"
"os"

_ "github.com/lib/pq"
)

var db *sql.DB

func GetPsqlClient() (*sql.DB, error) {
if db == nil {
uri := os.Getenv("POSTGRES_URI")
if uri == "" {
return nil, fmt.Errorf("POSTGRES_URI environment variable not set")
}
database, err := sql.Open("postgres", uri)
if err != nil {
return nil, err
}
db = database
}
return db, nil
}
21 changes: 21 additions & 0 deletions backend/src/database/models/service_request_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package models

import "time"

type EventType string

const (
STEP_STARTED EventType = "STEP_STARTED"
Copy link
Collaborator

@Ziyang-98 Ziyang-98 Mar 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean there is only 4 events now? what happen to the other events? can lmk if anything needs to be changed/added to the existing events for the service request dashboard status column.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the dashboard status column should show the overall status of the service request. The EventType you see here is more granular, and shows the event that has happened for this particular step. You won't be using this EventType for the status column

STEP_APPROVED EventType = "STEP_APPROVED"
STEP_REJECTED EventType = "STEP_REJECTED"
STEP_COMPLETED EventType = "STEP_COMPLETED"
)

type ServiceRequestEventModel struct {
EventId int
EventType EventType
ServiceRequestId string
StepName string
ApprovedBy string
CreatedAt time.Time
}
32 changes: 32 additions & 0 deletions backend/src/database/service_request_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package database

import (
"database/sql"

"github.com/joshtyf/flowforge/src/database/models"
)

type ServiceRequestEvent struct {
db *sql.DB
}

func NewServiceRequestEvent(db *sql.DB) *ServiceRequestEvent {
return &ServiceRequestEvent{
db: db,
}
}

func (sre *ServiceRequestEvent) Create(srem *models.ServiceRequestEventModel) error {
queryStr := "INSERT INTO service_request_event (event_type, service_request_id, step_name, approved_by) VALUES ($1, $2, $3, $4)"
err := sre.db.QueryRow(
queryStr,
srem.EventType,
srem.ServiceRequestId,
srem.StepName,
srem.ApprovedBy,
).Scan()
if err != nil {
return err
}
return nil
}
35 changes: 34 additions & 1 deletion backend/src/execute/step_exec_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execute

import (
"context"
"database/sql"
"fmt"

"github.com/gookit/event"
Expand All @@ -16,6 +17,7 @@ import (

type ExecutionManager struct {
mongoClient *mongo.Client
psqlClient *sql.DB
executors map[models.PipelineStepType]*stepExecutor
}

Expand All @@ -32,9 +34,14 @@ func NewStepExecutionManager(configs ...ExecutionManagerConfig) *ExecutionManage
if err != nil {
logger.Error("[ServiceRequestManager] Error getting mongo client", map[string]interface{}{"err": err})
}
psqlClient, err := client.GetPsqlClient()
if err != nil {
logger.Error("[ServiceRequestManager] Error getting psql client", map[string]interface{}{"err": err})
}
srm := &ExecutionManager{
executors: map[models.PipelineStepType]*stepExecutor{},
mongoClient: mongoClient,
psqlClient: psqlClient,
}
for _, c := range configs {
c(srm)
Expand Down Expand Up @@ -74,11 +81,13 @@ func (srm *ExecutionManager) handleNewServiceRequestEvent(e event.Event) error {
return fmt.Errorf("no executor found for first step")
}

// Update the service request status to running
err = database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Running)
if err != nil {
logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err})
return err
}

err = srm.execute(serviceRequest, firstStep, currExecutor)
return err
}
Expand All @@ -93,8 +102,19 @@ func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel,
util.StepKey,
step,
)
// Log step started event
serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient)
err := serviceRequestEvent.Create(&models.ServiceRequestEventModel{
EventType: models.STEP_STARTED,
ServiceRequestId: serviceRequest.Id.Hex(),
StepName: step.StepName,
})
if err != nil {
logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err})
return err
}
// Execute the current step
_, err := (*executor).execute(executeCtx)
_, err = (*executor).execute(executeCtx)
if err != nil {
logger.Error("[ServiceRequestManager] Error executing step", map[string]interface{}{"step": (*executor).getStepType(), "err": err})
// TODO: Handle error
Expand All @@ -118,6 +138,19 @@ func (srm *ExecutionManager) handleCompletedStepEvent(e event.Event) error {
return fmt.Errorf("service request is nil")
}

// Log step completed event
serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient)
err := serviceRequestEvent.Create(&models.ServiceRequestEventModel{
EventType: models.STEP_COMPLETED,
ServiceRequestId: serviceRequest.Id.Hex(),
StepName: completedStep.StepName,
})
if err != nil {
// TODO: not sure if we should return here. We need to handle the error better
logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err})
return err
}

if completedStep.IsTerminalStep {
err := database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Success)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func handleCreateServiceRequest(client *mongo.Client) http.Handler {
}
srm.CreatedOn = time.Now()
srm.LastUpdated = time.Now()
srm.Status = models.Pending
srm.Status = models.NotStarted

res, err := database.NewServiceRequest(client).Create(&srm)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
context: backend
environment:
- MONGO_URI=${MONGO_URI}
- POSTGRES_URI=${POSTGRES_URI}

postgres:
profiles:
Expand Down
Loading