From 494fccafe089582f0dc3fc22e5a15366e71164dc Mon Sep 17 00:00:00 2001 From: Ashutosh Bhide <37860760+abhide-tibco@users.noreply.github.com> Date: Tue, 2 May 2023 18:51:29 +0530 Subject: [PATCH] Flow rerun with io (#54) * update go mod for flow * update queries for flow state and flow rerun * Fixed go mod file * Removed vendor folder * add queries for flow rerun count * Updated go mod * Fixed error * add vendor to .gitignore * add config.json to .gitignore * add config.json to .gitignore * add flow rerun count and flow input * Updated go mod * Added support for existing tables Dont kill the service immediately but keep running * Added checks for database connected * Fixed error * Fixed max connection issue --------- Co-authored-by: Abhijit Wakchaure --- .gitignore | 2 + flow-state/go.mod | 20 +- flow-state/go.sum | 40 +--- flow-state/server/rest/endpoints.go | 13 +- flow-state/store/mem/step.go | 2 +- flow-state/store/postgres/connection.go | 5 + flow-state/store/postgres/operation.go | 64 +++++- flow-state/store/postgres/step.go | 264 +++++++++++++++++++++--- flow-state/store/store.go | 2 +- 9 files changed, 325 insertions(+), 87 deletions(-) diff --git a/.gitignore b/.gitignore index 47700de..130aeb4 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,5 @@ tags .build-cache submodules/flogo-cicd/.build-cache ./Dockerfile +vendor/ +flow-state/config.json \ No newline at end of file diff --git a/flow-state/go.mod b/flow-state/go.mod index 6ad2902..cf95001 100644 --- a/flow-state/go.mod +++ b/flow-state/go.mod @@ -1,12 +1,20 @@ module github.com/project-flogo/services/flow-state -go 1.16 +go 1.18 require ( - github.com/gorilla/websocket v1.4.2 + github.com/gorilla/websocket v1.5.0 github.com/julienschmidt/httprouter v1.3.0 - github.com/lib/pq v1.10.3 - github.com/project-flogo/core v1.6.0 - github.com/project-flogo/flow v1.6.0 - github.com/rs/cors v1.8.0 + github.com/lib/pq v1.10.7 + github.com/project-flogo/core v1.6.4 + github.com/project-flogo/flow v1.6.5-0.20230324065406-53d6cf9cc418 + github.com/rs/cors v1.8.3 +) + +require ( + github.com/araddon/dateparse v0.0.0-20190622164848-0fb0a474d195 // indirect + github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect + go.uber.org/atomic v1.6.0 // indirect + go.uber.org/multierr v1.5.0 // indirect + go.uber.org/zap v1.16.0 // indirect ) diff --git a/flow-state/go.sum b/flow-state/go.sum index d14eee3..dbe9086 100644 --- a/flow-state/go.sum +++ b/flow-state/go.sum @@ -5,30 +5,17 @@ github.com/araddon/dateparse v0.0.0-20190622164848-0fb0a474d195/go.mod h1:SLqhdZ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do= -github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= -github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= -github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= -github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= +github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -36,20 +23,17 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/project-flogo/core v1.4.0/go.mod h1:fapTXUhLxDeAHyb6eMkuwnYswO8FpZJAMat055QVdJE= -github.com/project-flogo/core v1.6.0 h1:+LH226SGU5961xh5H9lp8iUGp5dWPHaMYGu0pTuSLSE= -github.com/project-flogo/core v1.6.0/go.mod h1:fapTXUhLxDeAHyb6eMkuwnYswO8FpZJAMat055QVdJE= -github.com/project-flogo/flow v1.6.0 h1:04La0Tp0QcIuXRXPPncYZ9XwDMLlDdZWxxzQaw92MYg= -github.com/project-flogo/flow v1.6.0/go.mod h1:LesZqWPKWub4EZ71+kib74n5MF7ePlVPAuP0DVtRhkI= +github.com/project-flogo/core v1.6.4 h1:MG4XNXqEmmY3qkPyUOmq5Ph0m8ycknnKcFqsH8avPa0= +github.com/project-flogo/core v1.6.4/go.mod h1:fapTXUhLxDeAHyb6eMkuwnYswO8FpZJAMat055QVdJE= +github.com/project-flogo/flow v1.6.5-0.20230324065406-53d6cf9cc418 h1:jB0fcOjaq5FsgMYKdqVuh+k4JN/T+WEtIdhrVZWh32A= +github.com/project-flogo/flow v1.6.5-0.20230324065406-53d6cf9cc418/go.mod h1:8b0r4IuPb54MFOxKhyq3sPDm080Hzgtv2yVdAA6aEg0= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so= -github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM= +github.com/rs/cors v1.8.3 h1:O+qNyWn7Z+F9M0ILBHgMVPuB1xTOucVd5gtaYyXBpRo= +github.com/rs/cors v1.8.3/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= @@ -72,7 +56,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -81,11 +64,8 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= -gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= diff --git a/flow-state/server/rest/endpoints.go b/flow-state/server/rest/endpoints.go index 8b0384d..ceb4f25 100644 --- a/flow-state/server/rest/endpoints.go +++ b/flow-state/server/rest/endpoints.go @@ -6,6 +6,7 @@ import ( flowEvent "github.com/project-flogo/flow/support/event" "github.com/project-flogo/services/flow-state/event" "github.com/project-flogo/services/flow-state/store/metadata" + "github.com/project-flogo/services/flow-state/store/postgres" "io/ioutil" "net/http" "strconv" @@ -98,11 +99,13 @@ func (se *ServiceEndpoints) getHealthCheck(response http.ResponseWriter, request se.logger.Debugf("Endpoint[GET:/health] : Called") switch request.Method { case http.MethodGet: - if se.stepStore.Status() { - response.WriteHeader(http.StatusOK) - } else { - se.logger.Info("Health check status failed") - response.WriteHeader(515) + + status := se.stepStore.Status().(*postgres.DBDetails) + + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusOK) + if err := json.NewEncoder(response).Encode(status); err != nil { + se.logger.Error(err.Error()) } default: response.WriteHeader(http.StatusMethodNotAllowed) diff --git a/flow-state/store/mem/step.go b/flow-state/store/mem/step.go index b072d1d..c7205e0 100644 --- a/flow-state/store/mem/step.go +++ b/flow-state/store/mem/step.go @@ -25,7 +25,7 @@ type StepStore struct { snapshots sync.Map } -func (s *StepStore) Status() bool { +func (s *StepStore) Status() interface{} { //TODO return true } diff --git a/flow-state/store/postgres/connection.go b/flow-state/store/postgres/connection.go index 437b2cd..396b1ef 100644 --- a/flow-state/store/postgres/connection.go +++ b/flow-state/store/postgres/connection.go @@ -65,6 +65,11 @@ func decodeTLSParam(tlsparm string) string { func NewDB(settings map[string]interface{}) (*sql.DB, error) { var err error + dbMaxConn := settings["maxopenconnection"] + _, err = strconv.Atoi(dbMaxConn.(string)) + if err != nil { + settings["maxopenconnection"] = "0" + } s := &pgConnection{} err = metadata.MapToStruct(settings, s, false) diff --git a/flow-state/store/postgres/operation.go b/flow-state/store/postgres/operation.go index 83bbf91..8e4b282 100644 --- a/flow-state/store/postgres/operation.go +++ b/flow-state/store/postgres/operation.go @@ -14,24 +14,53 @@ import ( ) const ( - STEP_INSERT = "INSERT INTO steps (flowinstanceid, stepid, taskname, status, starttime, endtime, stepdata) VALUES ($1,$2,$3,$4,$5,$6,$7);" - SNAPSHOT_INSERT = "INSERT INTO snapshopt (flowinstanceid, hostid, stepid, starttime, endtime, stepdata) VALUES ($1,$2,$3,$4,$5,$6);" - FlowState_UPSERT_RERUN = "INSERT INTO flowstate (flowInstanceId, userId, appName,appVersion, flowName, hostId,startTime,endTime,status) VALUES ($1,$2,$3,$4,$5,$6,$7,$8, $9) ON CONFLICT (flowinstanceid) DO UPDATE SET hostId = EXCLUDED.hostId, flowName = EXCLUDED.flowName, userId = EXCLUDED.userId, status = EXCLUDED.status, starttime=EXCLUDED.starttime,endtime= EXCLUDED.endtime;\n" - UpdateFlowState = "UPDATE flowstate set endtime=$1,status=$2, executiontime=ROUND( ((EXTRACT(EPOCH FROM ($1 - starttime)))*1000) :: numeric , 3) where flowinstanceid = $3;" + STEP_INSERT = "INSERT INTO steps (flowinstanceid, stepid, taskname, status, starttime, endtime, stepdata) VALUES ($1,$2,$3,$4,$5,$6,$7);" + SNAPSHOT_INSERT = "INSERT INTO snapshopt (flowinstanceid, hostid, stepid, starttime, endtime, stepdata) VALUES ($1,$2,$3,$4,$5,$6);" + + FlowState_UPSERT_RERUN_v1 = "INSERT INTO flowstate (flowInstanceId, userId, appName,appVersion, flowName, hostId, startTime, endTime, status, rerunofflowinstanceid) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) ON CONFLICT (flowinstanceid) DO UPDATE SET hostId = EXCLUDED.hostId, flowName = EXCLUDED.flowName, userId = EXCLUDED.userId, status = EXCLUDED.status, starttime=EXCLUDED.starttime,endtime= EXCLUDED.endtime;" + FlowState_UPSERT_RERUN_v2 = "INSERT INTO flowstate (flowInstanceId, userId, appName,appVersion, flowName, hostId, flowInput, flowOutput, rerunCount, startTime, endTime, status, rerunofflowinstanceid) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (flowinstanceid) DO UPDATE SET hostId = EXCLUDED.hostId, flowName = EXCLUDED.flowName, userId = EXCLUDED.userId, status = EXCLUDED.status, flowInput = EXCLUDED.flowInput, flowOutput = EXCLUDED.flowOutput, rerunCount = EXCLUDED.rerunCount, starttime=EXCLUDED.starttime,endtime= EXCLUDED.endtime;" + UpdateFlowState_v1 = "UPDATE flowstate set endtime=$1,status=$2, executiontime=ROUND( ((EXTRACT(EPOCH FROM ($1 - starttime)))*1000) :: numeric , 3) where flowinstanceid = $3;" + UpdateFlowState_v2 = "UPDATE flowstate set endtime=$1, status=$2, flowOutput=$3, executiontime=ROUND( ((EXTRACT(EPOCH FROM ($1 - starttime)))*1000) :: numeric , 3) where flowinstanceid = $4;" UpsertSteps = "INSERT INTO steps (flowinstanceid, stepid, taskname, status, starttime, endtime, stepdata, subflowid, flowname, rerun) VALUES($1,$2,$3,$4,$5,$6,$7, $8, $9, $10) ON CONFLICT (flowinstanceid, stepid) DO UPDATE SET status = EXCLUDED.status, starttime=EXCLUDED.starttime,endtime= EXCLUDED.endtime,stepdata=EXCLUDED.stepdata;\n" DeleteSteps = "DELETE from steps where flowinstanceid = $1 and CAST(stepid as INTEGER) >= $2" - UpsertAppState = "INSERT INTO appstate (userId, appName, persistenceenabled) VALUES($1,$2,$3) ON CONFLICT (userId, appName) DO UPDATE SET persistenceenabled = EXCLUDED.persistenceenabled ;\n" + UpsertAppState = "INSERT INTO appstate (userId, appName, persistenceenabled) VALUES($1,$2,$3) ON CONFLICT (userId, appName) DO UPDATE SET persistenceenabled = EXCLUDED.persistenceenabled ;\n" + IncrementRerunCount = "UPDATE flowstate SET reruncount = reruncount + 1 WHERE flowinstanceid = $1 RETURNING reruncount;" ) type StatefulDB struct { - db *sql.DB + db *sql.DB + dbDetails *DBDetails } func (s *StatefulDB) InsertFlowState(flowState *state.FlowState) (results *ResultSet, err error) { - inputArgs := []interface{}{flowState.FlowInstanceId, flowState.UserId, flowState.AppName, flowState.AppVersion, flowState.FlowName, flowState.HostId, flowState.StartTime, flowState.EndTime, flowState.FlowStats} - return s.insert(FlowState_UPSERT_RERUN, inputArgs) + var flowInputs, flowOutputs []byte + if flowState.FlowInputs == nil { + flowState.FlowInputs = make(map[string]interface{}) + } + flowInputs, _ = json.Marshal(flowState.FlowInputs) + + //if flowState.FlowOutputs != nil { + // flowOutputs, _ = json.Marshal(flowState.FlowOutputs) + //} + if flowState.OriginalInstanceId != "" { + // update original instance Id and increment rerun count + if s.dbDetails.SmVersion == "2.0" { + _, err = s.update(IncrementRerunCount, []interface{}{flowState.OriginalInstanceId}) + if err != nil { + return nil, err + } + } + } + + if s.dbDetails.SmVersion == "1.0" { + inputArgs := []interface{}{flowState.FlowInstanceId, flowState.UserId, flowState.AppName, flowState.AppVersion, flowState.FlowName, flowState.HostId, flowState.StartTime, flowState.EndTime, flowState.FlowStats, flowState.OriginalInstanceId} + return s.insert(FlowState_UPSERT_RERUN_v1, inputArgs) + } + inputArgs := []interface{}{flowState.FlowInstanceId, flowState.UserId, flowState.AppName, flowState.AppVersion, flowState.FlowName, flowState.HostId, flowInputs, flowOutputs, flowState.RerunCount, flowState.StartTime, flowState.EndTime, flowState.FlowStats, flowState.OriginalInstanceId} + return s.insert(FlowState_UPSERT_RERUN_v2, inputArgs) + } func (s *StatefulDB) InsertAppState(appStatedata *metadata.Metadata) (results *ResultSet, err error) { @@ -40,8 +69,21 @@ func (s *StatefulDB) InsertAppState(appStatedata *metadata.Metadata) (results *R } func (s *StatefulDB) UpdateFlowState(flowState *state.FlowState) (results *ResultSet, err error) { - inputArgs := []interface{}{flowState.EndTime, flowState.FlowStats, flowState.FlowInstanceId} - return s.insert(UpdateFlowState, inputArgs) + var flowOutputs []byte + //if flowState.FlowInputs != nil { + // flowInputs, _ = json.Marshal(flowState.FlowInputs) + //} + if flowState.FlowOutputs != nil { + flowOutputs, _ = json.Marshal(flowState.FlowOutputs) + } + + if s.dbDetails.SmVersion == "1.0" { + inputArgs := []interface{}{flowState.EndTime, flowState.FlowStats, flowState.FlowInstanceId} + return s.insert(UpdateFlowState_v1, inputArgs) + } + inputArgs := []interface{}{flowState.EndTime, flowState.FlowStats, flowOutputs, flowState.FlowInstanceId} + return s.insert(UpdateFlowState_v2, inputArgs) + } func (s *StatefulDB) InsertSteps(step *state.Step) (results *ResultSet, err error) { @@ -173,7 +215,7 @@ func (s *StatefulDB) delete(deleteSql string, inputArgs []interface{}) (results return UnmarshalRows(rows) } -//GetStatement +// GetStatement func (s *StatefulDB) getStepStatement(prepared string) (stmt *sql.Stmt, err error) { preparedQueryCacheMutex.Lock() defer preparedQueryCacheMutex.Unlock() diff --git a/flow-state/store/postgres/step.go b/flow-state/store/postgres/step.go index 0b3f1d5..8fae9b7 100644 --- a/flow-state/store/postgres/step.go +++ b/flow-state/store/postgres/step.go @@ -4,7 +4,9 @@ import ( "database/sql/driver" "encoding/base64" "encoding/json" + "errors" "fmt" + "github.com/lib/pq" "reflect" "strconv" "strings" @@ -20,10 +22,30 @@ import ( func NewStore(settings map[string]interface{}) (*StepStore, error) { db, err := NewDB(settings) + dbDetails := &DBDetails{} if err != nil { - return nil, err + dbDetails.Connected = false + dbDetails.Message = err.Error() + dbDetails.SmVersion = "1.0" + return &StepStore{db: &StatefulDB{db: db, dbDetails: dbDetails}, settings: settings}, nil } - return &StepStore{db: &StatefulDB{db: db}, settings: settings}, err + + statefulDB := &StatefulDB{db: db} + dbDetails.Connected = true + dbDetails.Message = "Connected" + dbDetails.getDBDetails(statefulDB) + dbDetails.Status = true + statefulDB.dbDetails = dbDetails + return &StepStore{db: statefulDB, settings: settings}, err + +} + +type DBDetails struct { + SmVersion string `json:"smVersion"` + Connected bool `json:"connected"` + TablesExists bool `json:"tablesExists"` + Message string `json:"message"` + Status bool `json:"status"` } type StepStore struct { @@ -36,16 +58,66 @@ type StepStore struct { settings map[string]interface{} } -func (s *StepStore) Status() bool { +func (d *DBDetails) getDBDetails(db *StatefulDB) { + sqlSelect := "SELECT count(*) FROM information_schema.tables WHERE table_name in ('flowstate' , 'appstate' , 'steps')" + set, err := db.query(sqlSelect, nil) + if err != nil { + d.TablesExists = false + d.Message = "One or more required tables are missing in the database schema" + } else { + for _, v := range set.Record { + m := *v + count, _ := coerce.ToInt(m["count"]) + if count == 3 { + d.TablesExists = true + } else { + d.TablesExists = false + d.Message = "One or more required tables are missing in the database schema" + + } + } + } + if !d.TablesExists { + return + } + sqlSelect = "SELECT column_name FROM information_schema.columns WHERE table_name = 'flowstate' and column_name = 'flowinput'" + set, err = db.query(sqlSelect, nil) + if err != nil { + d.SmVersion = "1.0" + } else { + if set.Record == nil { + d.SmVersion = "1.0" + } + if len(set.Record) > 0 { + d.SmVersion = "2.0" + } + } +} + +func (s *StepStore) Status() interface{} { + + if !s.db.dbDetails.Connected { + return s.db.dbDetails + } + if err := s.db.db.Ping(); err != nil { - return false + s.db.dbDetails.Status = false + } else { + s.db.dbDetails.Status = true } - return true + + return s.db.dbDetails } func (s *StepStore) MaxConcurrencyLimit() int { - connCount := s.db.db.Stats().MaxOpenConnections - return connCount + if s.db.dbDetails.Connected { + if s.db.db.Stats().MaxOpenConnections > 0 { + return s.db.db.Stats().MaxOpenConnections + } + return 50 + } else { + return 1 + } } @@ -74,6 +146,11 @@ func (s *StepStore) GetStatus(flowId string) int { //} func (s *StepStore) GetFailedFlows(metadata *metadata.Metadata) ([]*state.FlowInfo, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where" if len(metadata.Username) < 0 { return nil, fmt.Errorf("unauthorized, please provide user infomation") @@ -140,6 +217,11 @@ func (s *StepStore) GetFailedFlows(metadata *metadata.Metadata) ([]*state.FlowIn } func (s *StepStore) GetCompletedFlows(metadata *metadata.Metadata) ([]*state.FlowInfo, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where" if len(metadata.Username) < 0 { return nil, fmt.Errorf("unauthorized, please provide user infomation") @@ -206,6 +288,11 @@ func (s *StepStore) GetCompletedFlows(metadata *metadata.Metadata) ([]*state.Flo } func (s *StepStore) GetFlows(metadata *metadata.Metadata) ([]*state.FlowInfo, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where" if len(metadata.Username) < 0 { return nil, fmt.Errorf("unauthorized, please provide user infomation") @@ -285,6 +372,11 @@ func (s *StepStore) GetFlows(metadata *metadata.Metadata) ([]*state.FlowInfo, er } func (s *StepStore) GetFlowsWithRecordCount(mtdata *metadata.Metadata) (*metadata.FlowRecord, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where" if len(mtdata.Username) < 0 { return nil, fmt.Errorf("unauthorized, please provide user infomation") @@ -314,7 +406,7 @@ func (s *StepStore) GetFlowsWithRecordCount(mtdata *metadata.Metadata) (*metadat } if len(mtdata.FlowInstanceId) > 0 { - whereStr += " and flowinstanceid='" + mtdata.FlowInstanceId + "'" + whereStr += " and (flowinstanceid='" + mtdata.FlowInstanceId + "' or rerunofflowinstanceid='" + mtdata.FlowInstanceId + "' )" } if len(mtdata.Interval) > 0 { @@ -332,27 +424,43 @@ func (s *StepStore) GetFlowsWithRecordCount(mtdata *metadata.Metadata) (*metadat whereStr += offsetLimitStr } - set, err := s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, count(*) over() AS full_count from flowstate "+whereStr, nil) + var set *ResultSet + var err error + if s.db.dbDetails.SmVersion == "1.0" { + set, err = s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, rerunofflowinstanceid, count(*) over() AS full_count from flowstate "+whereStr, nil) + } else { + set, err = s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, rerunofflowinstanceid, reruncount, flowinput, count(*) over() AS full_count from flowstate "+whereStr, nil) + } if err != nil { - if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || - strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || - strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "timedout") || - strings.Contains(err.Error(), "timed out") || strings.Contains(err.Error(), "net.Error") || strings.Contains(err.Error(), "i/o timeout") { - if retryErr := s.RetryDBConnection(); retryErr == nil { - logCache.Debugf("Retrying from GetFlowsWithRecordCount after successful connection retry ") - set, err = s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, count(*) over() AS full_count from flowstate "+whereStr, nil) - if err != nil { - logCache.Errorf("Could not connect to database server error:, %s", err.Error()) - return nil, err + pqerror, ok := err.(*pq.Error) + if ok { + if pqerror.Routine == "errorMissingColumn" { + set, err = s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, rerunofflowinstanceid, count(*) over() AS full_count from flowstate "+whereStr, nil) + } + } + + if err != nil { + + if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || + strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || + strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "timedout") || + strings.Contains(err.Error(), "timed out") || strings.Contains(err.Error(), "net.Error") || strings.Contains(err.Error(), "i/o timeout") { + if retryErr := s.RetryDBConnection(); retryErr == nil { + logCache.Debugf("Retrying from GetFlowsWithRecordCount after successful connection retry ") + set, err = s.db.query("select flowinstanceid, flowname, status, hostid, starttime, endtime, executiontime, rerunofflowinstanceid, reruncount, flowinput, count(*) over() AS full_count from flowstate "+whereStr, nil) + if err != nil { + logCache.Errorf("Could not connect to database server error:, %s", err.Error()) + return nil, err + } + } else { + logCache.Errorf("Could not connect to database server error:, %s", retryErr.Error()) + return nil, retryErr } } else { - logCache.Errorf("Could not connect to database server error:, %s", retryErr.Error()) - return nil, retryErr + logCache.Errorf("Could not connect to database server error:, %s", err.Error()) + return nil, err } - } else { - logCache.Errorf("Could not connect to database server error:, %s", err.Error()) - return nil, err } } var count int32 @@ -367,14 +475,25 @@ func (s *StepStore) GetFlowsWithRecordCount(mtdata *metadata.Metadata) (*metadat endtime, _ := coerce.ToString(m["endtime"]) executiontime, _ := coerce.ToString(m["executiontime"]) count, _ = coerce.ToInt32(m["full_count"]) + originalInstanceId, _ := coerce.ToString(m["rerunofflowinstanceid"]) + reRunCount, _ := coerce.ToInt(m["reruncount"]) + var flowInput map[string]interface{} + + if m["flowinput"] != nil { + flowInput = make(map[string]interface{}) + } + info := &state.FlowInfo{ - Id: id, - FlowName: flowName, - HostId: hostid, - FlowStatus: status, - StartTime: starttime, - EndTime: endtime, - ExecutionTime: executiontime, + Id: id, + FlowName: flowName, + HostId: hostid, + FlowStatus: status, + StartTime: starttime, + EndTime: endtime, + ExecutionTime: executiontime, + OriginalInstanceId: originalInstanceId, + RerunCount: reRunCount, + FlowInputs: flowInput, } flowinfo = append(flowinfo, info) } @@ -387,6 +506,10 @@ func (s *StepStore) GetFlowsWithRecordCount(mtdata *metadata.Metadata) (*metadat } func (s *StepStore) GetFlow(flowid string, metadata *metadata.Metadata) (*state.FlowInfo, error) { + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where flowinstanceid = '" + flowid + "'" if len(metadata.Username) > 0 { whereStr += " and userId='" + metadata.Username + "'" @@ -402,7 +525,7 @@ func (s *StepStore) GetFlow(flowid string, metadata *metadata.Metadata) (*state. whereStr += " and hostId='" + metadata.HostId + "'" } - set, err := s.db.query("select flowinstanceid, flowname, status from flowstate "+whereStr, nil) + set, err := s.db.query("select flowinstanceid, flowname, status, flowinput, reruncount, rerunofflowinstanceid from flowstate "+whereStr, nil) if err != nil { if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || @@ -410,7 +533,7 @@ func (s *StepStore) GetFlow(flowid string, metadata *metadata.Metadata) (*state. strings.Contains(err.Error(), "timed out") || strings.Contains(err.Error(), "net.Error") || strings.Contains(err.Error(), "i/o timeout") { if retryErr := s.RetryDBConnection(); retryErr == nil { logCache.Debugf("Retrying from GetFlow after successful connection retry ") - set, err = s.db.query("select flowinstanceid, flowname, status from flowstate "+whereStr, nil) + set, err = s.db.query("select flowinstanceid, flowname, status, flowinput from flowstate "+whereStr, nil) if err != nil { logCache.Errorf("Could not connect to database server error:, %s", err.Error()) return nil, err @@ -431,11 +554,29 @@ func (s *StepStore) GetFlow(flowid string, metadata *metadata.Metadata) (*state. id, _ := coerce.ToString(m["flowinstanceid"]) flowName, _ := coerce.ToString(m["flowname"]) status, _ := coerce.ToString(m["status"]) + + flowInputBytes, err := coerce.ToBytes(m["flowinput"]) + if err != nil { + logCache.Errorf("decodeBase64 for flowInputBytes in GetFlow error:, %s", err.Error()) + return nil, fmt.Errorf("decodeBase64 for flowInputBytes in GetFlow error:", err.Error()) + } + dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(flowInputBytes))) + n, err := base64.StdEncoding.Decode(dbuf, flowInputBytes) + if err != nil { + return nil, err + } + stepData := dbuf[:n] + var flowInput map[string]interface{} + err = json.Unmarshal(stepData, &flowInput) + if err != nil { + return nil, err + } info := &state.FlowInfo{ Id: id, FlowName: flowName, FlowStatus: status, FlowURI: "res://flow:" + flowName, + FlowInputs: flowInput, } flowinfo = append(flowinfo, info) } @@ -446,6 +587,10 @@ func (s *StepStore) GetFlow(flowid string, metadata *metadata.Metadata) (*state. } func (s *StepStore) GetFlowNames(metadata *metadata.Metadata) ([]string, error) { + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where " if len(metadata.Username) > 0 { whereStr += " userId='" + metadata.Username + "'" @@ -494,6 +639,11 @@ func (s *StepStore) GetFlowNames(metadata *metadata.Metadata) ([]string, error) } func (s *StepStore) GetAppVersions(metadata *metadata.Metadata) ([]string, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + var whereStr = "where " if len(metadata.Username) > 0 { whereStr += " userId='" + metadata.Username + "'" @@ -535,6 +685,11 @@ func (s *StepStore) GetAppVersions(metadata *metadata.Metadata) ([]string, error } func (s *StepStore) GetAppState(metadata *metadata.Metadata) (string, error) { + + if !s.db.dbDetails.Connected { + return "", errors.New("Database is not connected") + } + var whereStr = "where " if len(metadata.Username) > 0 { whereStr += " userId='" + metadata.Username + "'" @@ -574,6 +729,10 @@ func (s *StepStore) GetAppState(metadata *metadata.Metadata) (string, error) { } func (s *StepStore) SaveAppState(metadata *metadata.Metadata) error { + if !s.db.dbDetails.Connected { + return errors.New("Database is not connected") + } + _, err := s.db.InsertAppState(metadata) if err != nil && (err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || @@ -595,6 +754,11 @@ func (s *StepStore) SaveAppState(metadata *metadata.Metadata) error { } func (s *StepStore) SaveStep(step *state.Step) error { + + if !s.db.dbDetails.Connected { + return errors.New("Database is not connected") + } + _, err := s.db.InsertSteps(step) if err != nil && (err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || @@ -616,6 +780,11 @@ func (s *StepStore) SaveStep(step *state.Step) error { } func (s *StepStore) DeleteSteps(flowId string, stepId string) error { + + if !s.db.dbDetails.Connected { + return errors.New("Database is not connected") + } + _, err := s.db.DeleteSteps(flowId, stepId) if err != nil && (err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || @@ -638,6 +807,10 @@ func (s *StepStore) DeleteSteps(flowId string, stepId string) error { func (s *StepStore) GetSteps(flowId string) ([]*state.Step, error) { + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + set, err := s.db.query("select stepdata from steps where flowinstanceid = '"+flowId+"'", nil) if err != nil { if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || @@ -689,6 +862,9 @@ func (s *StepStore) GetSteps(flowId string) ([]*state.Step, error) { func (s *StepStore) GetStepsAsTasks(flowId string) ([][]*task.Task, error) { + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } set, err := s.db.query("select stepdata from steps where flowinstanceid = '"+flowId+"'", nil) if err != nil { if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || @@ -747,6 +923,10 @@ func (s *StepStore) GetStepsAsTasks(flowId string) ([][]*task.Task, error) { } func (s *StepStore) GetStepdataForActivity(flowId, stepid, taskname string) ([]*task.Task, error) { + + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } query := "select stepdata from steps where flowinstanceid = '" + flowId + "' and stepid = '" + stepid + "'" if taskname != "" { query += " and taskname = '" + taskname + "'" @@ -831,6 +1011,11 @@ func (s *StepStore) GetStepdataForActivity(flowId, stepid, taskname string) ([]* } func (s *StepStore) GetStepIdOfEnclosingCallSubflow(flowid, taskname, subflowid string) (string, error) { + + if !s.db.dbDetails.Connected { + return "", errors.New("Database is not connected") + } + set, err := s.db.query("select stepid from steps where taskname = '"+taskname+"' and flowinstanceid= '"+flowid+"' and subflowid= '"+subflowid+"' and status != 'Waiting'", nil) if err != nil { if err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || @@ -863,6 +1048,10 @@ func (s *StepStore) GetStepIdOfEnclosingCallSubflow(flowid, taskname, subflowid func (s *StepStore) GetStepsStatus(flowId string) ([]map[string]string, error) { + if !s.db.dbDetails.Connected { + return nil, errors.New("Database is not connected") + } + set, err := s.db.query("select stepid, taskname, status, starttime, flowname, rerun, subflowid from steps where flowinstanceid = '"+flowId+"' and stepid != '0' order by cast(stepid as integer)", nil) if err != nil { @@ -998,6 +1187,10 @@ func (s *StepStore) GetSnapshot(flowId string) *state.Snapshot { } func (s *StepStore) RecordStart(flowState *state.FlowState) error { + + if !s.db.dbDetails.Connected { + return errors.New("Database is not connected") + } _, err := s.db.InsertFlowState(flowState) if err != nil && (err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || @@ -1019,6 +1212,11 @@ func (s *StepStore) RecordStart(flowState *state.FlowState) error { } func (s *StepStore) RecordEnd(flowState *state.FlowState) error { + + if !s.db.dbDetails.Connected { + return errors.New("Database is not connected") + } + _, err := s.db.UpdateFlowState(flowState) if err != nil && (err == driver.ErrBadConn || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "network is unreachable") || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "dial tcp: lookup") || diff --git a/flow-state/store/store.go b/flow-state/store/store.go index 5bb8886..e0dc80d 100644 --- a/flow-state/store/store.go +++ b/flow-state/store/store.go @@ -20,7 +20,7 @@ const ( type Store interface { MaxConcurrencyLimit() int - Status() bool + Status() interface{} GetStatus(flowId string) int GetFlow(flowId string, metadata *metadata.Metadata) (*state.FlowInfo, error) GetFlows(metadata *metadata.Metadata) ([]*state.FlowInfo, error)