Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authentication, merge features/pubnub #5

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ go-reverse-proxy

goinx
web/
script/*
schema/*
*.sh
www/
postman
test.sh
config.yml
googlekey.json
goexec
.env
.env
text.log
149 changes: 104 additions & 45 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

execute "github.com/asyrafduyshart/go-exec-engine/pkg/execute"
"github.com/asyrafduyshart/go-exec-engine/pkg/jwt"
pb "github.com/asyrafduyshart/go-exec-engine/pkg/pubnub"
"github.com/asyrafduyshart/go-exec-engine/pkg/pubsub"
"github.com/go-playground/validator"

Expand All @@ -26,10 +27,11 @@ import (

// Config ...
type Config struct {
AccessLog string `yaml:"access_log"`
LogLevel string `yaml:"log_level"`
JwksUrl string `yaml:"jwks_url"`
Command []execute.Command `yaml:"commands,flow"`
AccessLog string `yaml:"access_log"`
LogLevel string `yaml:"log_level"`
PubNubServer bool `yaml:"pubnub_server"`
JwksUrl string `yaml:"jwks_url"`
Command []execute.Command `yaml:"commands,flow"`
}

const (
Expand Down Expand Up @@ -158,6 +160,58 @@ func shutdownHook() {
}()
}

func handleExecute(conf *Config, comm execute.Command) func(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {

if comm.Validate {
if comm.SchemaType == "avro" {
if err := execute.ValidateAvro(comm, string(c.Body())); err != nil {
log.Error("Trigger: \"%v\" will not be executed", comm.Name)
return c.Status(400).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}
} else if comm.SchemaType == "json" {
if err := execute.ValidateJSON(comm, string(c.Body())); err != nil {
log.Error("Trigger: \"%v\" will not be executed", comm.Name)
return c.Status(400).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}
}
}

if comm.Authentication {
auth := string(c.Request().Header.Peek("Authorization"))
jwksUrl := conf.JwksUrl
// Validate Token
token, err := jwt.ValidateAuth(auth, jwksUrl)
if err != nil {
return c.Status(401).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}
// Validate Claim
claims := jwt.ValidateClaimValue(token, comm.ValidateClaim)
if !claims {
return c.Status(403).JSON(map[string]string{
"type": "ERROR",
"message": "unauthorized.",
})
}
}
go execute.Execute(comm, string(c.Body()))

return c.JSON(map[string]string{
"type": "SUCCESS",
"message": "Task:" + comm.Name + " has been executed",
})
}
}

func main() {
godotenv.Load()
app := fiber.New()
Expand All @@ -181,12 +235,15 @@ func main() {
log.LogLevelNum = 4
}

// if err != nil {
// log.Error("Error pubnub %v", err)
// }

// log.Debug("Config Content: %v", conf)
count := 0
exitChan := make(chan int)
for _, command := range conf.Command {
var validate *validator.Validate
validate = validator.New()
validate := validator.New()
err := validate.Struct(command)
if err != nil {
if _, ok := err.(*validator.InvalidValidationError); ok {
Expand All @@ -199,10 +256,11 @@ func main() {
count++
} else {
log.Info("Trigger(%v) protocol(%v): \"%v\" is listening to target: %v", command.Type, command.Protocol, command.Name, command.Target)
if command.Protocol == "pubsub" {
switch command.Protocol {
case "pubsub":
go func(c execute.Command) {
err := pubsub.PullMsgs(ctx, "lido-white-label", c.Target, func(data string) {
execute.Execute(c, data)
err := pubsub.PullMsgs(ctx, os.Getenv("PROJECT_ID"), c.Target, func(data string) {
go execute.Execute(c, data)
})
if err != nil {
log.Error("Error in topic %v", c.Target)
Expand All @@ -213,42 +271,8 @@ func main() {
exitChan <- 0
}(command)
count++
} else if command.Protocol == "http" {
app.Post(command.Target, func(c *fiber.Ctx) error {
if command.Authentication {
auth := string(c.Request().Header.Peek("Authorization"))
jwksUrl := conf.JwksUrl
// Validate Token
token, err := jwt.ValidateAuth(auth, jwksUrl)
if err != nil {
return c.Status(401).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}
// Validate Claim
claims := jwt.ValidateClaimValue(token, command.ValidateClaim)
if !claims {
return c.Status(403).JSON(map[string]string{
"type": "ERROR",
"message": "unauthorized.",
})
}
fmt.Println("claim status", claims)
}

err = execute.Execute(command, string(c.Body()))
if err != nil {
return c.Status(500).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}
return c.JSON(map[string]string{
"type": "SUCCESS",
"message": "Task:" + command.Name + " has been executed",
})
})
case "http":
app.Post(command.Target, handleExecute(conf, command))
}
}
}
Expand All @@ -257,6 +281,41 @@ func main() {
return c.SendString("Execute Engine is Working!")
})

if conf.PubNubServer {
pubsubApp := fiber.New()
pn := pb.Init()
type PubNubReq struct {
Message string `json:"message" xml:"pass" form:"pass"`
}
pubsubApp.Post("/pubnub/publish/:cname", func(c *fiber.Ctx) error {
pnr := new(PubNubReq)
cname := c.Params("cname")
if err := c.BodyParser(pnr); err != nil {
fmt.Println("error = ", err)
return c.Status(400).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}

_, _, err := pn.PushMessage(cname, pnr.Message)
if err != nil {
fmt.Println("error = ", err)
return c.Status(400).JSON(map[string]string{
"type": "ERROR",
"message": err.Error(),
})
}

return c.JSON(map[string]string{
"type": "SUCCESS",
"channel": cname,
"message": pnr.Message,
})
})
go pubsubApp.Listen(fmt.Sprintf("0.0.0.0:%s", "7001"))
}

// os.Setenv("PORT","3000")
port := os.Getenv("PORT")
fmt.Println("Listening to port:", port)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ go 1.13

require (
cloud.google.com/go/pubsub v1.8.1
github.com/brianolson/cbor_go v1.0.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator v9.31.0+incompatible
github.com/gofiber/fiber/v2 v2.5.0
github.com/golang/snappy v0.0.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/joho/godotenv v1.3.0
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lestrrat-go/jwx v1.2.14
github.com/linkedin/goavro v2.1.0+incompatible
github.com/pubnub/go v4.10.0+incompatible
github.com/xeipuuv/gojsonschema v1.2.0
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/brianolson/cbor_go v1.0.0 h1:CurpJr4z5P94x/CtFgM9tf9QEEfUBJSRxR/4jbftw0E=
github.com/brianolson/cbor_go v1.0.0/go.mod h1:oGF4+yGIBUbkxYYGKSJRGIZ4Z91crezxGZAnnslEtT0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -119,6 +121,8 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200905233945-acf8798be1f7/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down Expand Up @@ -159,13 +163,15 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/pubnub/go v4.10.0+incompatible h1:ZFkbVwcAGTlGEIAdCuXL4nyK7YRYtMZ+gYhGvlGQUpw=
github.com/pubnub/go v4.10.0+incompatible/go.mod h1:lTAiOs5xrgym8YNzTSleYPEJPDvuLr5wDRkGOYqqJqU=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
Expand Down Expand Up @@ -254,7 +260,6 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0 h1:5kGOVHlq0euqwzgTC9Vu15p6fV1Wi0ArVi8da2urnVg=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand Down Expand Up @@ -301,7 +306,6 @@ golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f h1:Fqb3ao1hUmOR3GkUOg/Y+BadLwykBIzs5q8Ez2SbHyc=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201210223839-7e3030f88018 h1:XKi8B/gRBuTZN1vU9gFsLMm6zVz5FSCDzm8JYACnjy8=
Expand Down Expand Up @@ -453,7 +457,6 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand All @@ -465,6 +468,7 @@ gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJ
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
20 changes: 11 additions & 9 deletions pkg/execute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ func Execute(command Command, data string) error {

if command.Validate {
if command.SchemaType == "avro" {
if err := validateAvro(command, data); err != nil {
if err := ValidateAvro(command, data); err != nil {
log.Error("Trigger: \"%v\" will not be executed", command.Name)
return err
}
} else if command.SchemaType == "json" {
if err := validateJSON(command, data); err != nil {
if err := ValidateJSON(command, data); err != nil {
log.Error("Trigger: \"%v\" will not be executed", command.Name)
return err
}
Expand All @@ -48,25 +48,27 @@ func Execute(command Command, data string) error {
if command.Type == "exec" {
s := strconv.Quote(string(data))
exec := []string{"bash", "-c", "echo " + s + " |" + " " + command.Exec}
log.Debug("Received Data %v", data)
out, err := cmdExec(exec...)
log.Info("Output: (%v) \n%v", command.Name, out)
if err != nil {
log.Error("Error %v:", err)
return err
}
log.Debug("Received Data %v", data)
log.Info("Output: (%v) \n%v", command.Name, out)
} else if command.Type == "bash" {
log.Debug("Received Data %v", data)
out, err := scriptExec(command.Exec, data)
log.Info("Output: (%v) \n%v", command.Name, out)
if err != nil {
log.Error("Error %v:", err)
return err
}
log.Debug("Received Data %v", data)
log.Info("Output: (%v) \n%v", command.Name, out)
}

return nil
}

func validateAvro(command Command, data string) error {
func ValidateAvro(command Command, data string) error {
content, err := ioutil.ReadFile(command.Schema)
if err != nil {
log.Error("Error readfile: %v", err)
Expand All @@ -88,7 +90,7 @@ func validateAvro(command Command, data string) error {
return nil
}

func validateJSON(command Command, data string) error {
func ValidateJSON(command Command, data string) error {
content, err := ioutil.ReadFile(command.Schema)
if err != nil {
log.Error("Error readfile: %v", err)
Expand Down Expand Up @@ -162,7 +164,7 @@ func scriptExec(scriptName string, data string) (string, error) {
out, err := cmd.Output()
if err != nil {
fmt.Println("the err", err)
return "", err
return string(out), err
}
return string(out), nil
}
Loading