diff --git a/flow-state/server/rest/endpoints.go b/flow-state/server/rest/endpoints.go index 33b3cb8..663c7f5 100644 --- a/flow-state/server/rest/endpoints.go +++ b/flow-state/server/rest/endpoints.go @@ -3,13 +3,13 @@ package rest import ( "encoding/json" "fmt" - "io/ioutil" - "net/http" - "strconv" - flowEvent "github.com/project-flogo/flow/support/event" "github.com/project-flogo/services/flow-state/event" "github.com/project-flogo/services/flow-state/store/metadata" + "io/ioutil" + "net/http" + "strconv" + "sync" "github.com/julienschmidt/httprouter" "github.com/project-flogo/core/support/log" @@ -36,6 +36,9 @@ const ( ) type ServiceEndpoints struct { + muc *sync.Cond + // mu *sync.Mutex + stepSlice [][]byte logger log.Logger stepStore store.Store streamingStep bool @@ -44,9 +47,12 @@ type ServiceEndpoints struct { func AppendEndpoints(router *httprouter.Router, logger log.Logger, exposeRecorder bool, streamingStep bool) { sm := &ServiceEndpoints{ + muc: sync.NewCond(&sync.Mutex{}), + stepSlice: [][]byte{}, logger: logger, stepStore: store.RegistedStore(), } + router.GET("/v1/health", sm.getHealthCheck) router.GET("/v1/instances", sm.getInstances) @@ -81,6 +87,11 @@ func AppendEndpoints(router *httprouter.Router, logger log.Logger, exposeRecorde router.POST("/v1/instances/start", sm.saveStart) router.POST("/v1/instances/end", sm.saveEnd) } + // start saveStep worker go routines + maxOpenConn := sm.stepStore.GetMaxOpenConn() + for i := 0; i < maxOpenConn; i++ { + go saveStepWorker(sm) + } } func (se *ServiceEndpoints) getHealthCheck(response http.ResponseWriter, request *http.Request, params httprouter.Params) { @@ -669,7 +680,96 @@ func (se *ServiceEndpoints) saveStart(response http.ResponseWriter, request *htt } +func (se *ServiceEndpoints) addToStepSlice(content []byte) { + se.muc.L.Lock() + defer se.muc.L.Unlock() + se.stepSlice = append(se.stepSlice, content) + se.muc.Broadcast() // broadcast signal to all waiting go routines +} +func (se *ServiceEndpoints) popFromStepSlice() []byte { + se.muc.L.Lock() + defer se.muc.L.Unlock() + var firstItem []byte + for { + if len(se.stepSlice) > 0 { + firstItem, se.stepSlice = se.stepSlice[0], se.stepSlice[1:] + return firstItem + } else { + se.logger.Infof("StepSlice is Empty so going to wait") + se.muc.Wait() // wait for condition to have non empty slice + se.logger.Infof("Received signal, came out of wait") + } + } + return nil +} func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http.Request, _ httprouter.Params) { + se.logger.Debugf("Endpoint[POST:/instances/steps] : Called") + asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true" + content, err := ioutil.ReadAll(request.Body) + if err != nil { + se.error(response, http.StatusBadRequest, fmt.Errorf("unable to read body")) + se.logger.Error("Endpoint[POST:/instances/steps] : %v", err) + return + } + if asyncCalling { + se.logger.Debug("Calling saveStep in Async way") + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusAccepted) + se.logger.Debug("Response sent with StatusAccepted.") + // store content in channel, it will be picked by limited goroutines for db processing + go func(content []byte) { + se.addToStepSlice(content) // add item to slice in separate go routine + se.logger.Info("content added to shared step slice.") + }(content) + } else { + step := &state.Step{} + err = json.Unmarshal(content, step) + if err != nil { + se.error(response, http.StatusBadRequest, fmt.Errorf("unable to unmarshal step json")) + se.logger.Debugf("Endpoint[POST:/instances/steps] : Step content - %s ", string(content)) + se.logger.Errorf("Endpoint[POST:/instances/steps] : Error unmarshalling step - %v", err) + return + } + err = se.stepStore.SaveStep(step) + if err != nil { + se.error(response, http.StatusInternalServerError, fmt.Errorf("unable to save step")) + se.logger.Errorf("Endpoint[POST:/instances/steps] : Error saving step - %v", err) + return + } + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusOK) + } +} + +func saveStepWorker(se *ServiceEndpoints) { + for { // infinite loop for getting work from channel + se.logger.Infof("pop from slice") + content := se.popFromStepSlice() + if content != nil { + se.logger.Info("content received from slice.") + step := &state.Step{} + err := json.Unmarshal(content, step) + if err != nil { + se.logger.Debugf("Endpoint[POST:/instances/steps] : Step content - %s ", string(content)) + se.logger.Errorf("Endpoint[POST:/instances/steps] : Error unmarshalling step - %v", err) + continue + } + err = se.stepStore.SaveStep(step) + if err != nil { + se.logger.Errorf("Endpoint[POST:/instances/steps] : Error saving step - %v", err) + } + } else { + se.logger.Infof("**** Should never get here, StepSlice is Empty") + /* se.muc.L.Lock() + se.logger.Infof("StepSlice is Empty so going to wait") + se.muc.Wait() + se.logger.Infof("StepSlice out of wait") + se.muc.L.Unlock()*/ + } + } +} + +/*func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http.Request, _ httprouter.Params) { se.logger.Debugf("Endpoint[POST:/instances/steps] : Called") asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true" content, err := ioutil.ReadAll(request.Body) @@ -716,7 +816,7 @@ func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http response.WriteHeader(http.StatusOK) } } - +*/ func (se *ServiceEndpoints) saveSnapshot(response http.ResponseWriter, request *http.Request, _ httprouter.Params) { se.logger.Debugf("Endpoint[POST:/instances/snapshot] : Called") asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true" diff --git a/flow-state/store/mem/step.go b/flow-state/store/mem/step.go index a5706d0..fefe623 100644 --- a/flow-state/store/mem/step.go +++ b/flow-state/store/mem/step.go @@ -30,6 +30,11 @@ func (s *StepStore) GetDBPingStatus() bool { return false } +func (s *StepStore) GetMaxOpenConn() int { + //TODO + return 0 + +} func (s *StepStore) GetStatus(flowId string) int { s.RLock() diff --git a/flow-state/store/postgres/step.go b/flow-state/store/postgres/step.go index 47dba40..6f35fd0 100644 --- a/flow-state/store/postgres/step.go +++ b/flow-state/store/postgres/step.go @@ -43,6 +43,12 @@ func (s *StepStore) GetDBPingStatus() bool { return true } +func (s *StepStore) GetMaxOpenConn() int { + connCount := s.db.db.Stats().MaxOpenConnections + return connCount + +} + func (s *StepStore) GetStatus(flowId string) int { s.RLock() sc, ok := s.stepContainers[flowId] diff --git a/flow-state/store/store.go b/flow-state/store/store.go index 851e857..e5921ae 100644 --- a/flow-state/store/store.go +++ b/flow-state/store/store.go @@ -19,6 +19,7 @@ const ( ) type Store interface { + GetMaxOpenConn() int GetDBPingStatus() bool GetStatus(flowId string) int GetFlow(flowId string, metadata *metadata.Metadata) (*state.FlowInfo, error)