From 7e443cf8f3054170413f6015b2f589630a072e10 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:42:07 +0800 Subject: [PATCH 01/11] build: add psql driver --- backend/go.mod | 1 + backend/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/backend/go.mod b/backend/go.mod index 4259c2f5..3fe5a2db 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -6,6 +6,7 @@ require ( github.com/gookit/event v1.1.1 github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.1 + github.com/lib/pq v1.10.9 go.mongodb.org/mongo-driver v1.13.1 ) diff --git a/backend/go.sum b/backend/go.sum index 7cfbbdd4..5a40a7bb 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -14,6 +14,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= From e69f96160c506fdd0a7c21cc2f6de8a7656653a5 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:42:39 +0800 Subject: [PATCH 02/11] feat: add POSTGRES_URI env --- compose.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/compose.yaml b/compose.yaml index 43af84c1..b164f1ee 100644 --- a/compose.yaml +++ b/compose.yaml @@ -22,6 +22,7 @@ services: context: backend environment: - MONGO_URI=${MONGO_URI} + - POSTGRES_URI=${POSTGRES_URI} postgres: profiles: From 2a4869e729dceb2655bc03554e963325876fefa2 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:43:16 +0800 Subject: [PATCH 03/11] refactor: newly created SR should have status NotStarted --- backend/src/server/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/server/handlers.go b/backend/src/server/handlers.go index ac805373..436f391c 100644 --- a/backend/src/server/handlers.go +++ b/backend/src/server/handlers.go @@ -71,7 +71,7 @@ func handleCreateServiceRequest(client *mongo.Client) http.Handler { } srm.CreatedOn = time.Now() srm.LastUpdated = time.Now() - srm.Status = models.Pending + srm.Status = models.NotStarted res, err := database.NewServiceRequest(client).Create(&srm) if err != nil { From daf56cffe2e59c5ce7e5b4bfe34fb4b4d84b1797 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:43:50 +0800 Subject: [PATCH 04/11] refactor: update psql init --- backend/database/init.sql | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/backend/database/init.sql b/backend/database/init.sql index f79c6190..d0af4930 100644 --- a/backend/database/init.sql +++ b/backend/database/init.sql @@ -3,7 +3,7 @@ -- -- Dumped from database version 16.1 --- Dumped by pg_dump version 16.1 (Homebrew) +-- Dumped by pg_dump version 16.2 (Homebrew) SET statement_timeout = 0; SET lock_timeout = 0; @@ -21,25 +21,26 @@ SET default_tablespace = ''; SET default_table_access_method = heap; -- --- Name: service_request_actions; Type: TABLE; Schema: public; Owner: postgres +-- Name: service_request_event; Type: TABLE; Schema: public; Owner: postgres -- -CREATE TABLE public.service_request_actions ( - action_id integer NOT NULL, - step_name character varying(50) NOT NULL, - action_name character varying(50) NOT NULL, - approved_by character varying(50), +CREATE TABLE public.service_request_event ( + event_id integer NOT NULL, + event_type character varying NOT NULL, + service_request_id character varying NOT NULL, + step_name character varying NOT NULL, + approved_by character varying, created_at timestamp without time zone DEFAULT now() ); -ALTER TABLE public.service_request_actions OWNER TO postgres; +ALTER TABLE public.service_request_event OWNER TO postgres; -- --- Name: service_request_actions_action_id_seq; Type: SEQUENCE; Schema: public; Owner: postgres +-- Name: service_request_events_event_id_seq; Type: SEQUENCE; Schema: public; Owner: postgres -- -CREATE SEQUENCE public.service_request_actions_action_id_seq +CREATE SEQUENCE public.service_request_events_event_id_seq AS integer START WITH 1 INCREMENT BY 1 @@ -48,28 +49,28 @@ CREATE SEQUENCE public.service_request_actions_action_id_seq CACHE 1; -ALTER SEQUENCE public.service_request_actions_action_id_seq OWNER TO postgres; +ALTER SEQUENCE public.service_request_events_event_id_seq OWNER TO postgres; -- --- Name: service_request_actions_action_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: postgres +-- Name: service_request_events_event_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: postgres -- -ALTER SEQUENCE public.service_request_actions_action_id_seq OWNED BY public.service_request_actions.action_id; +ALTER SEQUENCE public.service_request_events_event_id_seq OWNED BY public.service_request_event.event_id; -- --- Name: service_request_actions action_id; Type: DEFAULT; Schema: public; Owner: postgres +-- Name: service_request_event event_id; Type: DEFAULT; Schema: public; Owner: postgres -- -ALTER TABLE ONLY public.service_request_actions ALTER COLUMN action_id SET DEFAULT nextval('public.service_request_actions_action_id_seq'::regclass); +ALTER TABLE ONLY public.service_request_event ALTER COLUMN event_id SET DEFAULT nextval('public.service_request_events_event_id_seq'::regclass); -- --- Name: service_request_actions service_request_actions_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres +-- Name: service_request_event service_request_events_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres -- -ALTER TABLE ONLY public.service_request_actions - ADD CONSTRAINT service_request_actions_pkey PRIMARY KEY (action_id); +ALTER TABLE ONLY public.service_request_event + ADD CONSTRAINT service_request_events_pkey PRIMARY KEY (event_id); -- From aa071731c639879e9f64a0f4b5e48d9f153886b8 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:45:42 +0800 Subject: [PATCH 05/11] fix: seeded SR not having the correct pipeline id --- backend/database/mongo_seed/main.go | 55 +++++++++++++++-------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/backend/database/mongo_seed/main.go b/backend/database/mongo_seed/main.go index 575f5823..53f8ceb2 100644 --- a/backend/database/mongo_seed/main.go +++ b/backend/database/mongo_seed/main.go @@ -16,36 +16,14 @@ func main() { panic(err) } - serviceReqId, err := primitive.ObjectIDFromHex("F2D8E1A73B964C5E7A0F81D9") - if err != nil { - panic(err) - } - serviceRequest := models.ServiceRequestModel{ - Id: serviceReqId, - PipelineId: "1", - PipelineVersion: 1, - Status: models.Success, - Remarks: "This is a test service request.", - CreatedOn: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC), - LastUpdated: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC), - } - - res, err := database.NewServiceRequest(c).Create(&serviceRequest) - if err != nil { - panic(err) - } - if oid, ok := res.InsertedID.(primitive.ObjectID); ok { - logger.Info("Inserted service request", map[string]interface{}{"id": oid.String()}) - } else { - panic("Inserted ID is not an ObjectID") - } - pipelineUuid, err := primitive.ObjectIDFromHex("8A7F3EBCD951246A5F0E9B87") + pipelineIdInHex := "8A7F3EBCD951246A5F0E9B87" + pipelineId, err := primitive.ObjectIDFromHex(pipelineIdInHex) if err != nil { panic(err) } pipeline := models.PipelineModel{ PipelineName: "Test Pipeline", - Id: pipelineUuid, + Id: pipelineId, Version: 1, FirstStepName: "step1", Steps: []models.PipelineStepModel{ @@ -71,7 +49,7 @@ func main() { }, } - res, err = database.NewPipeline(c).Create(&pipeline) + res, err := database.NewPipeline(c).Create(&pipeline) if err != nil { panic(err) } @@ -80,4 +58,29 @@ func main() { } else { panic("Inserted ID is not an ObjectID") } + + serviceReqIdInHex := "F2D8E1A73B964C5E7A0F81D9" + serviceReqId, err := primitive.ObjectIDFromHex(serviceReqIdInHex) + if err != nil { + panic(err) + } + serviceRequest := models.ServiceRequestModel{ + Id: serviceReqId, + PipelineId: pipelineIdInHex, + PipelineVersion: 1, + Status: models.Success, + Remarks: "This is a test service request.", + CreatedOn: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC), + LastUpdated: time.Date(2024, time.January, 1, 1, 0, 0, 0, time.UTC), + } + + res, err = database.NewServiceRequest(c).Create(&serviceRequest) + if err != nil { + panic(err) + } + if oid, ok := res.InsertedID.(primitive.ObjectID); ok { + logger.Info("Inserted service request", map[string]interface{}{"id": oid.String()}) + } else { + panic("Inserted ID is not an ObjectID") + } } From 805c0d8c1896fd50bb2b13edb55858b539df5587 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:47:04 +0800 Subject: [PATCH 06/11] feat: add psql db client --- backend/src/database/client/psql.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 backend/src/database/client/psql.go diff --git a/backend/src/database/client/psql.go b/backend/src/database/client/psql.go new file mode 100644 index 00000000..e5745f2a --- /dev/null +++ b/backend/src/database/client/psql.go @@ -0,0 +1,26 @@ +package client + +import ( + "database/sql" + "fmt" + "os" + + _ "github.com/lib/pq" +) + +var db *sql.DB + +func GetPsqlClient() (*sql.DB, error) { + if db == nil { + uri := os.Getenv("POSTGRES_URI") + if uri == "" { + return nil, fmt.Errorf("POSTGRES_URI environment variable not set") + } + database, err := sql.Open("postgres", uri) + if err != nil { + return nil, err + } + db = database + } + return db, nil +} From 57f356ac0e7ad12a04b713146f8480b33b3b4406 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:47:47 +0800 Subject: [PATCH 07/11] feat: add service request event model --- .../database/models/service_request_event.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 backend/src/database/models/service_request_event.go diff --git a/backend/src/database/models/service_request_event.go b/backend/src/database/models/service_request_event.go new file mode 100644 index 00000000..bbcf132c --- /dev/null +++ b/backend/src/database/models/service_request_event.go @@ -0,0 +1,21 @@ +package models + +import "time" + +type EventType string + +const ( + STEP_STARTED EventType = "STEP_STARTED" + STEP_APPROVED EventType = "STEP_APPROVED" + STEP_REJECTED EventType = "STEP_REJECTED" + STEP_COMPLETED EventType = "STEP_COMPLETED" +) + +type ServiceRequestEventModel struct { + EventId int + EventType EventType + ServiceRequestId string + StepName string + ApprovedBy string + CreatedAt time.Time +} From 87f47e27887854d843f048f2a4dd18ff15286a69 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:47:56 +0800 Subject: [PATCH 08/11] feat: add Create method --- backend/src/database/service_request_event.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 backend/src/database/service_request_event.go diff --git a/backend/src/database/service_request_event.go b/backend/src/database/service_request_event.go new file mode 100644 index 00000000..39fde191 --- /dev/null +++ b/backend/src/database/service_request_event.go @@ -0,0 +1,32 @@ +package database + +import ( + "database/sql" + + "github.com/joshtyf/flowforge/src/database/models" +) + +type ServiceRequestEvent struct { + db *sql.DB +} + +func NewServiceRequestEvent(db *sql.DB) *ServiceRequestEvent { + return &ServiceRequestEvent{ + db: db, + } +} + +func (sre *ServiceRequestEvent) Create(srem *models.ServiceRequestEventModel) error { + queryStr := "INSERT INTO service_request_event (event_type, service_request_id, step_name, approved_by) VALUES ($1, $2, $3, $4)" + err := sre.db.QueryRow( + queryStr, + srem.EventType, + srem.ServiceRequestId, + srem.StepName, + srem.ApprovedBy, + ).Scan() + if err != nil { + return err + } + return nil +} From 9af8a90394f34f7a0c587f9cf1144f0440597db8 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sat, 2 Mar 2024 19:48:32 +0800 Subject: [PATCH 09/11] feat: create service request event when starting new step --- backend/src/execute/step_exec_mgr.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/backend/src/execute/step_exec_mgr.go b/backend/src/execute/step_exec_mgr.go index 85d04070..839df9b0 100644 --- a/backend/src/execute/step_exec_mgr.go +++ b/backend/src/execute/step_exec_mgr.go @@ -2,6 +2,7 @@ package execute import ( "context" + "database/sql" "fmt" "github.com/gookit/event" @@ -16,6 +17,7 @@ import ( type ExecutionManager struct { mongoClient *mongo.Client + psqlClient *sql.DB executors map[models.PipelineStepType]*stepExecutor } @@ -32,9 +34,14 @@ func NewStepExecutionManager(configs ...ExecutionManagerConfig) *ExecutionManage if err != nil { logger.Error("[ServiceRequestManager] Error getting mongo client", map[string]interface{}{"err": err}) } + psqlClient, err := client.GetPsqlClient() + if err != nil { + logger.Error("[ServiceRequestManager] Error getting psql client", map[string]interface{}{"err": err}) + } srm := &ExecutionManager{ executors: map[models.PipelineStepType]*stepExecutor{}, mongoClient: mongoClient, + psqlClient: psqlClient, } for _, c := range configs { c(srm) @@ -74,11 +81,22 @@ func (srm *ExecutionManager) handleNewServiceRequestEvent(e event.Event) error { return fmt.Errorf("no executor found for first step") } + // TODO: Need to handle both database queries as a transaction err = database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Running) if err != nil { logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err}) return err } + serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient) + err = serviceRequestEvent.Create(&models.ServiceRequestEventModel{ + EventType: models.STEP_STARTED, + ServiceRequestId: serviceRequest.Id.Hex(), + StepName: firstStep.StepName, + }) + if err != nil { + logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err}) + return err + } err = srm.execute(serviceRequest, firstStep, currExecutor) return err } From 7d8ddd9cae315671f981fc8fb60968bed84d0039 Mon Sep 17 00:00:00 2001 From: Joshua Date: Sun, 3 Mar 2024 14:25:22 +0800 Subject: [PATCH 10/11] refactor: move step event started into 'execute' method --- backend/src/execute/step_exec_mgr.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/backend/src/execute/step_exec_mgr.go b/backend/src/execute/step_exec_mgr.go index 839df9b0..60234d5d 100644 --- a/backend/src/execute/step_exec_mgr.go +++ b/backend/src/execute/step_exec_mgr.go @@ -81,22 +81,13 @@ func (srm *ExecutionManager) handleNewServiceRequestEvent(e event.Event) error { return fmt.Errorf("no executor found for first step") } - // TODO: Need to handle both database queries as a transaction + // Update the service request status to running err = database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Running) if err != nil { logger.Error("[ServiceRequestManager] Error updating service request status", map[string]interface{}{"err": err}) return err } - serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient) - err = serviceRequestEvent.Create(&models.ServiceRequestEventModel{ - EventType: models.STEP_STARTED, - ServiceRequestId: serviceRequest.Id.Hex(), - StepName: firstStep.StepName, - }) - if err != nil { - logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err}) - return err - } + err = srm.execute(serviceRequest, firstStep, currExecutor) return err } @@ -111,8 +102,19 @@ func (srm *ExecutionManager) execute(serviceRequest *models.ServiceRequestModel, util.StepKey, step, ) + // Log step started event + serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient) + err := serviceRequestEvent.Create(&models.ServiceRequestEventModel{ + EventType: models.STEP_STARTED, + ServiceRequestId: serviceRequest.Id.Hex(), + StepName: step.StepName, + }) + if err != nil { + logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err}) + return err + } // Execute the current step - _, err := (*executor).execute(executeCtx) + _, err = (*executor).execute(executeCtx) if err != nil { logger.Error("[ServiceRequestManager] Error executing step", map[string]interface{}{"step": (*executor).getStepType(), "err": err}) // TODO: Handle error From 463a7f31c781f3b705031429b4b419ee4d215cac Mon Sep 17 00:00:00 2001 From: Joshua Date: Sun, 3 Mar 2024 14:30:25 +0800 Subject: [PATCH 11/11] feat: log step completed event --- backend/src/execute/step_exec_mgr.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/backend/src/execute/step_exec_mgr.go b/backend/src/execute/step_exec_mgr.go index 60234d5d..b814499f 100644 --- a/backend/src/execute/step_exec_mgr.go +++ b/backend/src/execute/step_exec_mgr.go @@ -138,6 +138,19 @@ func (srm *ExecutionManager) handleCompletedStepEvent(e event.Event) error { return fmt.Errorf("service request is nil") } + // Log step completed event + serviceRequestEvent := database.NewServiceRequestEvent(srm.psqlClient) + err := serviceRequestEvent.Create(&models.ServiceRequestEventModel{ + EventType: models.STEP_COMPLETED, + ServiceRequestId: serviceRequest.Id.Hex(), + StepName: completedStep.StepName, + }) + if err != nil { + // TODO: not sure if we should return here. We need to handle the error better + logger.Error("[ServiceRequestManager] Error creating service request event", map[string]interface{}{"err": err}) + return err + } + if completedStep.IsTerminalStep { err := database.NewServiceRequest(srm.mongoClient).UpdateStatus(serviceRequest.Id.Hex(), models.Success) if err != nil {