Skip to content

Commit

Permalink
Fixes #6 by adding namespace support
Browse files Browse the repository at this point in the history
Signed-off-by: Joris De Winne <[email protected]>
  • Loading branch information
jdewinne authored and alexellis committed Feb 9, 2021
1 parent 5806411 commit 6a139a0
Show file tree
Hide file tree
Showing 94 changed files with 15,220 additions and 291 deletions.
12 changes: 5 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
module github.com/openfaas/cron-connector

go 1.13
go 1.15

require (
github.com/ewilde/faas-federation v0.0.0-20200206161705-23d8f6d639f1 // indirect
github.com/openfaas-incubator/connector-sdk v0.0.0-20190125151851-d722c9f72ad0
github.com/openfaas/faas v0.0.0-20190104165101-a65df4795bc6
github.com/openfaas/faas-provider v0.0.0-20181216160432-220324e98f5d // indirect
github.com/pkg/errors v0.8.1
github.com/openfaas-incubator/connector-sdk v0.0.0-20200902074656-7f648543d4aa
github.com/openfaas/faas-cli v0.0.0-20201119120128-c9d284d0c5bd
github.com/openfaas/faas-provider v0.15.1
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.0
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 // indirect
)
180 changes: 169 additions & 11 deletions go.sum

Large diffs are not rendered by default.

93 changes: 59 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,36 @@
package main

import (
"errors"
"context"
"fmt"
"log"
"net/http"
"os"
"time"

"github.com/openfaas-incubator/connector-sdk/types"
cfunction "github.com/openfaas/cron-connector/types"
"github.com/openfaas/cron-connector/version"
"github.com/openfaas/faas/gateway/requests"
sdk "github.com/openfaas/faas-cli/proxy"
ptypes "github.com/openfaas/faas-provider/types"
)

func main() {
creds := types.GetCredentials()
config, err := getControllerConfig()

if err != nil {
panic(err)
}

sha, ver := version.GetReleaseInfo()
log.Printf("Version: %s\tCommit: %s", sha, ver)

controller := types.NewController(creds, config)
invoker := types.NewInvoker(config.GatewayURL, types.MakeClient(config.UpstreamTimeout), config.PrintResponse)
cronScheduler := cfunction.NewScheduler()
topic := "cron-function"
interval := time.Second * 10

cronScheduler.Start()
err = startFunctionProbe(interval, topic, controller, cronScheduler, controller.Invoker)
err = startFunctionProbe(interval, topic, config, cronScheduler, invoker)

if err != nil {
panic(err)
Expand All @@ -44,7 +44,7 @@ func getControllerConfig() (*types.ControllerConfig, error) {
gURL, ok := os.LookupEnv("gateway_url")

if !ok {
return nil, errors.New("Gateway URL not set")
return nil, fmt.Errorf("Gateway URL not set")
}

return &types.ControllerConfig{
Expand All @@ -54,54 +54,79 @@ func getControllerConfig() (*types.ControllerConfig, error) {
}, nil
}

func startFunctionProbe(interval time.Duration, topic string, c *types.Controller, cronScheduler *cfunction.Scheduler, invoker *types.Invoker) error {
//BasicAuth basic authentication for the the gateway
type BasicAuth struct {
Username string
Password string
}

//Set set Authorization header on request
func (auth *BasicAuth) Set(req *http.Request) error {
req.SetBasicAuth(auth.Username, auth.Password)
return nil
}

func startFunctionProbe(interval time.Duration, topic string, c *types.ControllerConfig, cronScheduler *cfunction.Scheduler, invoker *types.Invoker) error {
runningFuncs := make(cfunction.ScheduledFunctions, 0)
lookupBuilder := cfunction.FunctionLookupBuilder{
GatewayURL: c.Config.GatewayURL,
Client: types.MakeClient(c.Config.UpstreamTimeout),
Credentials: c.Credentials,
timeout := 3 * time.Second
auth := &BasicAuth{}
auth.Username = types.GetCredentials().User
auth.Password = types.GetCredentials().Password

sdkClient, err := sdk.NewClient(auth, c.GatewayURL, nil, &timeout)
if err != nil {
panic(err)
}

ctx := context.Background()

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
<-ticker.C
functions, err := lookupBuilder.GetFunctions()

namespaces, err := sdkClient.ListNamespaces(ctx)
if err != nil {
return errors.New(fmt.Sprint("Couldn't fetch Functions due to: ", err))
return fmt.Errorf("Couldn't fetch Namespaces due to: %s", err)
}

newCronFunctions := RequestsToCronFunctions(functions, topic)
addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, runningFuncs)
for _, namespace := range namespaces {
functions, err := sdkClient.ListFunctions(ctx, namespace)
if err != nil {
return fmt.Errorf("Couldn't fetch Functions due to: %s", err)
}

for _, function := range deleteFuncs {
cronScheduler.Remove(function)
log.Print("deleted function ", function.Function.Name)
}
newCronFunctions := RequestsToCronFunctions(functions, namespace, topic)
addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, runningFuncs, namespace)

for _, function := range deleteFuncs {
cronScheduler.Remove(function)
log.Print("deleted function ", function.Function.Name, " in ", function.Function.Namespace)
}

newScheduledFuncs := make(cfunction.ScheduledFunctions, 0)
newScheduledFuncs := make(cfunction.ScheduledFunctions, 0)

for _, function := range addFuncs {
f, err := cronScheduler.AddCronFunction(function, invoker)
if err != nil {
log.Fatal("could not add function ", function.Name)
for _, function := range addFuncs {
f, err := cronScheduler.AddCronFunction(function, invoker)
if err != nil {
log.Fatal("could not add function ", function.Name, " in ", function.Namespace)
}

newScheduledFuncs = append(newScheduledFuncs, f)
log.Print("added function ", function.Name, " in ", function.Namespace)
}

newScheduledFuncs = append(newScheduledFuncs, f)
log.Print("added function ", function.Name)
runningFuncs = UpdateScheduledFunctions(runningFuncs, newScheduledFuncs, deleteFuncs)
}

runningFuncs = UpdateScheduledFunctions(runningFuncs, newScheduledFuncs, deleteFuncs)
}
}

// RequestsToCronFunctions converts an array of requests.Function object to CronFunction, ignoring those that cannot be converted
func RequestsToCronFunctions(functions []requests.Function, topic string) cfunction.CronFunctions {
// RequestsToCronFunctions converts an array of types.FunctionStatus object to CronFunction, ignoring those that cannot be converted
func RequestsToCronFunctions(functions []ptypes.FunctionStatus, namespace string, topic string) cfunction.CronFunctions {
newCronFuncs := make(cfunction.CronFunctions, 0)
for _, function := range functions {
cF, err := cfunction.ToCronFunction(function, topic)
cF, err := cfunction.ToCronFunction(function, namespace, topic)
if err != nil {
continue
}
Expand All @@ -111,7 +136,7 @@ func RequestsToCronFunctions(functions []requests.Function, topic string) cfunct
}

// GetNewAndDeleteFuncs takes new functions and running cron functions and returns functions that need to be added and that need to be deleted
func GetNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.ScheduledFunctions) (cfunction.CronFunctions, cfunction.ScheduledFunctions) {
func GetNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.ScheduledFunctions, namespace string) (cfunction.CronFunctions, cfunction.ScheduledFunctions) {
addFuncs := make(cfunction.CronFunctions, 0)
deleteFuncs := make(cfunction.ScheduledFunctions, 0)

Expand All @@ -122,7 +147,7 @@ func GetNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.S
}

for _, function := range oldFuncs {
if !newFuncs.Contains(&function.Function) {
if !newFuncs.Contains(&function.Function) && function.Function.Namespace == namespace {
deleteFuncs = append(deleteFuncs, function)
}
}
Expand Down
50 changes: 40 additions & 10 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import (
"testing"

cfunction "github.com/openfaas/cron-connector/types"
"github.com/openfaas/faas/gateway/requests"
ptypes "github.com/openfaas/faas-provider/types"
)

func TestGetNewAndDeleteFuncs(t *testing.T) {
newCronFunctions := make(cfunction.CronFunctions, 3)
defaultReq := requests.Function{}
newCronFunctions[0] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_unchanged", Schedule: "* * * * *"}
newCronFunctions[1] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_add", Schedule: "* * * * *"}
newCronFunctions[1] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Schedule: "*/5 * * * *"}
defaultReq := ptypes.FunctionStatus{}
newCronFunctions[0] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_unchanged", Namespace: "openfaas-fn", Schedule: "* * * * *"}
newCronFunctions[1] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_add", Namespace: "openfaas-fn", Schedule: "* * * * *"}
newCronFunctions[2] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "*/5 * * * *"}

oldFuncs := make(cfunction.ScheduledFunctions, 3)
oldFuncs[0] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_unchanged", Schedule: "* * * * *"}, ID: 0}
oldFuncs[1] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_delete", Schedule: "* * * * *"}, ID: 0}
oldFuncs[2] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Schedule: "* * * * *"}, ID: 0}
oldFuncs[0] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_unchanged", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[1] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_delete", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[2] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}

addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, oldFuncs)
addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
if !deleteFuncs.Contains(&oldFuncs[1].Function) {
t.Error("function was not deleted")
}
Expand All @@ -30,7 +30,37 @@ func TestGetNewAndDeleteFuncs(t *testing.T) {
t.Error("function was not added")
}

if !deleteFuncs.Contains(&oldFuncs[2].Function) && !addFuncs.Contains(&newCronFunctions[1]) {
if !deleteFuncs.Contains(&oldFuncs[2].Function) && !addFuncs.Contains(&newCronFunctions[2]) {
t.Error("function will not be updated")
}

if addFuncs.Contains(&newCronFunctions[0]) || deleteFuncs.Contains(&newCronFunctions[0]) {
t.Error("function should be left as it is")
}
}

func TestNamespaceFuncs(t *testing.T) {
newCronFunctions := make(cfunction.CronFunctions, 3)
defaultReq := ptypes.FunctionStatus{}
newCronFunctions[0] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_one", Namespace: "openfaas-fn", Schedule: "* * * * *"}
newCronFunctions[1] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_one", Namespace: "custom", Schedule: "* * * * *"}
newCronFunctions[2] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "*/5 * * * *"}

oldFuncs := make(cfunction.ScheduledFunctions, 3)
oldFuncs[0] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_one", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[1] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_delete", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[2] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}

addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
if !deleteFuncs.Contains(&oldFuncs[1].Function) {
t.Error("function was not deleted")
}

if !addFuncs.Contains(&newCronFunctions[1]) {
t.Error("function was not added")
}

if !deleteFuncs.Contains(&oldFuncs[2].Function) && !addFuncs.Contains(&newCronFunctions[2]) {
t.Error("function will not be updated")
}

Expand Down
22 changes: 12 additions & 10 deletions types/cron_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
"net/http"

"github.com/openfaas-incubator/connector-sdk/types"
"github.com/openfaas/faas/gateway/requests"
ptypes "github.com/openfaas/faas-provider/types"
"github.com/pkg/errors"
)

// CronFunction depicts an OpenFaaS function which is invoked by cron
type CronFunction struct {
FuncData requests.Function
Name string
Schedule string
FuncData ptypes.FunctionStatus
Name string
Namespace string
Schedule string
}

// CronFunctions a list of CronFunction
Expand All @@ -30,7 +31,7 @@ func (c *CronFunctions) Contains(cF *CronFunction) bool {

for _, f := range *c {

if f.Name == cF.Name && f.Schedule == cF.Schedule {
if f.Name == cF.Name && f.Namespace == cF.Namespace && f.Schedule == cF.Schedule {
return true
}

Expand All @@ -39,8 +40,8 @@ func (c *CronFunctions) Contains(cF *CronFunction) bool {
return false
}

// ToCronFunction converts a requests.Function object to the CronFunction and returns error if it is not possible
func ToCronFunction(f requests.Function, topic string) (CronFunction, error) {
// ToCronFunction converts a ptypes.FunctionStatus object to the CronFunction and returns error if it is not possible
func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (CronFunction, error) {
if f.Annotations == nil {
return CronFunction{}, errors.New(fmt.Sprint(f.Name, " has no annotations."))
}
Expand All @@ -58,13 +59,14 @@ func ToCronFunction(f requests.Function, topic string) (CronFunction, error) {
var c CronFunction
c.FuncData = f
c.Name = f.Name
c.Namespace = namespace
c.Schedule = fSchedule
return c, nil
}

// InvokeFunction Invokes the cron function
func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {
gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, c.Name)
gwURL := fmt.Sprintf("%s/function/%s.%s", i.GatewayURL, c.Name, c.Namespace)
reader := bytes.NewReader(make([]byte, 0))
httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader)

Expand All @@ -77,7 +79,7 @@ func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {

if doErr != nil {
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(doErr, fmt.Sprint("unable to invoke ", c.Name)),
Error: errors.Wrap(doErr, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
}
return nil, doErr
}
Expand All @@ -89,7 +91,7 @@ func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {
if readErr != nil {
log.Printf("Error reading body")
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(readErr, fmt.Sprint("unable to invoke ", c.Name)),
Error: errors.Wrap(readErr, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
}
return nil, doErr
}
Expand Down
47 changes: 0 additions & 47 deletions types/lookup_builder.go

This file was deleted.

Loading

0 comments on commit 6a139a0

Please sign in to comment.