Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature invoke flogo activity #102

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9c88d70
Add actionsService section to rulesession descriptor
rameshpolishetti Jul 24, 2019
08aa72a
Invoke activity when rule fires
rameshpolishetti Jul 30, 2019
bf1c4b4
Resolve rule action service inputs before executing
rameshpolishetti Aug 9, 2019
3958a45
Add gotest for rule action service
rameshpolishetti Aug 12, 2019
b2c2aa7
Support function based rule action service
rameshpolishetti Aug 12, 2019
6308d4a
Merge branch 'master' into feature-invoke-activity
rameshpolishetti Aug 12, 2019
9743b43
Refactor rule ActionFunc -> rule action service
rameshpolishetti Aug 13, 2019
6bf9174
Remove actionFun from ruleImpl
rameshpolishetti Aug 13, 2019
810be8a
Fix typos
rameshpolishetti Aug 13, 2019
c273b9f
updated example flogo.json's to work with invoke activity feature
LakshmiMekala Aug 14, 2019
e3fedca
Add support for flogo action based rule service
rameshpolishetti Aug 14, 2019
186cd93
Add support for async action based rule service
rameshpolishetti Aug 14, 2019
04ff831
Updating example to work with sync and async actions
LakshmiMekala Aug 14, 2019
dc9de0c
Improve log messages and fix gotests
rameshpolishetti Aug 19, 2019
653c923
update example
LakshmiMekala Aug 19, 2019
12a83e2
deleted invoke activity example and renamed to invokeservice
LakshmiMekala Aug 19, 2019
6e5ebdf
Improve invokeservice example & doc
rameshpolishetti Aug 19, 2019
d42b866
Improve logs & implement service context
rameshpolishetti Aug 20, 2019
cb01fe0
Merge branch 'master' into feature-invoke-activity
rameshpolishetti May 18, 2020
21e8016
Remove flogo-action support as a RuleActionService
rameshpolishetti May 19, 2020
91edc5f
Fix examples
rameshpolishetti May 19, 2020
6f909b0
update readme and rulesapp
nareshkumarthota May 19, 2020
45afd4e
removed sanity.sh scripts
nareshkumarthota May 19, 2020
610cbcc
go tests added for all apps
nareshkumarthota May 19, 2020
a58564c
lint and imports changes
nareshkumarthota May 19, 2020
470d590
Update README docs
rameshpolishetti May 19, 2020
2fb10d1
readme update
nareshkumarthota May 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ A `Rule` constitutes of multiple Conditions and the rule triggers when all its c

A `Condition` is an expression involving one or more tuple types. When the expression evaluates to true, the condition passes. In order to optimize a Rule's evaluation, the Rule network needs to know of the TupleTypes and the properties of the TupleType which participate in the `Condition` evaluation. These are provided when constructing the condition and adding it to the rule.

A `Action` is a function that is invoked each time that a matching combination of tuples are found that result in a `true` evaluation of all its conditions. Those matching tuples are passed to the action function.
A `ActionService` can be either go-funtion or a flogo-activity, this gets invoked each time that a matching combination of tuples are found that result in a `true` evaluation of all its conditions. Those matching tuples are passed to the service.

A `RuleSession` is a handle to interact with the rules API. You can create and register multiple rule sessions. Rule sessions are silos for the data that they hold, they are similar to namespaces. Sharing objects/state across rule sessions is not supported.

Expand Down Expand Up @@ -70,7 +70,13 @@ Next create a `RuleSession` and add all the `Rule`s with their `Condition`s and
//// check for name "Bob" in n1
rule := ruleapi.NewRule("n1.name == Bob")
rule.AddCondition("c1", []string{"n1"}, checkForBob, nil)
rule.SetAction(checkForBobAction)
serviceCfg := &config.ServiceDescriptor{
Name: "checkForBobAction",
Type: config.TypeServiceFunction,
Function: checkForBobAction,
}
aService, _ := ruleapi.NewActionService(serviceCfg)
rule.SetActionService(aService)
rule.SetContext("This is a test of context")
rs.AddRule(rule)
fmt.Printf("Rule added: [%s]\n", rule.GetName())
Expand All @@ -80,12 +86,17 @@ Next create a `RuleSession` and add all the `Rule`s with their `Condition`s and
rule2 := ruleapi.NewRule("n1.name == Bob && n1.name == n2.name")
rule2.AddCondition("c1", []string{"n1"}, checkForBob, nil)
rule2.AddCondition("c2", []string{"n1", "n2"}, checkSameNamesCondition, nil)
rule2.SetAction(checkSameNamesAction)
serviceCfg2 := &config.ServiceDescriptor{
Name: "checkSameNamesAction",
Type: config.TypeServiceFunction,
Function: checkSameNamesAction,
}
aService2, _ := ruleapi.NewActionService(serviceCfg2)
rule2.SetActionService(aService2)
rs.AddRule(rule2)
fmt.Printf("Rule added: [%s]\n", rule2.GetName())

//Finally, start the rule session before asserting tuples
//Your startup function, if registered will be invoked here

//Start the rule session
rs.Start(nil)

Here we create and assert the actual `Tuple's` which will be evaluated against the `Rule's` `Condition's` defined above.
Expand Down
10 changes: 8 additions & 2 deletions common/model/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Rule interface {
GetName() string
GetIdentifiers() []TupleType
GetConditions() []Condition
GetActionFn() ActionFunction
GetActionService() ActionService
String() string
GetPriority() int
GetDeps() map[TupleType]map[string]bool
Expand All @@ -23,7 +23,7 @@ type Rule interface {
type MutableRule interface {
Rule
AddCondition(conditionName string, idrs []string, cFn ConditionEvaluator, ctx RuleContext) (err error)
SetAction(actionFn ActionFunction)
SetActionService(actionService ActionService)
SetPriority(priority int)
SetContext(ctx RuleContext)
AddExprCondition(conditionName string, cExpr string, ctx RuleContext) error
Expand Down Expand Up @@ -86,6 +86,12 @@ type ConditionEvaluator func(string, string, map[TupleType]Tuple, RuleContext) b
//i.e part of the server side API
type ActionFunction func(context.Context, RuleSession, string, map[TupleType]Tuple, RuleContext)

// ActionService action service
type ActionService interface {
SetInput(input map[string]interface{})
Execute(context.Context, RuleSession, string, map[TupleType]Tuple, RuleContext) (done bool, err error)
}

//StartupRSFunction is called once after creation of a RuleSession
type StartupRSFunction func(ctx context.Context, rs RuleSession, sessionCtx map[string]interface{}) (err error)

Expand Down
112 changes: 98 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,41 @@ package config
import (
"bytes"
"encoding/json"
"fmt"
"strconv"

"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/rules/common/model"
)

const (
// TypeServiceFunction represents go function based rule service
TypeServiceFunction = "function"
// TypeServiceActivity represents flgo-activity based rule service
TypeServiceActivity = "activity"
)

// RuleSessionDescriptor is a collection of rules to be loaded

type RuleActionDescriptor struct {
Name string `json:"name"`
IOMetadata *metadata.IOMetadata `json:"metadata"`
Rules []*RuleDescriptor `json:"rules"`
Services []*ServiceDescriptor `json:"services,omitempty"`
}

type RuleSessionDescriptor struct {
Rules []*RuleDescriptor `json:"rules"`
Rules []*RuleDescriptor `json:"rules"`
Services []*ServiceDescriptor `json:"services,omitempty"`
}

// RuleDescriptor defines a rule
type RuleDescriptor struct {
Name string
Conditions []*ConditionDescriptor
ActionFunc model.ActionFunction
Priority int
Identifiers []string
Name string
Conditions []*ConditionDescriptor
ActionService *ActionServiceDescriptor
Priority int
Identifiers []string
}

// ConditionDescriptor defines a condition in a rule
Expand All @@ -38,13 +48,30 @@ type ConditionDescriptor struct {
Expression string
}

// ActionServiceDescriptor defines rule action service
type ActionServiceDescriptor struct {
Service string `json:"service"`
Input map[string]interface{} `json:"input,omitempty"`
}

// ServiceDescriptor defines a functional target that may be invoked by a rule
type ServiceDescriptor struct {
Name string
Description string
Type string
Function model.ActionFunction
Ref string
Settings map[string]interface{}
}

// UnmarshalJSON unmarshals JSON into struct RuleDescriptor
func (c *RuleDescriptor) UnmarshalJSON(d []byte) error {
ser := &struct {
Name string `json:"name"`
Conditions []*ConditionDescriptor `json:"conditions"`
ActionFuncId string `json:"actionFunction"`
Priority int `json:"priority"`
Identifiers []string `json:"identifiers"`
Name string `json:"name"`
Conditions []*ConditionDescriptor `json:"conditions"`
ActionService *ActionServiceDescriptor `json:"actionService,omitempty"`
Priority int `json:"priority"`
Identifiers []string `json:"identifiers"`
}{}

if err := json.Unmarshal(d, ser); err != nil {
Expand All @@ -53,13 +80,14 @@ func (c *RuleDescriptor) UnmarshalJSON(d []byte) error {

c.Name = ser.Name
c.Conditions = ser.Conditions
c.ActionFunc = GetActionFunction(ser.ActionFuncId)
c.ActionService = ser.ActionService
c.Priority = ser.Priority
c.Identifiers = ser.Identifiers

return nil
}

// MarshalJSON returns JSON encoding of RuleDescriptor
func (c *RuleDescriptor) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString("{")
buffer.WriteString("\"name\":" + "\"" + c.Name + "\",")
Expand All @@ -82,13 +110,16 @@ func (c *RuleDescriptor) MarshalJSON() ([]byte, error) {
buffer.Truncate(buffer.Len() - 1)
buffer.WriteString("],")

actionFunctionID := GetActionFunctionID(c.ActionFunc)
buffer.WriteString("\"actionFunction\":\"" + actionFunctionID + "\",")
jsonActionService, err := json.Marshal(c.ActionService)
if err == nil {
buffer.WriteString("\"actionService\":" + string(jsonActionService) + ",")
}
buffer.WriteString("\"priority\":" + strconv.Itoa(c.Priority) + "}")

return buffer.Bytes(), nil
}

// UnmarshalJSON unmarshals JSON into struct ConditionDescriptor
func (c *ConditionDescriptor) UnmarshalJSON(d []byte) error {
ser := &struct {
Name string `json:"name"`
Expand All @@ -109,6 +140,7 @@ func (c *ConditionDescriptor) UnmarshalJSON(d []byte) error {
return nil
}

// MarshalJSON returns JSON encoding of ConditionDescriptor
func (c *ConditionDescriptor) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString("{")
buffer.WriteString("\"name\":" + "\"" + c.Name + "\",")
Expand All @@ -128,6 +160,58 @@ func (c *ConditionDescriptor) MarshalJSON() ([]byte, error) {
return buffer.Bytes(), nil
}

// UnmarshalJSON unmarshals JSON into struct ServiceDescriptor
func (sd *ServiceDescriptor) UnmarshalJSON(d []byte) error {
ser := &struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Type string `json:"type"`
FunctionID string `json:"function,omitempty"`
Ref string `json:"ref"`
Settings map[string]interface{} `json:"settings,omitempty"`
}{}

if err := json.Unmarshal(d, ser); err != nil {
return err
}

sd.Name = ser.Name
sd.Description = ser.Description
if ser.Type == TypeServiceFunction || ser.Type == TypeServiceActivity {
sd.Type = ser.Type
} else {
return fmt.Errorf("unsupported type - '%s' is referenced in the service '%s'", ser.Type, ser.Name)
}
if ser.FunctionID != "" {
fn := GetActionFunction(ser.FunctionID)
if fn == nil {
return fmt.Errorf("function - '%s' not found", ser.FunctionID)
}
sd.Function = fn
}
sd.Ref = ser.Ref
sd.Settings = ser.Settings

return nil
}

// MarshalJSON returns JSON encoding of ServiceDescriptor
func (sd *ServiceDescriptor) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString("{")
buffer.WriteString("\"name\":" + "\"" + sd.Name + "\",")
buffer.WriteString("\"description\":" + "\"" + sd.Description + "\",")
buffer.WriteString("\"type\":" + "\"" + sd.Type + "\",")
functionID := GetActionFunctionID(sd.Function)
buffer.WriteString("\"function\":" + "\"" + functionID + "\",")
buffer.WriteString("\"ref\":" + "\"" + sd.Ref + "\",")
jsonSettings, err := json.Marshal(sd.Settings)
if err == nil {
buffer.WriteString("\"settings\":" + string(jsonSettings) + "}")
}

return buffer.Bytes(), nil
}

//metadata support
type DefinitionConfig struct {
Name string `json:"name"`
Expand Down
55 changes: 43 additions & 12 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ var testRuleSessionDescriptorJson = `{
"evaluator": "checkForBob"
}
],
"actionFunction": "checkForBobAction"
"actionService": {
"service": "checkForBobService"
}
},
{
"name": "n1.name == Bob && n1.name == n2.name",
Expand All @@ -39,8 +41,24 @@ var testRuleSessionDescriptorJson = `{
"evaluator": "checkSameNamesCondition"
}
],
"actionFunction": "checkSameNamesAction"
"actionService": {
"service": "checkSameNamesService"
}
}
],
"services": [
{
"name": "checkForBobService",
"description": "service checkForBobService",
"type": "function",
"function": "checkForBobAction"
},
{
"name": "checkSameNamesService",
"description": "service checkSameNamesService",
"type": "function",
"function": "checkSameNamesAction"
}
]
}
`
Expand All @@ -59,35 +77,32 @@ func TestDeserialize(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, ruleSessionDescriptor.Rules)
assert.Equal(t, 2, len(ruleSessionDescriptor.Rules))
assert.Equal(t, 2, len(ruleSessionDescriptor.Services))

// rule-0
r1Cfg := ruleSessionDescriptor.Rules[0]

assert.Equal(t, "n1.name == Bob", r1Cfg.Name)
assert.NotNil(t, r1Cfg.Conditions)
assert.Equal(t, 1, len(r1Cfg.Conditions))

sf1 := reflect.ValueOf(checkForBobAction)
sf2 := reflect.ValueOf(r1Cfg.ActionFunc)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())
assert.Equal(t, "checkForBobService", r1Cfg.ActionService.Service)

r1c1Cfg := r1Cfg.Conditions[0]
assert.Equal(t, "c1", r1c1Cfg.Name)
assert.NotNil(t, r1c1Cfg.Identifiers)
assert.Equal(t, 1, len(r1c1Cfg.Identifiers))

sf1 = reflect.ValueOf(checkForBob)
sf2 = reflect.ValueOf(r1c1Cfg.Evaluator)
sf1 := reflect.ValueOf(checkForBob)
sf2 := reflect.ValueOf(r1c1Cfg.Evaluator)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())

// rule-1
r2Cfg := ruleSessionDescriptor.Rules[1]

assert.Equal(t, "n1.name == Bob && n1.name == n2.name", r2Cfg.Name)
assert.NotNil(t, r2Cfg.Conditions)
assert.Equal(t, 2, len(r2Cfg.Conditions))

sf1 = reflect.ValueOf(checkSameNamesAction)
sf2 = reflect.ValueOf(r2Cfg.ActionFunc)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())
assert.Equal(t, "checkSameNamesService", r2Cfg.ActionService.Service)

r2c1Cfg := r2Cfg.Conditions[0]
assert.Equal(t, "c1", r2c1Cfg.Name)
Expand All @@ -106,6 +121,22 @@ func TestDeserialize(t *testing.T) {
sf1 = reflect.ValueOf(checkSameNamesCondition)
sf2 = reflect.ValueOf(r2c2Cfg.Evaluator)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())

// service-0
s1Cfg := ruleSessionDescriptor.Services[0]
assert.Equal(t, "checkForBobService", s1Cfg.Name)
assert.Equal(t, "service checkForBobService", s1Cfg.Description)
sf1 = reflect.ValueOf(checkForBobAction)
sf2 = reflect.ValueOf(s1Cfg.Function)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())

// service-1
s2Cfg := ruleSessionDescriptor.Services[1]
assert.Equal(t, "checkSameNamesService", s2Cfg.Name)
assert.Equal(t, "service checkSameNamesService", s2Cfg.Description)
sf1 = reflect.ValueOf(checkSameNamesAction)
sf2 = reflect.ValueOf(s2Cfg.Function)
assert.Equal(t, sf1.Pointer(), sf2.Pointer())
}

// TEST FUNCTIONS
Expand Down
3 changes: 2 additions & 1 deletion config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (m *ResourceManager) GetResource(id string) interface{} {
func (m *ResourceManager) GetRuleSessionDescriptor(uri string) (*RuleSessionDescriptor, error) {

if strings.HasPrefix(uri, uriSchemeRes) {
return &RuleSessionDescriptor{m.configs[uri[len(uriSchemeRes):]].Rules}, nil
id := uri[len(uriSchemeRes):]
return &RuleSessionDescriptor{Rules: m.configs[id].Rules, Services: m.configs[id].Services}, nil
}

return nil, errors.New("cannot find RuleSession: " + uri)
Expand Down
Loading