Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency #1

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
rpc "github.com/hashicorp/serf/client"
)

type Agent struct {
Directives *Directive
RPCConfig *rpc.Config
DeadNode chan bool
}

func NewAgent(directives *Directive) Agent {
// create RPC config
rpc_config := rpc.Config{
Addr: directives.Rpc_addr,
AuthKey: directives.Rpc_auth,
Timeout: directives.Rpc_timeout,
}

deadNode := make(chan bool)
for i := 0; i < directives.Workers; i++ {
NewWorker(directives, &rpc_config, deadNode)
}
return Agent{
Directives: directives,
RPCConfig: &rpc_config,
DeadNode: deadNode,
}
}

func (a *Agent) run() {
for {
_ = <-a.DeadNode
NewWorker(a.Directives, a.RPCConfig, a.DeadNode)
}
}
13 changes: 9 additions & 4 deletions directive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ type Directive struct {
Rpc_addr string `json:"rpc-addr"`
Rpc_auth string `json:"rpc-auth"`
Rpc_timeout time.Duration `json:"rpc-timeout"`
Workers int `json:"rpc-workers"`
Templates []Template
}

func ParseDirectives(config_file string) (Directive, error) {
func ParseDirectives(config_file string) (*Directive, error) {
config_json, err := ioutil.ReadFile(config_file)
if err != nil {
panic(err)
return nil, err
}
var directive Directive
err = json.Unmarshal(config_json, &directive)
if err != nil {
panic(err)
return nil, err
}
// default RPC workers
if directive.Workers == 0 {
directive.Workers = 1
}
// default RPC address
if directive.Rpc_addr == "" {
directive.Rpc_addr = "127.0.0.1:7373"
}
// timeout in millisecond. time.Duration use nanosecond by default
directive.Rpc_timeout = directive.Rpc_timeout * 1000000
return directive, nil
return &directive, nil
}
34 changes: 27 additions & 7 deletions directive_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
package main

import (
//"testing"
"os"
"fmt"
)

func ExampleParseDirectives() {
var d Directive
var d *Directive
var e error
// example 1: no config
// example 1: bad path
d, e = ParseDirectives("test/unknown.json")
if e == nil || d != nil {
panic(e)
}

// example 2: bad json
if _, e = os.Create("test/empty"); e != nil {
panic(e)
}
d, e = ParseDirectives("test/empty")
if e == nil || d != nil {
panic(e)
}
if e = os.Remove("test/empty"); e != nil {
panic(e)
}

// example 3: no config
d, e = ParseDirectives("test/config_1.json")
if e != nil {
panic(e)
}
fmt.Println(d)
fmt.Println(*d)

// example 2: full config
// example 4: full config
d, e = ParseDirectives("test/config_2.json")
if e != nil {
panic(e)
Expand All @@ -30,11 +48,12 @@ func ExampleParseDirectives() {
fmt.Println(d.Rpc_addr)
fmt.Println(d.Rpc_auth)
fmt.Println(d.Rpc_timeout)
fmt.Println(d.Workers)
fmt.Println(d.Templates[0].Src)
fmt.Println(d.Templates[0].Dest)
fmt.Println(d.Templates[0].Cmd)
// Output:
// { map[] 127.0.0.1:7373 0 []}
// { map[] 127.0.0.1:7373 0 1 []}
// serf
// svr
// web
Expand All @@ -45,7 +64,8 @@ func ExampleParseDirectives() {
// 127.0.0.1:7373
// rpcauthtoken
// 500ms
// 5
// /path/to/template.tpl
// /path/to/result.file
// service dummy restart
// ls
}
89 changes: 18 additions & 71 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,100 +1,47 @@
package main

import (
//"encoding/json"
"errors"
//"fmt"
"os"
"os/exec"
//"strconv"
rpc "github.com/hashicorp/serf/client"
"flag"
"log"
"strings"
//"text/template"
)

// exit codes
/*
const (
OK = iota
SYNTAX_ERROR = iota
CMD_FAILED = iota
TEMPLATE_PARSE_FAILED = iota
"os"
)
*/

const (
DEBUG = false
DEBUG_FILE = "/tmp/serf_template.log"
DEBUG = false
DEBUG_FILE = "/var/log/serf_template.log"
CONFIG_FILE = "/etc/serf_template/config.json"
)

func main() {
if DEBUG {
log_file, _ := os.Create(DEBUG_FILE)
debug := flag.Bool("debug", DEBUG, "enable debug")
debugFile := flag.String("log", DEBUG_FILE, "log file for debuging")
configFile := flag.String("config", CONFIG_FILE, "path to the config file")
flag.Parse()

if *debug {
log_file, _ := os.Create(*debugFile)
defer log_file.Close()
log.SetOutput(log_file)
log.Println("Serf Template starting")
}

if len(os.Args) != 2 {
if *configFile == "" {
err := errors.New("No config file")
panic(err)
}

// parse directive from config file
directives, err := ParseDirectives(os.Args[1])
if err != nil {
panic(err)
}

if DEBUG {
log.Printf("directives: %s", directives)
}

// create RPC config
rpc_config := rpc.Config{
Addr: directives.Rpc_addr,
AuthKey: directives.Rpc_auth,
Timeout: directives.Rpc_timeout,
}
// create connection to the RPC interface
rpc_client, err := rpc.ClientFromConfig(&rpc_config)
directives, err := ParseDirectives(*configFile)
if err != nil {
panic(err)
}
defer rpc_client.Close()

// create input channel
ch := make(chan map[string]interface{})
_, err = rpc_client.Stream("member-join,member-failed,member-update,member-leave,member-reap", ch)
if err != nil {
panic(err)
if *debug {
log.Printf("directives: %v", *directives)
}

for {
// wait for signal from serf
<-ch

// get members' information
members, err := rpc_client.MembersFiltered(directives.Tags, directives.Status, directives.Name)
if err != nil {
panic(err)
}
agent := NewAgent(directives)

// for each templates:
// - render template
// - execute command if any
for i := 0; i < len(directives.Templates); i++ {
RenderTemplate(directives.Templates[i].Src, directives.Templates[i].Dest, members)

if directives.Templates[i].Cmd != "" {
cmd_args := strings.Split(directives.Templates[i].Cmd, " ")
cmd := exec.Command(cmd_args[0], cmd_args[1:]...)
err := cmd.Run()
if err != nil {
panic(err)
}
}
}
}
agent.run()
}
3 changes: 2 additions & 1 deletion test/config_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
"rpc-addr": "127.0.0.1:7373",
"rpc-auth": "rpcauthtoken",
"rpc-timeout": 500,
"rpc-workers": 5,
"templates": [
{
"src": "/path/to/template.tpl",
"dest": "/path/to/result.file",
"cmd": "service dummy restart"
"cmd": "ls"
}
]
}
81 changes: 81 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
rpc "github.com/hashicorp/serf/client"
"os/exec"
"strings"
)

type Worker struct {
Directives *Directive
Client *rpc.RPCClient
DeadNode chan bool
}

func NewWorker(config *Directive, rpc_config *rpc.Config, deadNode chan bool) *Worker {
// create connection to the RPC interface
rpc_client, err := rpc.ClientFromConfig(rpc_config)
if err != nil {
panic(err)
}

w := Worker{
Directives: config,
Client: rpc_client,
DeadNode: deadNode,
}
go w.run()

return &w
}

func (w *Worker) run() {
defer func() {
w.Client.Close()
w.DeadNode <- true
}()

ch := make(chan map[string]interface{})
suscription, err := w.Client.Stream("member-join,member-failed,member-update,member-leave,member-reap", ch)
if err != nil {
panic(err)
}
defer w.Client.Stop(suscription)

for {
// wait for signal from serf
<-ch

if err = w.processTemplates(); err != nil {
panic(err)
}
}
}

func (w *Worker) processTemplates() error {
members, err := w.Client.MembersFiltered(w.Directives.Tags, w.Directives.Status, w.Directives.Name)
if err != nil {
return err
}

// for each templates:
// - render template
// - execute command if any
for i := 0; i < len(w.Directives.Templates); i++ {
RenderTemplate(w.Directives.Templates[i].Src, w.Directives.Templates[i].Dest, members)

if w.Directives.Templates[i].Cmd != "" {
err = w.runCmd(w.Directives.Templates[i].Cmd)
if err != nil {
return err
}
}
}
return nil
}

func (w *Worker) runCmd(cmdString string) error {
cmd_args := strings.Split(cmdString, " ")
cmd := exec.Command(cmd_args[0], cmd_args[1:]...)
return cmd.Run()
}