Skip to content

Commit

Permalink
Add GoFlow flow constructs as a part of goflow library
Browse files Browse the repository at this point in the history
this was previously maintained in github.com/faasflow/lib/goflow

Signed-off-by: s8sg <[email protected]>
  • Loading branch information
s8sg committed Aug 3, 2020
1 parent 137d88e commit 4a77b27
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 12 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
A Golang based high performance, scalable and distributed workflow framework

It allows to programmatically author distributed workflow as Directed Acyclic Graph (DAG) of tasks.
Goflow executes your tasks on an array of goflow workers by uniformly distributing the loads
GoFlow executes your tasks on an array of Flow workers by uniformly distributing the loads

![Build](https://github.com/faasflow/goflow/workflows/GO-Flow-Build/badge.svg)
[![GoDoc](https://godoc.org/github.com/faasflow/goflow?status.svg)](https://godoc.org/github.com/faasflow/goflow)
Expand All @@ -15,7 +15,7 @@ go get github.com/faasflow/goflow
```

## Write First Flow
> Library to Build Flow `github.com/faasflow/lib/goflow`
> Library to Build Flow `github.com/faasflow/goflow/flow`
[![GoDoc](https://godoc.org/github.com/faasflow/lib/goflow?status.svg)](https://godoc.org/github.com/faasflow/lib/goflow)

Expand All @@ -26,7 +26,7 @@ package main
import (
"fmt"
"github.com/faasflow/goflow"
flow "github.com/faasflow/lib/goflow"
flow "github.com/faasflow/goflow/flow"
)

// Workload function
Expand Down Expand Up @@ -73,7 +73,7 @@ curl -d hallo localhost:8080
GoFlow scale horizontally, you can distribute the load by just adding more instances.

#### Worker Mode
Alternatively you can start your goflow in worker mode. As a worker goflow only handles the workload,
Alternatively you can start your GoFlow in worker mode. As a worker GoFlow only handles the workload,
and if required you can only scale the workers
```go
fs := &goflow.FlowService{
Expand All @@ -85,7 +85,7 @@ fs.StartWorker("myflow", DefineWorkflow)
```

#### Server Mode
Similarly you can start your goflow as a server. It only handles the incoming http requests you will
Similarly you can start your GoFlow as a server. It only handles the incoming http requests you will
need to add workers to distribute the workload
```go
fs := &goflow.FlowService{
Expand Down
139 changes: 139 additions & 0 deletions flow/operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package flow

import (
"fmt"
)

var (
BLANK_MODIFIER = func(data []byte) ([]byte, error) { return data, nil }
)

// FuncErrorHandler the error handler for OnFailure() options
type FuncErrorHandler func(error) error

// Modifier definition for Modify() call
type Modifier func([]byte, map[string][]string) ([]byte, error)

type ServiceOperation struct {
Id string // ID
Mod Modifier // Modifier
Options map[string][]string // The option as a input to workload

FailureHandler FuncErrorHandler // The Failure handler of the operation
}

// createWorkload Create a function with execution name
func createWorkload(id string, mod Modifier) *ServiceOperation {
operation := &ServiceOperation{}
operation.Mod = mod
operation.Id = id
operation.Options = make(map[string][]string)
return operation
}

func (operation *ServiceOperation) addOptions(key string, value string) {
array, ok := operation.Options[key]
if !ok {
operation.Options[key] = make([]string, 1)
operation.Options[key][0] = value
} else {
operation.Options[key] = append(array, value)
}
}

func (operation *ServiceOperation) addFailureHandler(handler FuncErrorHandler) {
operation.FailureHandler = handler
}

func (operation *ServiceOperation) GetOptions() map[string][]string {
return operation.Options
}

func (operation *ServiceOperation) GetId() string {
return operation.Id
}

func (operation *ServiceOperation) Encode() []byte {
return []byte("")
}

// executeWorkload executes a function call
func executeWorkload(operation *ServiceOperation, data []byte) ([]byte, error) {
var err error
var result []byte

options := operation.GetOptions()
result, err = operation.Mod(data, options)

return result, err
}

func (operation *ServiceOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error) {
var result []byte
var err error

if operation.Mod != nil {
result, err = executeWorkload(operation, data)
if err != nil {
err = fmt.Errorf("function(%s), error: function execution failed, %v",
operation.Id, err)
if operation.FailureHandler != nil {
err = operation.FailureHandler(err)
}
if err != nil {
return nil, err
}
}
}

return result, nil
}

func (operation *ServiceOperation) GetProperties() map[string][]string {

result := make(map[string][]string)

isMod := "false"
isFunction := "false"
isHttpRequest := "false"
hasFailureHandler := "false"

if operation.Mod != nil {
isFunction = "true"
}
if operation.FailureHandler != nil {
hasFailureHandler = "true"
}

result["isMod"] = []string{isMod}
result["isFunction"] = []string{isFunction}
result["isHttpRequest"] = []string{isHttpRequest}
result["hasFailureHandler"] = []string{hasFailureHandler}

return result
}

// Apply adds a new workload to the given vertex
func (node *Node) Apply(id string, workload Modifier, opts ...Option) *Node {

newWorkload := createWorkload(id, workload)

o := &Options{}
for _, opt := range opts {
o.reset()
opt(o)
if len(o.option) != 0 {
for key, array := range o.option {
for _, value := range array {
newWorkload.addOptions(key, value)
}
}
}
if o.failureHandler != nil {
newWorkload.addFailureHandler(o.failureHandler)
}
}

node.unode.AddOperation(newWorkload)
return node
}
Loading

0 comments on commit 4a77b27

Please sign in to comment.