Skip to content

Commit

Permalink
Merge pull request #47 from milanbhagwat/master
Browse files Browse the repository at this point in the history
Updated logic to save Stepdata in async way using sync.Cond
  • Loading branch information
vijaynalawade authored Jun 7, 2022
2 parents c75336e + 6e474ee commit 86f9ab0
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 5 deletions.
110 changes: 105 additions & 5 deletions flow-state/server/rest/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package rest
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"

flowEvent "github.com/project-flogo/flow/support/event"
"github.com/project-flogo/services/flow-state/event"
"github.com/project-flogo/services/flow-state/store/metadata"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/julienschmidt/httprouter"
"github.com/project-flogo/core/support/log"
Expand All @@ -36,6 +36,9 @@ const (
)

type ServiceEndpoints struct {
muc *sync.Cond
// mu *sync.Mutex
stepSlice [][]byte
logger log.Logger
stepStore store.Store
streamingStep bool
Expand All @@ -44,9 +47,12 @@ type ServiceEndpoints struct {
func AppendEndpoints(router *httprouter.Router, logger log.Logger, exposeRecorder bool, streamingStep bool) {

sm := &ServiceEndpoints{
muc: sync.NewCond(&sync.Mutex{}),
stepSlice: [][]byte{},
logger: logger,
stepStore: store.RegistedStore(),
}

router.GET("/v1/health", sm.getHealthCheck)
router.GET("/v1/instances", sm.getInstances)

Expand Down Expand Up @@ -81,6 +87,11 @@ func AppendEndpoints(router *httprouter.Router, logger log.Logger, exposeRecorde
router.POST("/v1/instances/start", sm.saveStart)
router.POST("/v1/instances/end", sm.saveEnd)
}
// start saveStep worker go routines
maxOpenConn := sm.stepStore.GetMaxOpenConn()
for i := 0; i < maxOpenConn; i++ {
go saveStepWorker(sm)
}
}

func (se *ServiceEndpoints) getHealthCheck(response http.ResponseWriter, request *http.Request, params httprouter.Params) {
Expand Down Expand Up @@ -669,7 +680,96 @@ func (se *ServiceEndpoints) saveStart(response http.ResponseWriter, request *htt

}

func (se *ServiceEndpoints) addToStepSlice(content []byte) {
se.muc.L.Lock()
defer se.muc.L.Unlock()
se.stepSlice = append(se.stepSlice, content)
se.muc.Broadcast() // broadcast signal to all waiting go routines
}
func (se *ServiceEndpoints) popFromStepSlice() []byte {
se.muc.L.Lock()
defer se.muc.L.Unlock()
var firstItem []byte
for {
if len(se.stepSlice) > 0 {
firstItem, se.stepSlice = se.stepSlice[0], se.stepSlice[1:]
return firstItem
} else {
se.logger.Infof("StepSlice is Empty so going to wait")
se.muc.Wait() // wait for condition to have non empty slice
se.logger.Infof("Received signal, came out of wait")
}
}
return nil
}
func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http.Request, _ httprouter.Params) {
se.logger.Debugf("Endpoint[POST:/instances/steps] : Called")
asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true"
content, err := ioutil.ReadAll(request.Body)
if err != nil {
se.error(response, http.StatusBadRequest, fmt.Errorf("unable to read body"))
se.logger.Error("Endpoint[POST:/instances/steps] : %v", err)
return
}
if asyncCalling {
se.logger.Debug("Calling saveStep in Async way")
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusAccepted)
se.logger.Debug("Response sent with StatusAccepted.")
// store content in channel, it will be picked by limited goroutines for db processing
go func(content []byte) {
se.addToStepSlice(content) // add item to slice in separate go routine
se.logger.Info("content added to shared step slice.")
}(content)
} else {
step := &state.Step{}
err = json.Unmarshal(content, step)
if err != nil {
se.error(response, http.StatusBadRequest, fmt.Errorf("unable to unmarshal step json"))
se.logger.Debugf("Endpoint[POST:/instances/steps] : Step content - %s ", string(content))
se.logger.Errorf("Endpoint[POST:/instances/steps] : Error unmarshalling step - %v", err)
return
}
err = se.stepStore.SaveStep(step)
if err != nil {
se.error(response, http.StatusInternalServerError, fmt.Errorf("unable to save step"))
se.logger.Errorf("Endpoint[POST:/instances/steps] : Error saving step - %v", err)
return
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
}
}

func saveStepWorker(se *ServiceEndpoints) {
for { // infinite loop for getting work from channel
se.logger.Infof("pop from slice")
content := se.popFromStepSlice()
if content != nil {
se.logger.Info("content received from slice.")
step := &state.Step{}
err := json.Unmarshal(content, step)
if err != nil {
se.logger.Debugf("Endpoint[POST:/instances/steps] : Step content - %s ", string(content))
se.logger.Errorf("Endpoint[POST:/instances/steps] : Error unmarshalling step - %v", err)
continue
}
err = se.stepStore.SaveStep(step)
if err != nil {
se.logger.Errorf("Endpoint[POST:/instances/steps] : Error saving step - %v", err)
}
} else {
se.logger.Infof("**** Should never get here, StepSlice is Empty")
/* se.muc.L.Lock()
se.logger.Infof("StepSlice is Empty so going to wait")
se.muc.Wait()
se.logger.Infof("StepSlice out of wait")
se.muc.L.Unlock()*/
}
}
}

/*func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http.Request, _ httprouter.Params) {
se.logger.Debugf("Endpoint[POST:/instances/steps] : Called")
asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true"
content, err := ioutil.ReadAll(request.Body)
Expand Down Expand Up @@ -716,7 +816,7 @@ func (se *ServiceEndpoints) saveStep(response http.ResponseWriter, request *http
response.WriteHeader(http.StatusOK)
}
}

*/
func (se *ServiceEndpoints) saveSnapshot(response http.ResponseWriter, request *http.Request, _ httprouter.Params) {
se.logger.Debugf("Endpoint[POST:/instances/snapshot] : Called")
asyncCalling := request.Header.Get(ASYNC_CALLING_HEADER) == "true"
Expand Down
5 changes: 5 additions & 0 deletions flow-state/store/mem/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (s *StepStore) GetDBPingStatus() bool {
return false
}

func (s *StepStore) GetMaxOpenConn() int {
//TODO
return 0

}
func (s *StepStore) GetStatus(flowId string) int {

s.RLock()
Expand Down
6 changes: 6 additions & 0 deletions flow-state/store/postgres/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func (s *StepStore) GetDBPingStatus() bool {
return true
}

func (s *StepStore) GetMaxOpenConn() int {
connCount := s.db.db.Stats().MaxOpenConnections
return connCount

}

func (s *StepStore) GetStatus(flowId string) int {
s.RLock()
sc, ok := s.stepContainers[flowId]
Expand Down
1 change: 1 addition & 0 deletions flow-state/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
)

type Store interface {
GetMaxOpenConn() int
GetDBPingStatus() bool
GetStatus(flowId string) int
GetFlow(flowId string, metadata *metadata.Metadata) (*state.FlowInfo, error)
Expand Down

0 comments on commit 86f9ab0

Please sign in to comment.